From f9d1f7e0dea108c65b84d9bd162d0eefa78a901f Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 27 Jan 2025 10:47:26 +0100 Subject: [PATCH] Wait for running message handlers when closing the connection --- Tapeti.Flow/Default/FlowProvider.cs | 2 +- Tapeti.sln.DotSettings | 4 +- Tapeti/Connection/TapetiBasicConsumer.cs | 36 ++++++++++------ Tapeti/Connection/TapetiClient.cs | 7 +++- Tapeti/Default/MessageHandlerTracker.cs | 40 ++++++++++++++++++ Tapeti/Helpers/WaitHandleExtensions.cs | 52 ++++++++++++++++++++++++ Tapeti/IMessageHandlerTracker.cs | 18 ++++++++ 7 files changed, 143 insertions(+), 16 deletions(-) create mode 100644 Tapeti/Default/MessageHandlerTracker.cs create mode 100644 Tapeti/Helpers/WaitHandleExtensions.cs create mode 100644 Tapeti/IMessageHandlerTracker.cs diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 199f058..8b7a518 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -132,7 +132,7 @@ namespace Tapeti.Flow.Default internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); await context.Store(responseHandlerInfo.IsDurableQueue); diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index c44a322..55840d7 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -8,7 +8,9 @@ SQL UTF <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> + <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /></Policy> True True True - True \ No newline at end of file + True + True \ No newline at end of file diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 757292d..9974d96 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -20,14 +20,16 @@ namespace Tapeti.Connection internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer { private readonly IConsumer consumer; + private readonly IMessageHandlerTracker messageHandlerTracker; private readonly long connectionReference; private readonly ResponseFunc onRespond; /// - public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond) + public TapetiBasicConsumer(IConsumer consumer, IMessageHandlerTracker messageHandlerTracker, long connectionReference, ResponseFunc onRespond) { this.consumer = consumer; + this.messageHandlerTracker = messageHandlerTracker; this.connectionReference = connectionReference; this.onRespond = onRespond; } @@ -42,22 +44,30 @@ namespace Tapeti.Connection IBasicProperties properties, ReadOnlyMemory body) { - // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing - // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support - // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner - // (which changes exception handling, which is now done in TapetiConsumer exclusively). - // - // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 - var bodyArray = body.ToArray(); - + messageHandlerTracker.Enter(); try { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); - await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing + // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support + // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner + // (which changes exception handling, which is now done in TapetiConsumer exclusively). + // + // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 + var bodyArray = body.ToArray(); + + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); + await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + } } - catch + finally { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + messageHandlerTracker.Exit(); } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 23d5cbf..02cb024 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -33,6 +33,7 @@ namespace Tapeti.Connection private const int ReconnectDelay = 5000; private const int MandatoryReturnTimeout = 300000; private const int MinimumConnectedReconnectDelay = 1000; + private const int CloseMessageHandlersTimeout = 30000; private readonly TapetiConnectionParams connectionParams; @@ -49,6 +50,7 @@ namespace Tapeti.Connection private readonly TapetiChannel consumeChannel; private readonly TapetiChannel publishChannel; private readonly HttpClient managementClient; + private readonly MessageHandlerTracker messageHandlerTracker = new(); // These fields must be locked using connectionLock private readonly object connectionLock = new(); @@ -224,7 +226,7 @@ namespace Tapeti.Connection return; capturedConnectionReference = Interlocked.Read(ref connectionReference); - var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond); + var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond); consumerTag = channel.BasicConsume(queueName, false, basicConsumer); }).ConfigureAwait(false); @@ -570,6 +572,9 @@ namespace Tapeti.Connection capturedConnection.Dispose(); } } + + // Wait for message handlers to finish + await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout); } diff --git a/Tapeti/Default/MessageHandlerTracker.cs b/Tapeti/Default/MessageHandlerTracker.cs new file mode 100644 index 0000000..a805352 --- /dev/null +++ b/Tapeti/Default/MessageHandlerTracker.cs @@ -0,0 +1,40 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + /// + public class MessageHandlerTracker : IMessageHandlerTracker + { + private volatile int runningCount; + private readonly ManualResetEventSlim idleEvent = new(true); + + + /// + public void Enter() + { + if (Interlocked.Increment(ref runningCount) == 1) + idleEvent.Reset(); + } + + + /// + public void Exit() + { + if (Interlocked.Decrement(ref runningCount) == 0) + idleEvent.Set(); + } + + + /// + /// Waits for the amount of currently running message handlers to reach zero. + /// + /// The timeout after which an OperationCanceledException is thrown. + public Task WaitForIdle(int timeoutMilliseconds) + { + return idleEvent.WaitHandle.WaitOneAsync(CancellationToken.None, timeoutMilliseconds); + } + } +} diff --git a/Tapeti/Helpers/WaitHandleExtensions.cs b/Tapeti/Helpers/WaitHandleExtensions.cs new file mode 100644 index 0000000..b0e8d19 --- /dev/null +++ b/Tapeti/Helpers/WaitHandleExtensions.cs @@ -0,0 +1,52 @@ +using System.Threading.Tasks; +using System.Threading; +using System; + +namespace Tapeti.Helpers +{ + /// + /// Provides a WaitOneAsync method for . + /// + public static class WaitHandleExtensions + { + /// + /// Provides a way to wait for a WaitHandle asynchronously. + /// + /// + /// Credit: + /// + public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite) + { + if (waitHandle == null) + throw new ArgumentNullException(nameof(waitHandle)); + + var tcs = new TaskCompletionSource(); + var ctr = cancellationToken.Register(() => tcs.TrySetCanceled()); + var timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan; + + var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, + (_, timedOut) => + { + if (timedOut) + { + tcs.TrySetCanceled(); + } + else + { + tcs.TrySetResult(true); + } + }, + null, timeout, true); + + var task = tcs.Task; + + _ = task.ContinueWith(_ => + { + rwh.Unregister(null); + return ctr.Unregister(); + }, CancellationToken.None); + + return task; + } + } +} diff --git a/Tapeti/IMessageHandlerTracker.cs b/Tapeti/IMessageHandlerTracker.cs new file mode 100644 index 0000000..fd43979 --- /dev/null +++ b/Tapeti/IMessageHandlerTracker.cs @@ -0,0 +1,18 @@ +namespace Tapeti +{ + /// + /// Tracks the number of currently running message handlers. + /// + public interface IMessageHandlerTracker + { + /// + /// Registers the start of a message handler. + /// + void Enter(); + + /// + /// Registers the end of a message handler. + /// + void Exit(); + } +}