From 6985efb6f9f0ce233f2dea85ca85b0b3c95a60c7 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 13 Mar 2024 13:43:43 +0100 Subject: [PATCH 01/10] Added ResetStopwatch functionality to IDiagnosticContext --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 5 ++++- Tapeti.Flow/Default/FlowProvider.cs | 4 ++-- Tapeti.Serilog/Default/DiagnosticContext.cs | 19 ++++++++++++++++++- Tapeti.Serilog/IDiagnosticContext.cs | 12 ++++++++++-- .../MessageHandlerLoggingMessageMiddleware.cs | 6 +++--- 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 6f6a887..c635f88 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -148,7 +148,10 @@ namespace Tapeti.Flow.Default private static ValueTask HandleParallelResponse(IMessageContext context) { - if (context.TryGet(out var flowPayload) && flowPayload.FlowIsConverging) + if (!context.TryGet(out var flowPayload)) + return default; + + if (flowPayload.FlowIsConverging) return default; var flowHandler = context.Config.DependencyResolver.Resolve(); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 62e3176..4d14bf9 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -72,7 +72,7 @@ namespace Tapeti.Flow.Default internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { if (!context.HasFlowStateAndLock) { @@ -101,7 +101,7 @@ namespace Tapeti.Flow.Default internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); await context.Store(responseHandlerInfo.IsDurableQueue); diff --git a/Tapeti.Serilog/Default/DiagnosticContext.cs b/Tapeti.Serilog/Default/DiagnosticContext.cs index 0f7f291..3f7905a 100644 --- a/Tapeti.Serilog/Default/DiagnosticContext.cs +++ b/Tapeti.Serilog/Default/DiagnosticContext.cs @@ -1,4 +1,6 @@ using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; using Serilog.Core; using Serilog.Events; @@ -10,16 +12,20 @@ namespace Tapeti.Serilog.Default public class DiagnosticContext : IDiagnosticContext { private readonly global::Serilog.ILogger logger; + private readonly Stopwatch stopwatch; private readonly List properties = new(); + private int resetCount; /// /// Creates a new instance of a DiagnosticContext /// /// The Serilog ILogger which will be enriched - public DiagnosticContext(global::Serilog.ILogger logger) + /// The Stopwatch instance that monitors the run time of the message handler + public DiagnosticContext(global::Serilog.ILogger logger, Stopwatch stopwatch) { this.logger = logger; + this.stopwatch = stopwatch; } @@ -31,6 +37,17 @@ namespace Tapeti.Serilog.Default } + /// + public void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset") + { + var newResetCount = Interlocked.Increment(ref resetCount); + if (addToContext) + Set(propertyNamePrefix + newResetCount, stopwatch.ElapsedMilliseconds); + + stopwatch.Restart(); + } + + /// /// Returns a Serilog ILogger which is enriched with the properties set by this DiagnosticContext /// diff --git a/Tapeti.Serilog/IDiagnosticContext.cs b/Tapeti.Serilog/IDiagnosticContext.cs index 544c395..cc0833f 100644 --- a/Tapeti.Serilog/IDiagnosticContext.cs +++ b/Tapeti.Serilog/IDiagnosticContext.cs @@ -5,8 +5,7 @@ /// MessageHandlerLogging middleware. /// /// - /// This is a one-to-one copy of the IDiagnosticContext in Serilog.Extensions.Hosting which - /// saves a reference to that package while allowing similar usage within Tapeti message handlers. + /// Similar to IDiagnosticContext in Serilog.Extensions.Hosting but slightly extended. /// public interface IDiagnosticContext { @@ -19,5 +18,14 @@ /// If true, the value will be serialized as structured /// data if possible; if false, the object will be recorded as a scalar or simple array. void Set(string propertyName, object value, bool destructureObjects = false); + + /// + /// Resets the timer which is used to monitor how long a message handler takes to complete. + /// Useful for example when a message handler is throttled by a rate limiter in the message + /// handler method and you want to measure only the time taken after it is allowed to start. + /// + /// If true, the time taken until this reset is added to this diagnostic context as an incrementally named property for logging purposes. The value will be the time in milliseconds. + /// The prefix for the property name(s) when addToContext is true. The number of times ResetStopwatch is called will be appended (stopwatchReset1, stopwatchReset2, etc). + void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset"); } } diff --git a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs index 1043039..04905de 100644 --- a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs +++ b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs @@ -32,11 +32,11 @@ namespace Tapeti.Serilog.Middleware public async ValueTask Handle(IMessageContext context, Func next) { var logger = context.Config.DependencyResolver.Resolve(); - - var diagnosticContext = new DiagnosticContext(logger); + + var stopwatch = new Stopwatch(); + var diagnosticContext = new DiagnosticContext(logger, stopwatch); context.Store(new DiagnosticContextPayload(diagnosticContext)); - var stopwatch = new Stopwatch(); stopwatch.Start(); await next(); From b6bff2a2b85b2f1504b4fa23c0963db009d95771 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 13 Mar 2024 14:06:18 +0100 Subject: [PATCH 02/10] Fixed Tapeti.Annotations reference to stable version --- Tapeti/Tapeti.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 664d75d..62ffb60 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -34,6 +34,6 @@ - + From aa48b4bce3cc9da2397b06a6e2169fe2ab8aa50f Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 27 Mar 2024 14:17:40 +0100 Subject: [PATCH 03/10] Bump RabbitMQ.Client to 6.8.1 Might fix a timeout issue. See issue 1355 on rabbitmq-dotnet-client Github. If it turns out to be another issue, upgrading the library is good either way. --- Tapeti/Tapeti.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 62ffb60..15ef0ff 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -22,7 +22,7 @@ - + From db9e95772638680dae9d693c75dc5e6e96fc16ed Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 8 Apr 2024 14:19:16 +0200 Subject: [PATCH 04/10] Changed to Async consumer with ConsumerDispatchConcurrency --- Tapeti/Connection/TapetiBasicConsumer.cs | 25 ++++++++++-------------- Tapeti/Connection/TapetiClient.cs | 6 +++++- Tapeti/Helpers/ConnectionstringParser.cs | 1 + Tapeti/TapetiConnectionParams.cs | 14 ++++++++++++- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 42e3181..757292d 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : DefaultBasicConsumer + internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override void HandleBasicDeliver(string consumerTag, + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -50,20 +50,15 @@ namespace Tapeti.Connection // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would - // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer. - Task.Run(async () => + try { - try - { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } - }); + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); + await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + } } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 1c4b016..9ea0694 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,9 +777,13 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30) + RequestedHeartbeat = TimeSpan.FromSeconds(30), + DispatchConsumersAsync = true }; + if (connectionParams.ConsumerDispatchConcurrency > 0) + connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency; + if (connectionParams.ClientProperties != null) foreach (var pair in connectionParams.ClientProperties) { diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index c872a79..4445b7e 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -126,6 +126,7 @@ namespace Tapeti.Helpers case "password": result.Password = value; break; case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break; case "managementport": result.ManagementPort = int.Parse(value); break; + case "consumerDispatchConcurrency": result.ConsumerDispatchConcurrency = int.Parse(value); break; } } } diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 43f923f..5e8e50e 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -50,6 +50,17 @@ namespace Tapeti /// public int ManagementPort { get; set; } = 15672; + /// + /// The maximum number of consumers which are run concurrently. + /// + /// + /// The number of consumers is usually roughly equal to the number of queues consumed. + /// Do not set too high to avoid overloading the thread pool. + /// The RabbitMQ Client library defaults to 1. Due to older Tapeti versions implementing concurrency + /// effectively limited by the PrefetchCount, this will default to Environment.ProcessorCount instead. + /// + public int ConsumerDispatchConcurrency { get; set; } + /// /// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface. /// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly @@ -69,6 +80,7 @@ namespace Tapeti /// public TapetiConnectionParams() { + ConsumerDispatchConcurrency = Environment.ProcessorCount; } /// @@ -77,7 +89,7 @@ namespace Tapeti /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname/")) /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost")) /// - public TapetiConnectionParams(Uri uri) + public TapetiConnectionParams(Uri uri) : this() { HostName = uri.Host; VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath; From a0c9d9769446ff9e43dc9f1adebfafbcb835b152 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 8 Apr 2024 14:20:15 +0200 Subject: [PATCH 05/10] Added ConfigureAwait to (hopefully) all awaits Ugly as heck, but it's recommended for libraries --- .../SqlConnectionFlowRepository.cs | 98 +++++++++---------- Tapeti.Flow.SQL/SqlRetryHelper.cs | 8 +- Tapeti.Flow/Default/DelegateYieldPoint.cs | 2 +- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 18 ++-- .../Default/FlowContinuationMiddleware.cs | 20 ++-- Tapeti.Flow/Default/FlowProvider.cs | 32 +++--- Tapeti.Flow/Default/FlowStarter.cs | 12 +-- Tapeti.Flow/Default/FlowStore.cs | 12 +-- .../MessageHandlerLoggingMessageMiddleware.cs | 2 +- Tapeti.Transient/TransientGenericBinding.cs | 2 +- Tapeti.Transient/TransientPublisher.cs | 2 +- Tapeti.Transient/TransientRouter.cs | 4 +- Tapeti/Connection/TapetiChannel.cs | 4 +- Tapeti/Connection/TapetiClient.cs | 52 +++++----- Tapeti/Connection/TapetiConsumer.cs | 18 ++-- Tapeti/Connection/TapetiPublisher.cs | 20 ++-- Tapeti/Connection/TapetiSubscriber.cs | 48 ++++----- Tapeti/Default/ControllerMethodBinding.cs | 24 ++--- Tapeti/Default/MessageContext.cs | 4 +- Tapeti/Default/PublishResultBinding.cs | 12 +-- Tapeti/Default/ResponseFilterMiddleware.cs | 2 +- Tapeti/Helpers/MiddlewareHelper.cs | 8 +- Tapeti/TapetiConnection.cs | 22 +++-- 23 files changed, 215 insertions(+), 211 deletions(-) diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 8a387bf..829ebc3 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -44,27 +44,26 @@ namespace Tapeti.Flow.SQL { return await SqlRetryHelper.Execute(async () => { - using (var connection = await GetConnection()) + using var connection = await GetConnection().ConfigureAwait(false); + + var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection); + var flowReader = await flowQuery.ExecuteReaderAsync().ConfigureAwait(false); + + var result = new List>(); + + while (await flowReader.ReadAsync().ConfigureAwait(false)) { - var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection); - var flowReader = await flowQuery.ExecuteReaderAsync(); + var flowID = flowReader.GetGuid(0); + var creationTime = flowReader.GetDateTime(1); + var stateJson = flowReader.GetString(2); - var result = new List>(); - - while (await flowReader.ReadAsync()) - { - var flowID = flowReader.GetGuid(0); - var creationTime = flowReader.GetDateTime(1); - var stateJson = flowReader.GetString(2); - - var state = JsonConvert.DeserializeObject(stateJson); - if (state != null) - result.Add(new FlowRecord(flowID, creationTime, state)); - } - - return result; + var state = JsonConvert.DeserializeObject(stateJson); + if (state != null) + result.Add(new FlowRecord(flowID, creationTime, state)); } - }); + + return result; + }).ConfigureAwait(false); } /// @@ -72,23 +71,22 @@ namespace Tapeti.Flow.SQL { await SqlRetryHelper.Execute(async () => { - using (var connection = await GetConnection()) - { - var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + - "values (@FlowID, @StateJson, @CreationTime)", - connection); + using var connection = await GetConnection().ConfigureAwait(false); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); - var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2); + var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + + "values (@FlowID, @StateJson, @CreationTime)", + connection); - flowIDParam.Value = flowID; - stateJsonParam.Value = JsonConvert.SerializeObject(state); - creationTimeParam.Value = timestamp; + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2); - await query.ExecuteNonQueryAsync(); - } - }); + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); + creationTimeParam.Value = timestamp; + + await query.ExecuteNonQueryAsync().ConfigureAwait(false); + }).ConfigureAwait(false); } /// @@ -96,19 +94,18 @@ namespace Tapeti.Flow.SQL { await SqlRetryHelper.Execute(async () => { - using (var connection = await GetConnection()) - { - var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); + using var connection = await GetConnection().ConfigureAwait(false); + + var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); - flowIDParam.Value = flowID; - stateJsonParam.Value = JsonConvert.SerializeObject(state); + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); - await query.ExecuteNonQueryAsync(); - } - }); + await query.ExecuteNonQueryAsync().ConfigureAwait(false); + }).ConfigureAwait(false); } /// @@ -116,23 +113,22 @@ namespace Tapeti.Flow.SQL { await SqlRetryHelper.Execute(async () => { - using (var connection = await GetConnection()) - { - var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); + using var connection = await GetConnection().ConfigureAwait(false); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - flowIDParam.Value = flowID; + var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); - await query.ExecuteNonQueryAsync(); - } - }); + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + flowIDParam.Value = flowID; + + await query.ExecuteNonQueryAsync().ConfigureAwait(false); + }).ConfigureAwait(false); } private async Task GetConnection() { var connection = new SqlConnection(connectionString); - await connection.OpenAsync(); + await connection.OpenAsync().ConfigureAwait(false); return connection; } diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs index b38b498..9a377f8 100644 --- a/Tapeti.Flow.SQL/SqlRetryHelper.cs +++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs @@ -27,14 +27,14 @@ namespace Tapeti.Flow.SQL { try { - await callback(); + await callback().ConfigureAwait(false); break; } catch (SqlException e) { if (SqlExceptionHelper.IsTransientError(e)) { - await Task.Delay(ExponentialBackoff[retryAttempt]); + await Task.Delay(ExponentialBackoff[retryAttempt]).ConfigureAwait(false); if (retryAttempt < ExponentialBackoff.Length - 1) retryAttempt++; } @@ -51,8 +51,8 @@ namespace Tapeti.Flow.SQL await Execute(async () => { - returnValue = await callback(); - }); + returnValue = await callback().ConfigureAwait(false); + }).ConfigureAwait(false); return returnValue!; } diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs index dc1aa35..2285487 100644 --- a/Tapeti.Flow/Default/DelegateYieldPoint.cs +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default public async Task Execute(FlowContext context) { - await onExecute(context); + await onExecute(context).ConfigureAwait(false); } } } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index c635f88..763254e 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -61,8 +61,8 @@ namespace Tapeti.Flow.Default if (value == null) throw new InvalidOperationException("Return value should be a Task, not null"); - await (Task)value; - await HandleParallelResponse(messageContext); + await ((Task)value).ConfigureAwait(false); + await HandleParallelResponse(messageContext).ConfigureAwait(false); }); } else if (context.Result.Info.ParameterType == typeof(ValueTask)) @@ -73,8 +73,8 @@ namespace Tapeti.Flow.Default // ValueTask is a struct and should never be null throw new UnreachableException("Return value should be a ValueTask, not null"); - await (ValueTask)value; - await HandleParallelResponse(messageContext); + await ((ValueTask)value).ConfigureAwait(false); + await HandleParallelResponse(messageContext).ConfigureAwait(false); }); } else if (context.Result.Info.ParameterType == typeof(void)) @@ -116,8 +116,8 @@ namespace Tapeti.Flow.Default if (value == null) throw new InvalidOperationException("Return value should be a Task, not null"); - var yieldPoint = await (Task)value; - await HandleYieldPoint(messageContext, yieldPoint); + var yieldPoint = await ((Task)value).ConfigureAwait(false); + await HandleYieldPoint(messageContext, yieldPoint).ConfigureAwait(false); }); break; @@ -128,8 +128,8 @@ namespace Tapeti.Flow.Default // ValueTask is a struct and should never be null throw new UnreachableException("Return value should be a ValueTask, not null"); - var yieldPoint = await (ValueTask)value; - await HandleYieldPoint(messageContext, yieldPoint); + var yieldPoint = await ((ValueTask)value).ConfigureAwait(false); + await HandleYieldPoint(messageContext, yieldPoint).ConfigureAwait(false); }); break; @@ -159,7 +159,7 @@ namespace Tapeti.Flow.Default { // IFlowParallelRequest.AddRequest will store the flow immediately if (!flowPayload.FlowContext.IsStoredOrDeleted()) - await flowContext.Store(context.Binding.QueueType == QueueType.Durable); + await flowContext.Store(context.Binding.QueueType == QueueType.Durable).ConfigureAwait(false); })); } diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index ebd4fc6..875157f 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -16,14 +16,14 @@ namespace Tapeti.Flow.Default if (!context.TryGet(out var controllerPayload)) return; - var flowContext = await EnrichWithFlowContext(context); + var flowContext = await EnrichWithFlowContext(context).ConfigureAwait(false); if (flowContext?.ContinuationMetadata == null) return; if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) return; - await next(); + await next().ConfigureAwait(false); } @@ -44,22 +44,22 @@ namespace Tapeti.Flow.Default // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); - await next(); + await next().ConfigureAwait(false); if (flowPayload.FlowIsConverging) { var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); - await flowHandler.Converge(new FlowHandlerContext(context)); + await flowHandler.Converge(new FlowHandlerContext(context)).ConfigureAwait(false); } } else - await next(); + await next().ConfigureAwait(false); } public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next) { - await next(); + await next().ConfigureAwait(false); if (!context.TryGet(out var controllerPayload)) return; @@ -78,7 +78,7 @@ namespace Tapeti.Flow.Default if (!flowContext.IsStoredOrDeleted()) // The exception strategy can set the consume result to Success. Instead, check if the yield point // was handled. The flow provider ensures we only end up here in case of an exception. - await flowContext.FlowStateLock.DeleteFlowState(); + await flowContext.FlowStateLock.DeleteFlowState().ConfigureAwait(false); flowContext.FlowStateLock.Dispose(); } @@ -100,13 +100,13 @@ namespace Tapeti.Flow.Default var flowStore = context.Config.DependencyResolver.Resolve(); - var flowID = await flowStore.FindFlowID(continuationID); + var flowID = await flowStore.FindFlowID(continuationID).ConfigureAwait(false); if (!flowID.HasValue) return null; - var flowStateLock = await flowStore.LockFlowState(flowID.Value); + var flowStateLock = await flowStore.LockFlowState(flowID.Value).ConfigureAwait(false); - var flowState = await flowStateLock.GetFlowState(); + var flowState = await flowStateLock.GetFlowState().ConfigureAwait(false); if (flowState == null) return null; diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 4d14bf9..b686f1a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -76,7 +76,7 @@ namespace Tapeti.Flow.Default { if (!context.HasFlowStateAndLock) { - await CreateNewFlowState(context); + await CreateNewFlowState(context).ConfigureAwait(false); Debug.Assert(context.FlowState != null, "context.FlowState != null"); } @@ -103,10 +103,10 @@ namespace Tapeti.Flow.Default internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, string? convergeMethodName = null, bool convergeMethodTaskSync = false) { - var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); - await context.Store(responseHandlerInfo.IsDurableQueue); + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync).ConfigureAwait(false); + await context.Store(responseHandlerInfo.IsDurableQueue).ConfigureAwait(false); - await publisher.Publish(message, properties, true); + await publisher.Publish(message, properties, true).ConfigureAwait(false); } @@ -129,17 +129,17 @@ namespace Tapeti.Flow.Default // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) - await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory); + await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory).ConfigureAwait(false); else - await publisher.Publish(message, properties, reply.Mandatory); + await publisher.Publish(message, properties, reply.Mandatory).ConfigureAwait(false); - await context.Delete(); + await context.Delete().ConfigureAwait(false); } internal static async Task EndFlow(FlowContext context) { - await context.Delete(); + await context.Delete().ConfigureAwait(false); if (context is { HasFlowStateAndLock: true, FlowState.Metadata.Reply: { } }) throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); @@ -194,7 +194,7 @@ namespace Tapeti.Flow.Default var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); var flowID = Guid.NewGuid(); - var flowStateLock = await flowStore.LockFlowState(flowID); + var flowStateLock = await flowStore.LockFlowState(flowID).ConfigureAwait(false); if (flowStateLock == null) throw new InvalidOperationException("Unable to lock a new flow"); @@ -232,7 +232,7 @@ namespace Tapeti.Flow.Default try { - await executableYieldPoint.Execute(flowContext); + await executableYieldPoint.Execute(flowContext).ConfigureAwait(false); } catch (YieldPointException e) { @@ -272,7 +272,7 @@ namespace Tapeti.Flow.Default if (flowContext.ContinuationMetadata.ConvergeMethodName == null) throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata"); - await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); + await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync).ConfigureAwait(false); })); } @@ -305,13 +305,13 @@ namespace Tapeti.Flow.Default if (yieldPointTask == null) throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); - yieldPoint = await (Task)yieldPointTask; + yieldPoint = await ((Task)yieldPointTask).ConfigureAwait(false); } if (yieldPoint == null) throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); - await Execute(flowContext.HandlerContext, yieldPoint); + await Execute(flowContext.HandlerContext, yieldPoint).ConfigureAwait(false); } @@ -424,13 +424,13 @@ namespace Tapeti.Flow.Default context, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync); + convergeMethodSync).ConfigureAwait(false); preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties)); } - await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); - await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))); + await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)).ConfigureAwait(false); + await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))).ConfigureAwait(false); }); } } diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index b8952cc..b9c4520 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -25,25 +25,25 @@ namespace Tapeti.Flow.Default /// public async Task Start(Expression>> methodSelector) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty()); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty()).ConfigureAwait(false); } /// public async Task Start(Expression>>> methodSelector) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, Array.Empty()); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, Array.Empty()).ConfigureAwait(false); } /// public async Task Start(Expression>> methodSelector, TParameter parameter) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter}).ConfigureAwait(false); } /// public async Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object?[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object?[] {parameter}).ConfigureAwait(false); } @@ -54,12 +54,12 @@ namespace Tapeti.Flow.Default if (result == null) throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task, got null"); - var yieldPoint = await getYieldPointResult(result); + var yieldPoint = await getYieldPointResult(result).ConfigureAwait(false); var context = new FlowHandlerContext(config, controller, method); var flowHandler = config.DependencyResolver.Resolve(); - await flowHandler.Execute(context, yieldPoint); + await flowHandler.Execute(context, yieldPoint).ConfigureAwait(false); } diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 6a7306c..3e18a54 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -64,7 +64,7 @@ namespace Tapeti.Flow.Default validatedMethods = new HashSet(); try { - foreach (var flowStateRecord in await repository.GetStates()) + foreach (var flowStateRecord in await repository.GetStates().ConfigureAwait(false)) { flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true)); @@ -134,7 +134,7 @@ namespace Tapeti.Flow.Default inUse = true; - var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID)); + var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID).ConfigureAwait(false)); return flowStatelock; } @@ -215,18 +215,18 @@ namespace Tapeti.Flow.Default // Storing the flowstate in the underlying repository if (isNew) { - await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime); + await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime).ConfigureAwait(false); } else { - await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState); + await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState).ConfigureAwait(false); } } else if (wasPersistent) { // We transitioned from a durable queue to a dynamic queue, // remove the persistent state but keep the in-memory version - await owner.repository.DeleteState(FlowID); + await owner.repository.DeleteState(FlowID).ConfigureAwait(false); } } @@ -244,7 +244,7 @@ namespace Tapeti.Flow.Default cachedFlowState = null; if (removedFlowState is { IsPersistent: true }) - await owner.repository.DeleteState(FlowID); + await owner.repository.DeleteState(FlowID).ConfigureAwait(false); } } } diff --git a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs index 04905de..98ee489 100644 --- a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs +++ b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs @@ -39,7 +39,7 @@ namespace Tapeti.Serilog.Middleware stopwatch.Start(); - await next(); + await next().ConfigureAwait(false); stopwatch.Stop(); diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index 10c633f..14e7056 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -32,7 +32,7 @@ namespace Tapeti.Transient /// public async ValueTask Apply(IBindingTarget target) { - QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null); + QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null).ConfigureAwait(false); router.TransientResponseQueueName = QueueName; } diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs index 25880f7..561995c 100644 --- a/Tapeti.Transient/TransientPublisher.cs +++ b/Tapeti.Transient/TransientPublisher.cs @@ -23,7 +23,7 @@ namespace Tapeti.Transient /// public async Task RequestResponse(TRequest request) where TRequest : class where TResponse : class { - return (TResponse)await router.RequestResponse(publisher, request); + return (TResponse)await router.RequestResponse(publisher, request).ConfigureAwait(false); } } } diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index 5425e4c..c7c60fc 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -71,7 +71,7 @@ namespace Tapeti.Transient Persistent = false }; - await ((IInternalPublisher)publisher).Publish(request, properties, false); + await ((IInternalPublisher)publisher).Publish(request, properties, false).ConfigureAwait(false); } catch (Exception) { @@ -84,7 +84,7 @@ namespace Tapeti.Transient await using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1)) { - return await tcs.Task; + return await tcs.Task.ConfigureAwait(false); } } diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs index da75767..1ec9c87 100644 --- a/Tapeti/Connection/TapetiChannel.cs +++ b/Tapeti/Connection/TapetiChannel.cs @@ -45,7 +45,7 @@ namespace Tapeti.Connection if (capturedTaskQueue == null) return; - await capturedTaskQueue.Add(() => { }); + await capturedTaskQueue.Add(() => { }).ConfigureAwait(false); capturedTaskQueue.Dispose(); } @@ -74,7 +74,7 @@ namespace Tapeti.Connection { return GetTaskQueue().Add(async () => { - await operation(modelProvider); + await operation(modelProvider).ConfigureAwait(false); }); } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 9ea0694..3818238 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -175,7 +175,7 @@ namespace Tapeti.Connection var delayCancellationTokenSource = new CancellationTokenSource(); var signalledTask = await Task.WhenAny( publishResultTask, - Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)); + Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)).ConfigureAwait(false); if (signalledTask != publishResultTask) throw new TimeoutException( @@ -201,7 +201,7 @@ namespace Tapeti.Connection throw new NoRouteException( $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}"); } - }); + }).ConfigureAwait(false); } @@ -226,7 +226,7 @@ namespace Tapeti.Connection capturedConnectionReference = Interlocked.Read(ref connectionReference); var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond); consumerTag = channel.BasicConsume(queueName, false, basicConsumer); - }); + }).ConfigureAwait(false); return consumerTag == null ? null @@ -257,7 +257,7 @@ namespace Tapeti.Connection return; channel.BasicCancel(consumerTag.ConsumerTag); - }); + }).ConfigureAwait(false); } @@ -291,13 +291,13 @@ namespace Tapeti.Connection default: throw new ArgumentOutOfRangeException(nameof(result), result, null); } - }); + }).ConfigureAwait(false); } private async Task GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments? arguments) { - var existingQueue = await GetQueueInfo(queueName); + var existingQueue = await GetQueueInfo(queueName).ConfigureAwait(false); if (existingQueue == null) return true; @@ -342,9 +342,9 @@ namespace Tapeti.Connection /// public async Task DurableQueueDeclare(string queueName, IEnumerable bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken) { - var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments); + var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments).ConfigureAwait(false); - var existingBindings = (await GetQueueBindings(queueName)).ToList(); + var existingBindings = (await GetQueueBindings(queueName).ConfigureAwait(false)).ToList(); var currentBindings = bindings.ToList(); var bindingLogger = logger as IBindingLogger; @@ -371,7 +371,7 @@ namespace Tapeti.Connection bindingLogger?.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); } - }); + }).ConfigureAwait(false); } @@ -386,7 +386,7 @@ namespace Tapeti.Connection /// public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken) { - if (!await GetDurableQueueDeclareRequired(queueName, arguments)) + if (!await GetDurableQueueDeclareRequired(queueName, arguments).ConfigureAwait(false)) return; await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => @@ -396,7 +396,7 @@ namespace Tapeti.Connection (logger as IBindingLogger)?.QueueDeclare(queueName, true, true); channel.QueueDeclarePassive(queueName); - }); + }).ConfigureAwait(false); } @@ -413,7 +413,7 @@ namespace Tapeti.Connection return; deletedMessages = channel.QueueDelete(queueName); - }); + }).ConfigureAwait(false); deletedQueues.Add(queueName); (logger as IBindingLogger)?.QueueObsolete(queueName, true, deletedMessages); @@ -434,7 +434,7 @@ namespace Tapeti.Connection // Get queue information from the Management API, since the AMQP operations will // throw an error if the queue does not exist or still contains messages and resets // the connection. The resulting reconnect will cause subscribers to reset. - var queueInfo = await GetQueueInfo(queueName); + var queueInfo = await GetQueueInfo(queueName).ConfigureAwait(false); if (queueInfo == null) { deletedQueues.Add(queueName); @@ -467,7 +467,7 @@ namespace Tapeti.Connection else { // Remove all bindings instead - var existingBindings = (await GetQueueBindings(queueName)).ToList(); + var existingBindings = (await GetQueueBindings(queueName).ConfigureAwait(false)).ToList(); if (existingBindings.Count > 0) { @@ -481,7 +481,7 @@ namespace Tapeti.Connection (logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages); } } while (retry); - }); + }).ConfigureAwait(false); } @@ -507,7 +507,7 @@ namespace Tapeti.Connection queueName = channel.QueueDeclare(arguments: GetDeclareArguments(arguments)).QueueName; bindingLogger?.QueueDeclare(queueName, false, false); } - }); + }).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); if (queueName == null) @@ -527,7 +527,7 @@ namespace Tapeti.Connection DeclareExchange(channel, binding.Exchange); (logger as IBindingLogger)?.QueueBind(queueName, false, binding.Exchange, binding.RoutingKey); channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); - }); + }).ConfigureAwait(false); } @@ -551,8 +551,8 @@ namespace Tapeti.Connection } // Empty the queue - await consumeChannel.Reset(); - await publishChannel.Reset(); + await consumeChannel.Reset().ConfigureAwait(false); + await publishChannel.Reset().ConfigureAwait(false); // No need to close the channels as the connection will be closed capturedConsumeModel?.Dispose(); @@ -619,9 +619,9 @@ namespace Tapeti.Connection response.EnsureSuccessStatusCode(); - var content = await response.Content.ReadAsStringAsync(); + var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); return JsonConvert.DeserializeObject(content); - }); + }).ConfigureAwait(false); } @@ -659,7 +659,7 @@ namespace Tapeti.Connection { response.EnsureSuccessStatusCode(); - var content = await response.Content.ReadAsStringAsync(); + var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); var bindings = JsonConvert.DeserializeObject>(content); // Filter out the binding to an empty source, which is always present for direct-to-queue routing @@ -667,7 +667,7 @@ namespace Tapeti.Connection .Where(binding => !string.IsNullOrEmpty(binding.Source) && !string.IsNullOrEmpty(binding.RoutingKey)) .Select(binding => new QueueBinding(binding.Source!, binding.RoutingKey!)) ?? Enumerable.Empty(); - }); + }).ConfigureAwait(false); } @@ -702,8 +702,8 @@ namespace Tapeti.Connection { try { - var response = await managementClient.SendAsync(request); - return await handleResponse(response); + var response = await managementClient.SendAsync(request).ConfigureAwait(false); + return await handleResponse(response).ConfigureAwait(false); } catch (TimeoutException) { @@ -717,7 +717,7 @@ namespace Tapeti.Connection throw; } - await Task.Delay(ExponentialBackoff[retryDelayIndex]); + await Task.Delay(ExponentialBackoff[retryDelayIndex]).ConfigureAwait(false); if (retryDelayIndex < ExponentialBackoff.Length - 1) retryDelayIndex++; diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index fb0ddb8..95238fb 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -60,7 +60,7 @@ namespace Tapeti.Connection Exchange = exchange, RoutingKey = routingKey, Properties = properties - }); + }).ConfigureAwait(false); } catch (Exception dispatchException) { @@ -78,7 +78,7 @@ namespace Tapeti.Connection }; var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); - await HandleException(exceptionContext); + await HandleException(exceptionContext).ConfigureAwait(false); return exceptionContext.ConsumeResult; } @@ -93,7 +93,7 @@ namespace Tapeti.Connection foreach (var binding in bindings.Where(binding => binding.Accept(messageType))) { - var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); + var consumeResult = await InvokeUsingBinding(message, messageContextData, binding).ConfigureAwait(false); validMessageType = true; if (consumeResult != ConsumeResult.Success) @@ -125,18 +125,18 @@ namespace Tapeti.Connection try { await MiddlewareHelper.GoAsync(config.Middleware.Message, - async (handler, next) => await handler.Handle(context, next), - async () => { await binding.Invoke(context); }); + async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false), + async () => { await binding.Invoke(context).ConfigureAwait(false); }); - await binding.Cleanup(context, ConsumeResult.Success); + await binding.Cleanup(context, ConsumeResult.Success).ConfigureAwait(false); return ConsumeResult.Success; } catch (Exception invokeException) { var exceptionContext = new ExceptionStrategyContext(context, invokeException); - await HandleException(exceptionContext); + await HandleException(exceptionContext).ConfigureAwait(false); - await binding.Cleanup(context, exceptionContext.ConsumeResult); + await binding.Cleanup(context, exceptionContext.ConsumeResult).ConfigureAwait(false); return exceptionContext.ConsumeResult; } } @@ -153,7 +153,7 @@ namespace Tapeti.Connection try { - await exceptionStrategy.HandleException(exceptionContext); + await exceptionStrategy.HandleException(exceptionContext).ConfigureAwait(false); } catch (Exception strategyException) { diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8f7d06b..2e7c1d0 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -34,21 +34,21 @@ namespace Tapeti.Connection /// public async Task Publish(object message) { - await Publish(message, null, IsMandatory(message)); + await Publish(message, null, IsMandatory(message)).ConfigureAwait(false); } /// public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class { - await PublishRequest(message, responseMethodSelector.Body); + await PublishRequest(message, responseMethodSelector.Body).ConfigureAwait(false); } /// public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class { - await PublishRequest(message, responseMethodSelector.Body); + await PublishRequest(message, responseMethodSelector.Body).ConfigureAwait(false); } @@ -87,14 +87,14 @@ namespace Tapeti.Connection ReplyTo = binding.QueueName }; - await Publish(message, properties, IsMandatory(message)); + await Publish(message, properties, IsMandatory(message)).ConfigureAwait(false); } /// public async Task SendToQueue(string queueName, object message) { - await PublishDirect(message, queueName, null, IsMandatory(message)); + await PublishDirect(message, queueName, null, IsMandatory(message)).ConfigureAwait(false); } @@ -105,14 +105,14 @@ namespace Tapeti.Connection var exchange = exchangeStrategy.GetExchange(messageClass); var routingKey = routingKeyStrategy.GetRoutingKey(messageClass); - await Publish(message, properties, exchange, routingKey, mandatory); + await Publish(message, properties, exchange, routingKey, mandatory).ConfigureAwait(false); } /// public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory) { - await Publish(message, properties, null, queueName, mandatory); + await Publish(message, properties, null, queueName, mandatory).ConfigureAwait(false); } @@ -136,12 +136,12 @@ namespace Tapeti.Connection await MiddlewareHelper.GoAsync( config.Middleware.Publish, - async (handler, next) => await handler.Handle(context, next), + async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false), async () => { var body = messageSerializer.Serialize(message, writableProperties); - await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory); - }); + await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory).ConfigureAwait(false); + }).ConfigureAwait(false); } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index ad3bab8..df0db5a 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -29,7 +29,7 @@ namespace Tapeti.Connection public async ValueTask DisposeAsync() { if (consuming) - await Stop(); + await Stop().ConfigureAwait(false); } @@ -48,7 +48,7 @@ namespace Tapeti.Connection public async Task ApplyBindings() { initializeCancellationTokenSource = new CancellationTokenSource(); - await ApplyBindings(initializeCancellationTokenSource.Token); + await ApplyBindings(initializeCancellationTokenSource.Token).ConfigureAwait(false); } @@ -81,10 +81,10 @@ namespace Tapeti.Connection Task.Run(async () => { - await ApplyBindings(cancellationToken); + await ApplyBindings(cancellationToken).ConfigureAwait(false); if (consuming && !cancellationToken.IsCancellationRequested) - await ConsumeQueues(cancellationToken); + await ConsumeQueues(cancellationToken).ConfigureAwait(false); }, CancellationToken.None); } @@ -98,7 +98,7 @@ namespace Tapeti.Connection consuming = true; initializeCancellationTokenSource = new CancellationTokenSource(); - await ConsumeQueues(initializeCancellationTokenSource.Token); + await ConsumeQueues(initializeCancellationTokenSource.Token).ConfigureAwait(false); } @@ -111,7 +111,7 @@ namespace Tapeti.Connection initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = null; - await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))); + await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))).ConfigureAwait(false); consumerTags.Clear(); consuming = false; @@ -133,9 +133,9 @@ namespace Tapeti.Connection bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); foreach (var binding in config.Bindings) - await binding.Apply(bindingTarget); + await binding.Apply(bindingTarget).ConfigureAwait(false); - await bindingTarget.Apply(); + await bindingTarget.Apply().ConfigureAwait(false); } @@ -155,8 +155,8 @@ namespace Tapeti.Connection var queueName = group.Key; var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); - return await clientFactory().Consume(queueName, consumer, cancellationToken); - }))) + return await clientFactory().Consume(queueName, consumer, cancellationToken).ConfigureAwait(false); + })).ConfigureAwait(false)) .Where(t => t?.ConsumerTag != null) .Cast()); } @@ -201,14 +201,14 @@ namespace Tapeti.Connection public async ValueTask BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) { - var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); + var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments).ConfigureAwait(false); if (!result.IsNewMessageClass) return result.QueueName; var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); var exchange = ExchangeStrategy.GetExchange(messageClass); - await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey), CancellationToken); + await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey), CancellationToken).ConfigureAwait(false); return result.QueueName; } @@ -216,7 +216,7 @@ namespace Tapeti.Connection public async ValueTask BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) { - var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); + var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments).ConfigureAwait(false); return result.QueueName; } @@ -225,7 +225,7 @@ namespace Tapeti.Connection { // If we don't know the routing key, always create a new queue to ensure there is no overlap. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. - return await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); + return await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken).ConfigureAwait(false); } @@ -267,7 +267,7 @@ namespace Tapeti.Connection } // Declare a new queue - var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); + var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken).ConfigureAwait(false); var queueInfo = new DynamicQueueInfo { QueueName = queueName, @@ -363,8 +363,8 @@ namespace Tapeti.Connection public override async Task Apply() { var client = ClientFactory(); - await DeclareQueues(client); - await DeleteObsoleteQueues(client); + await DeclareQueues(client).ConfigureAwait(false); + await DeleteObsoleteQueues(client).ConfigureAwait(false); } @@ -380,8 +380,8 @@ namespace Tapeti.Connection return new QueueBinding(exchange, routingKey); }); - await client.DurableQueueDeclare(queue.Key, bindings, queue.Value.Arguments, CancellationToken); - })); + await client.DurableQueueDeclare(queue.Key, bindings, queue.Value.Arguments, CancellationToken).ConfigureAwait(false); + })).ConfigureAwait(false); } @@ -389,8 +389,8 @@ namespace Tapeti.Connection { await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue => { - await client.DurableQueueDelete(queue, true, CancellationToken); - })); + await client.DurableQueueDelete(queue, true, CancellationToken).ConfigureAwait(false); + })).ConfigureAwait(false); } } @@ -407,12 +407,12 @@ namespace Tapeti.Connection public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments) { - await VerifyDurableQueue(queueName, arguments); + await VerifyDurableQueue(queueName, arguments).ConfigureAwait(false); } public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments) { - await VerifyDurableQueue(queueName, arguments); + await VerifyDurableQueue(queueName, arguments).ConfigureAwait(false); } public override ValueTask BindDurableObsolete(string queueName) @@ -426,7 +426,7 @@ namespace Tapeti.Connection if (!durableQueues.Add(queueName)) return; - await ClientFactory().DurableQueueVerify(queueName, arguments, CancellationToken); + await ClientFactory().DurableQueueVerify(queueName, arguments, CancellationToken).ConfigureAwait(false); } } diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 93ca8c1..0c74764 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -117,10 +117,10 @@ namespace Tapeti.Default { case BindingTargetMode.Default: if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) - QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); + QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false); else { - await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); + await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false); QueueName = bindingInfo.QueueInfo.Name; } @@ -128,10 +128,10 @@ namespace Tapeti.Default case BindingTargetMode.Direct: if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) - QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); + QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false); else { - await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); + await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false); QueueName = bindingInfo.QueueInfo.Name; } @@ -143,7 +143,7 @@ namespace Tapeti.Default } else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable) { - await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!); + await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!).ConfigureAwait(false); QueueName = bindingInfo.QueueInfo.Name; } } @@ -165,14 +165,14 @@ namespace Tapeti.Default var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType); context.Store(new ControllerMessageContextPayload(controller, (IControllerMethodBinding)context.Binding)); - if (!await FilterAllowed(context)) + if (!await FilterAllowed(context).ConfigureAwait(false)) return; await MiddlewareHelper.GoAsync( bindingInfo.MessageMiddleware, - async (handler, next) => await handler.Handle(context, next), - async () => await messageHandler(context)); + async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false), + async () => await messageHandler(context).ConfigureAwait(false)).ConfigureAwait(false); } @@ -181,8 +181,8 @@ namespace Tapeti.Default { await MiddlewareHelper.GoAsync( bindingInfo.CleanupMiddleware, - async (handler, next) => await handler.Cleanup(context, consumeResult, next), - () => default); + async (handler, next) => await handler.Cleanup(context, consumeResult, next).ConfigureAwait(false), + () => default).ConfigureAwait(false); } @@ -191,12 +191,12 @@ namespace Tapeti.Default var allowed = false; await MiddlewareHelper.GoAsync( bindingInfo.FilterMiddleware, - async (handler, next) => await handler.Filter(context, next), + async (handler, next) => await handler.Filter(context, next).ConfigureAwait(false), () => { allowed = true; return default; - }); + }).ConfigureAwait(false); return allowed; } diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 697ed30..b2a0aad 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -79,7 +79,7 @@ namespace Tapeti.Default switch (payload) { case IAsyncDisposable asyncDisposable: - await asyncDisposable.DisposeAsync(); + await asyncDisposable.DisposeAsync().ConfigureAwait(false); break; case IDisposable disposable: @@ -151,7 +151,7 @@ namespace Tapeti.Default foreach (var item in items.Values) { if (item is IAsyncDisposable asyncDisposable) - await asyncDisposable.DisposeAsync(); + await asyncDisposable.DisposeAsync().ConfigureAwait(false); } } } diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index efdc8a0..8bfde1b 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -75,15 +75,15 @@ namespace Tapeti.Default private static async ValueTask PublishGenericTaskResult(IMessageContext messageContext, object value) where T : class { - var message = await (Task)value; - await Reply(message, messageContext); + var message = await ((Task)value).ConfigureAwait(false); + await Reply(message, messageContext).ConfigureAwait(false); } private static async ValueTask PublishGenericValueTaskResult(IMessageContext messageContext, object value) where T : class { - var message = await (ValueTask)value; - await Reply(message, messageContext); + var message = await ((ValueTask)value).ConfigureAwait(false); + await Reply(message, messageContext).ConfigureAwait(false); } @@ -99,9 +99,9 @@ namespace Tapeti.Default }; if (!string.IsNullOrEmpty(messageContext.Properties.ReplyTo)) - await publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)); + await publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)).ConfigureAwait(false); else - await publisher.Publish(message, properties, false); + await publisher.Publish(message, properties, false).ConfigureAwait(false); } } } diff --git a/Tapeti/Default/ResponseFilterMiddleware.cs b/Tapeti/Default/ResponseFilterMiddleware.cs index 8d990b6..ed790c2 100644 --- a/Tapeti/Default/ResponseFilterMiddleware.cs +++ b/Tapeti/Default/ResponseFilterMiddleware.cs @@ -31,7 +31,7 @@ namespace Tapeti.Default return; } - await next(); + await next().ConfigureAwait(false); } } } diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs index cdd3446..e33d3fa 100644 --- a/Tapeti/Helpers/MiddlewareHelper.cs +++ b/Tapeti/Helpers/MiddlewareHelper.cs @@ -50,7 +50,7 @@ namespace Tapeti.Helpers var handlerIndex = middleware?.Count - 1 ?? -1; if (middleware == null || handlerIndex == -1) { - await lastHandler(); + await lastHandler().ConfigureAwait(false); return; } @@ -58,12 +58,12 @@ namespace Tapeti.Helpers { handlerIndex--; if (handlerIndex >= 0) - await handle(middleware[handlerIndex], HandleNext); + await handle(middleware[handlerIndex], HandleNext).ConfigureAwait(false); else - await lastHandler(); + await lastHandler().ConfigureAwait(false); } - await handle(middleware[handlerIndex], HandleNext); + await handle(middleware[handlerIndex], HandleNext).ConfigureAwait(false); } } } diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 8ab7802..8898b3a 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -63,11 +63,11 @@ namespace Tapeti if (subscriber == null) { subscriber = new TapetiSubscriber(() => client.Value, config); - await subscriber.ApplyBindings(); + await subscriber.ApplyBindings().ConfigureAwait(false); } if (startConsuming) - await subscriber.Resume(); + await subscriber.Resume().ConfigureAwait(false); return subscriber; } @@ -91,28 +91,36 @@ namespace Tapeti public async Task Close() { if (client.IsValueCreated) - await client.Value.Close(); + await client.Value.Close().ConfigureAwait(false); } /// public void Dispose() { - if (!disposed) - DisposeAsync().GetAwaiter().GetResult(); + GC.SuppressFinalize(this); + + if (disposed) + return; + + var disposeAsyncTask = DisposeAsync(); + if (!disposeAsyncTask.IsCompleted) + disposeAsyncTask.AsTask().GetAwaiter().GetResult(); } /// public async ValueTask DisposeAsync() { + GC.SuppressFinalize(this); + if (disposed) return; if (subscriber != null) - await subscriber.DisposeAsync(); + await subscriber.DisposeAsync().ConfigureAwait(false); - await Close(); + await Close().ConfigureAwait(false); disposed = true; } From a6f2b24ea04e1bded887734dae8352b5b861471a Mon Sep 17 00:00:00 2001 From: Frederik Date: Wed, 1 May 2024 09:25:13 +0200 Subject: [PATCH 06/10] Added possibility to publish a request-response directly to a queue --- Tapeti.Flow/Default/FlowProvider.cs | 31 ++++++++++++++++++ Tapeti.Flow/IFlowProvider.cs | 49 +++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 62e3176..49df2d2 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -45,6 +45,20 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// + public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + + /// + public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + /// public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { @@ -52,6 +66,13 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// + public IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { @@ -110,6 +131,16 @@ namespace Tapeti.Flow.Default } + internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) + { + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); + await context.Store(responseHandlerInfo.IsDurableQueue); + + await publisher.PublishDirect(message, queueName, properties, true); + } + + private async Task SendResponse(FlowContext context, object message) { var reply = context.HasFlowStateAndLock diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index 613e82d..e1b8fc8 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -35,6 +35,34 @@ namespace Tapeti.Flow IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class; + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class; + + + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class; + + /// /// Publish a request message and continue the flow when the response arrives. /// The request message must be marked with the [Request] attribute, and the @@ -54,6 +82,27 @@ namespace Tapeti.Flow IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for synchronous response handlers. + /// + /// + /// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods + /// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type + /// of a method is not part of its signature,according to: + /// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + /// + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class; + + /// /// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder /// to acquire an IYieldPoint. From d4ab9f87d16c684bcacc6a1a6ca4417a00e90431 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 4 Nov 2024 10:42:43 +0100 Subject: [PATCH 07/10] Updated NuGet API key --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index b3135e6..1a7370a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -44,7 +44,7 @@ configuration: deploy: - provider: NuGet api_key: - secure: HJ6sQ5J8aQUCalJSppNpuEydKri1AhSLSOXDwM63xKwiTvA462KQnqmBB7gljHA3 + secure: yR7Sj3XoMgWBEj2roujkdErQYgGo22X//FqpCcE4AHQ4i/EyFjqETv1hxC06GCtg skip_symbols: false artifact: /.*(\.|\.s)nupkg/ \ No newline at end of file From d4cc06195577be75404430a63246ab85b871f6d5 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 2 Dec 2024 09:26:12 +0100 Subject: [PATCH 08/10] Added support for delaying feature initialisation --- .readthedocs.yaml | 9 ++ Tapeti/Config/ITapetiConfig.cs | 7 +- Tapeti/Config/ITapetiConfigBuilder.cs | 46 +++++++++-- Tapeti/Connection/TapetiClient.cs | 4 +- Tapeti/Connection/TapetiSubscriber.cs | 4 +- Tapeti/TapetiConfig.cs | 113 +++++++++++++++++++++----- 6 files changed, 151 insertions(+), 32 deletions(-) create mode 100644 .readthedocs.yaml diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..9db0418 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,9 @@ +version 2 + +build + os ubuntu-22.04 + tools + python 3.12 + +sphinx + configuration docs/conf.py \ No newline at end of file diff --git a/Tapeti/Config/ITapetiConfig.cs b/Tapeti/Config/ITapetiConfig.cs index 4bdad56..27c393e 100644 --- a/Tapeti/Config/ITapetiConfig.cs +++ b/Tapeti/Config/ITapetiConfig.cs @@ -17,7 +17,10 @@ namespace Tapeti.Config /// /// Various Tapeti features which can be turned on or off. /// - ITapetiConfigFeatues Features { get; } + /// + /// Calling this method will freeze the feature set if is used. + /// + ITapetiConfigFeatures GetFeatures(); /// /// Provides access to the different kinds of registered middleware. @@ -34,7 +37,7 @@ namespace Tapeti.Config /// /// Various Tapeti features which can be turned on or off. /// - public interface ITapetiConfigFeatues + public interface ITapetiConfigFeatures { /// /// Determines whether 'publisher confirms' are used. This RabbitMQ features allows Tapeti to diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs index 5d517c9..6351293 100644 --- a/Tapeti/Config/ITapetiConfigBuilder.cs +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -53,6 +53,40 @@ namespace Tapeti.Config void RegisterBinding(IBinding binding); + /// + ITapetiConfigBuilder DisablePublisherConfirms(); + + /// + ITapetiConfigBuilder SetPublisherConfirms(bool enabled); + + /// + ITapetiConfigBuilder EnableDeclareDurableQueues(); + + /// + ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); + + /// + ITapetiConfigBuilder DisableVerifyDurableQueues(); + + /// + ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled); + + + /// + /// Allows the core features to be determine on-demand when first required by the connection instead + /// of before is constructed. + /// + /// Called when the feature set is required. From that moment on the feature set is frozen. + ITapetiConfigBuilder DelayFeatures(Action onBuild); + } + + + + /// + /// Configures Tapeti core features. Every method returns the builder instance for method chaining. + /// + public interface ITapetiConfigFeaturesBuilder + { /// /// Disables 'publisher confirms'. This RabbitMQ features allows Tapeti to be notified if a message /// has no route, and guarantees delivery for request-response style messages and those marked with @@ -62,7 +96,7 @@ namespace Tapeti.Config /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may /// result in never-ending flows. Only disable if you can accept those consequences. /// - ITapetiConfigBuilder DisablePublisherConfirms(); + ITapetiConfigFeaturesBuilder DisablePublisherConfirms(); /// @@ -74,7 +108,7 @@ namespace Tapeti.Config /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may /// result in never-ending flows. Only disable if you can accept those consequences. /// - ITapetiConfigBuilder SetPublisherConfirms(bool enabled); + ITapetiConfigFeaturesBuilder SetPublisherConfirms(bool enabled); /// @@ -84,7 +118,7 @@ namespace Tapeti.Config /// Note that access to the RabbitMQ Management plugin's REST API is required for this /// feature to work, since AMQP does not provide a way to query existing bindings. /// - ITapetiConfigBuilder EnableDeclareDurableQueues(); + ITapetiConfigFeaturesBuilder EnableDeclareDurableQueues(); /// /// Configures the automatic creation of durable queues and updating of their bindings. @@ -93,7 +127,7 @@ namespace Tapeti.Config /// Note that access to the RabbitMQ Management plugin's REST API is required for this /// feature to work, since AMQP does not provide a way to query existing bindings. /// - ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); + ITapetiConfigFeaturesBuilder SetDeclareDurableQueues(bool enabled); /// @@ -102,7 +136,7 @@ namespace Tapeti.Config /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error /// while verifying. /// - ITapetiConfigBuilder DisableVerifyDurableQueues(); + ITapetiConfigFeaturesBuilder DisableVerifyDurableQueues(); /// @@ -111,7 +145,7 @@ namespace Tapeti.Config /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error /// while verifying. /// - ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled); + ITapetiConfigFeaturesBuilder SetVerifyDurableQueues(bool enabled); } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 3818238..23d5cbf 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -135,7 +135,7 @@ namespace Tapeti.Connection DeclareExchange(channel, exchange); // The delivery tag is lost after a reconnect, register under the new tag - if (config.Features.PublisherConfirms) + if (config.GetFeatures().PublisherConfirms) { lastDeliveryTag++; @@ -848,7 +848,7 @@ namespace Tapeti.Connection } - if (config.Features.PublisherConfirms) + if (config.GetFeatures().PublisherConfirms) { lastDeliveryTag = 0; diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index df0db5a..b7023eb 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -125,9 +125,9 @@ namespace Tapeti.Connection CustomBindingTarget bindingTarget; - if (config.Features.DeclareDurableQueues) + if (config.GetFeatures().DeclareDurableQueues) bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); - else if (config.Features.VerifyDurableQueues) + else if (config.GetFeatures().VerifyDurableQueues) bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); else bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index ee48d3a..56188a7 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -135,7 +135,7 @@ namespace Tapeti /// public ITapetiConfigBuilder DisablePublisherConfirms() { - GetConfig().SetPublisherConfirms(false); + GetConfig().GetFeaturesBuilder().DisablePublisherConfirms(); return this; } @@ -143,7 +143,7 @@ namespace Tapeti /// public ITapetiConfigBuilder SetPublisherConfirms(bool enabled) { - GetConfig().SetPublisherConfirms(enabled); + GetConfig().GetFeaturesBuilder().SetPublisherConfirms(enabled); return this; } @@ -151,7 +151,7 @@ namespace Tapeti /// public ITapetiConfigBuilder EnableDeclareDurableQueues() { - GetConfig().SetDeclareDurableQueues(true); + GetConfig().GetFeaturesBuilder().EnableDeclareDurableQueues(); return this; } @@ -159,7 +159,7 @@ namespace Tapeti /// public ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled) { - GetConfig().SetDeclareDurableQueues(enabled); + GetConfig().GetFeaturesBuilder().SetDeclareDurableQueues(enabled); return this; } @@ -167,7 +167,7 @@ namespace Tapeti /// public ITapetiConfigBuilder DisableVerifyDurableQueues() { - GetConfig().SetVerifyDurableQueues(false); + GetConfig().GetFeaturesBuilder().DisablePublisherConfirms(); return this; } @@ -175,7 +175,15 @@ namespace Tapeti /// public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled) { - GetConfig().SetVerifyDurableQueues(enabled); + GetConfig().GetFeaturesBuilder().SetVerifyDurableQueues(enabled); + return this; + } + + + /// + public ITapetiConfigBuilder DelayFeatures(Action onBuild) + { + GetConfig().GetFeaturesBuilder().DelayFeatures(onBuild); return this; } @@ -221,12 +229,13 @@ namespace Tapeti /// internal class Config : ITapetiConfig { - private readonly ConfigFeatures features = new(); + private ConfigFeaturesBuilder? featuresBuilder = new(); + private ITapetiConfigFeatures? features; + private readonly ConfigMiddleware middleware = new(); private readonly ConfigBindings bindings = new(); public IDependencyResolver DependencyResolver { get; } - public ITapetiConfigFeatues Features => features; public ITapetiConfigMiddleware Middleware => middleware; public ITapetiConfigBindings Bindings => bindings; @@ -237,6 +246,17 @@ namespace Tapeti } + public ITapetiConfigFeatures GetFeatures() + { + if (features != null) + return features; + + features = featuresBuilder!.Build(); + featuresBuilder = null; + return features; + } + + public void Lock() { bindings.Lock(); @@ -260,24 +280,17 @@ namespace Tapeti } - public void SetPublisherConfirms(bool enabled) + internal ConfigFeaturesBuilder GetFeaturesBuilder() { - features.PublisherConfirms = enabled; - } + if (featuresBuilder == null) + throw new InvalidOperationException("Tapeti features are already frozen"); - public void SetDeclareDurableQueues(bool enabled) - { - features.DeclareDurableQueues = enabled; - } - - public void SetVerifyDurableQueues(bool enabled) - { - features.VerifyDurableQueues = enabled; + return featuresBuilder; } } - internal class ConfigFeatures : ITapetiConfigFeatues + internal class ConfigFeatures : ITapetiConfigFeatures { public bool PublisherConfirms { get; internal set; } = true; public bool DeclareDurableQueues { get; internal set; } @@ -285,6 +298,66 @@ namespace Tapeti } + internal class ConfigFeaturesBuilder : ITapetiConfigFeaturesBuilder + { + private bool publisherConfirms = true; + private bool declareDurableQueues; + private bool verifyDurableQueues = true; + private Action? onBuild; + + + public ITapetiConfigFeaturesBuilder DisablePublisherConfirms() + { + return SetPublisherConfirms(false); + } + + public ITapetiConfigFeaturesBuilder SetPublisherConfirms(bool enabled) + { + publisherConfirms = enabled; + return this; + } + + public ITapetiConfigFeaturesBuilder EnableDeclareDurableQueues() + { + return SetDeclareDurableQueues(true); + } + + public ITapetiConfigFeaturesBuilder SetDeclareDurableQueues(bool enabled) + { + declareDurableQueues = enabled; + return this; + } + + public ITapetiConfigFeaturesBuilder DisableVerifyDurableQueues() + { + return SetVerifyDurableQueues(false); + } + + public ITapetiConfigFeaturesBuilder SetVerifyDurableQueues(bool enabled) + { + verifyDurableQueues = enabled; + return this; + } + + // ReSharper disable once ParameterHidesMember + public void DelayFeatures(Action onBuild) + { + this.onBuild = onBuild; + } + + public ITapetiConfigFeatures Build() + { + onBuild?.Invoke(this); + return new ConfigFeatures + { + DeclareDurableQueues = declareDurableQueues, + PublisherConfirms = publisherConfirms, + VerifyDurableQueues = verifyDurableQueues + }; + } + } + + internal class ConfigMiddleware : ITapetiConfigMiddleware { private readonly List messageMiddleware = new(); From c361af3b3c4e6eabdfb7fe4428bc2cfb6e2b7fbe Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 20 Dec 2024 11:31:17 +0100 Subject: [PATCH 09/10] Added Unsubscribe method Clarified contribution license requirements in readme --- README.md | 5 ++++- Tapeti/IConnection.cs | 6 ++++++ Tapeti/TapetiConnection.cs | 7 +++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c9b0c72..6313ced 100644 --- a/README.md +++ b/README.md @@ -67,4 +67,7 @@ Master build (stable release) [![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master) Latest build -[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti) \ No newline at end of file +[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti) + +## Contributing +By contributing to Tapeti's main repository (https://github.com/MvRens/Tapeti) you agree to dedicate your code-base contributions to the public domain under the Unlicense license. \ No newline at end of file diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs index da478e0..c08379c 100644 --- a/Tapeti/IConnection.cs +++ b/Tapeti/IConnection.cs @@ -84,6 +84,12 @@ namespace Tapeti ISubscriber SubscribeSync(bool startConsuming = true); + /// + /// Stops the current subscriber. + /// + Task Unsubscribe(); + + /// /// Returns an IPublisher implementation for the current connection. /// diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 8898b3a..820d5e1 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -80,6 +80,13 @@ namespace Tapeti } + /// + public Task Unsubscribe() + { + return subscriber?.Stop() ?? Task.CompletedTask; + } + + /// public IPublisher GetPublisher() { From f9d1f7e0dea108c65b84d9bd162d0eefa78a901f Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 27 Jan 2025 10:47:26 +0100 Subject: [PATCH 10/10] Wait for running message handlers when closing the connection --- Tapeti.Flow/Default/FlowProvider.cs | 2 +- Tapeti.sln.DotSettings | 4 +- Tapeti/Connection/TapetiBasicConsumer.cs | 36 ++++++++++------ Tapeti/Connection/TapetiClient.cs | 7 +++- Tapeti/Default/MessageHandlerTracker.cs | 40 ++++++++++++++++++ Tapeti/Helpers/WaitHandleExtensions.cs | 52 ++++++++++++++++++++++++ Tapeti/IMessageHandlerTracker.cs | 18 ++++++++ 7 files changed, 143 insertions(+), 16 deletions(-) create mode 100644 Tapeti/Default/MessageHandlerTracker.cs create mode 100644 Tapeti/Helpers/WaitHandleExtensions.cs create mode 100644 Tapeti/IMessageHandlerTracker.cs diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 199f058..8b7a518 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -132,7 +132,7 @@ namespace Tapeti.Flow.Default internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); await context.Store(responseHandlerInfo.IsDurableQueue); diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index c44a322..55840d7 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -8,7 +8,9 @@ SQL UTF <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> + <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /></Policy> True True True - True \ No newline at end of file + True + True \ No newline at end of file diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 757292d..9974d96 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -20,14 +20,16 @@ namespace Tapeti.Connection internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer { private readonly IConsumer consumer; + private readonly IMessageHandlerTracker messageHandlerTracker; private readonly long connectionReference; private readonly ResponseFunc onRespond; /// - public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond) + public TapetiBasicConsumer(IConsumer consumer, IMessageHandlerTracker messageHandlerTracker, long connectionReference, ResponseFunc onRespond) { this.consumer = consumer; + this.messageHandlerTracker = messageHandlerTracker; this.connectionReference = connectionReference; this.onRespond = onRespond; } @@ -42,22 +44,30 @@ namespace Tapeti.Connection IBasicProperties properties, ReadOnlyMemory body) { - // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing - // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support - // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner - // (which changes exception handling, which is now done in TapetiConsumer exclusively). - // - // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 - var bodyArray = body.ToArray(); - + messageHandlerTracker.Enter(); try { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); - await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing + // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support + // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner + // (which changes exception handling, which is now done in TapetiConsumer exclusively). + // + // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 + var bodyArray = body.ToArray(); + + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); + await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + } } - catch + finally { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + messageHandlerTracker.Exit(); } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 23d5cbf..02cb024 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -33,6 +33,7 @@ namespace Tapeti.Connection private const int ReconnectDelay = 5000; private const int MandatoryReturnTimeout = 300000; private const int MinimumConnectedReconnectDelay = 1000; + private const int CloseMessageHandlersTimeout = 30000; private readonly TapetiConnectionParams connectionParams; @@ -49,6 +50,7 @@ namespace Tapeti.Connection private readonly TapetiChannel consumeChannel; private readonly TapetiChannel publishChannel; private readonly HttpClient managementClient; + private readonly MessageHandlerTracker messageHandlerTracker = new(); // These fields must be locked using connectionLock private readonly object connectionLock = new(); @@ -224,7 +226,7 @@ namespace Tapeti.Connection return; capturedConnectionReference = Interlocked.Read(ref connectionReference); - var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond); + var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond); consumerTag = channel.BasicConsume(queueName, false, basicConsumer); }).ConfigureAwait(false); @@ -570,6 +572,9 @@ namespace Tapeti.Connection capturedConnection.Dispose(); } } + + // Wait for message handlers to finish + await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout); } diff --git a/Tapeti/Default/MessageHandlerTracker.cs b/Tapeti/Default/MessageHandlerTracker.cs new file mode 100644 index 0000000..a805352 --- /dev/null +++ b/Tapeti/Default/MessageHandlerTracker.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + /// + public class MessageHandlerTracker : IMessageHandlerTracker + { + private volatile int runningCount; + private readonly ManualResetEventSlim idleEvent = new(true); + + + /// + public void Enter() + { + if (Interlocked.Increment(ref runningCount) == 1) + idleEvent.Reset(); + } + + + /// + public void Exit() + { + if (Interlocked.Decrement(ref runningCount) == 0) + idleEvent.Set(); + } + + + /// + /// Waits for the amount of currently running message handlers to reach zero. + /// + /// The timeout after which an OperationCanceledException is thrown. + public Task WaitForIdle(int timeoutMilliseconds) + { + return idleEvent.WaitHandle.WaitOneAsync(CancellationToken.None, timeoutMilliseconds); + } + } +} diff --git a/Tapeti/Helpers/WaitHandleExtensions.cs b/Tapeti/Helpers/WaitHandleExtensions.cs new file mode 100644 index 0000000..b0e8d19 --- /dev/null +++ b/Tapeti/Helpers/WaitHandleExtensions.cs @@ -0,0 +1,52 @@ +using System.Threading.Tasks; +using System.Threading; +using System; + +namespace Tapeti.Helpers +{ + /// + /// Provides a WaitOneAsync method for . + /// + public static class WaitHandleExtensions + { + /// + /// Provides a way to wait for a WaitHandle asynchronously. + /// + /// + /// Credit: + /// + public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite) + { + if (waitHandle == null) + throw new ArgumentNullException(nameof(waitHandle)); + + var tcs = new TaskCompletionSource(); + var ctr = cancellationToken.Register(() => tcs.TrySetCanceled()); + var timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan; + + var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, + (_, timedOut) => + { + if (timedOut) + { + tcs.TrySetCanceled(); + } + else + { + tcs.TrySetResult(true); + } + }, + null, timeout, true); + + var task = tcs.Task; + + _ = task.ContinueWith(_ => + { + rwh.Unregister(null); + return ctr.Unregister(); + }, CancellationToken.None); + + return task; + } + } +} diff --git a/Tapeti/IMessageHandlerTracker.cs b/Tapeti/IMessageHandlerTracker.cs new file mode 100644 index 0000000..fd43979 --- /dev/null +++ b/Tapeti/IMessageHandlerTracker.cs @@ -0,0 +1,18 @@ +namespace Tapeti +{ + /// + /// Tracks the number of currently running message handlers. + /// + public interface IMessageHandlerTracker + { + /// + /// Registers the start of a message handler. + /// + void Enter(); + + /// + /// Registers the end of a message handler. + /// + void Exit(); + } +}