diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 199f058..8b7a518 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -132,7 +132,7 @@ namespace Tapeti.Flow.Default
internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo,
- string convergeMethodName = null, bool convergeMethodTaskSync = false)
+ string? convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue);
diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings
index c44a322..55840d7 100644
--- a/Tapeti.sln.DotSettings
+++ b/Tapeti.sln.DotSettings
@@ -8,7 +8,9 @@
SQL
UTF
<Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
+ <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /></Policy>
True
True
True
- True
\ No newline at end of file
+ True
+ True
\ No newline at end of file
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 757292d..9974d96 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -20,14 +20,16 @@ namespace Tapeti.Connection
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{
private readonly IConsumer consumer;
+ private readonly IMessageHandlerTracker messageHandlerTracker;
private readonly long connectionReference;
private readonly ResponseFunc onRespond;
///
- public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond)
+ public TapetiBasicConsumer(IConsumer consumer, IMessageHandlerTracker messageHandlerTracker, long connectionReference, ResponseFunc onRespond)
{
this.consumer = consumer;
+ this.messageHandlerTracker = messageHandlerTracker;
this.connectionReference = connectionReference;
this.onRespond = onRespond;
}
@@ -42,22 +44,30 @@ namespace Tapeti.Connection
IBasicProperties properties,
ReadOnlyMemory body)
{
- // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
- // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
- // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner
- // (which changes exception handling, which is now done in TapetiConsumer exclusively).
- //
- // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
- var bodyArray = body.ToArray();
-
+ messageHandlerTracker.Enter();
try
{
- var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false);
- await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false);
+ // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
+ // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
+ // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner
+ // (which changes exception handling, which is now done in TapetiConsumer exclusively).
+ //
+ // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
+ var bodyArray = body.ToArray();
+
+ try
+ {
+ var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false);
+ await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false);
+ }
+ catch
+ {
+ await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false);
+ }
}
- catch
+ finally
{
- await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false);
+ messageHandlerTracker.Exit();
}
}
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 23d5cbf..02cb024 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -33,6 +33,7 @@ namespace Tapeti.Connection
private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 300000;
private const int MinimumConnectedReconnectDelay = 1000;
+ private const int CloseMessageHandlersTimeout = 30000;
private readonly TapetiConnectionParams connectionParams;
@@ -49,6 +50,7 @@ namespace Tapeti.Connection
private readonly TapetiChannel consumeChannel;
private readonly TapetiChannel publishChannel;
private readonly HttpClient managementClient;
+ private readonly MessageHandlerTracker messageHandlerTracker = new();
// These fields must be locked using connectionLock
private readonly object connectionLock = new();
@@ -224,7 +226,7 @@ namespace Tapeti.Connection
return;
capturedConnectionReference = Interlocked.Read(ref connectionReference);
- var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond);
+ var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond);
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
}).ConfigureAwait(false);
@@ -570,6 +572,9 @@ namespace Tapeti.Connection
capturedConnection.Dispose();
}
}
+
+ // Wait for message handlers to finish
+ await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout);
}
diff --git a/Tapeti/Default/MessageHandlerTracker.cs b/Tapeti/Default/MessageHandlerTracker.cs
new file mode 100644
index 0000000..a805352
--- /dev/null
+++ b/Tapeti/Default/MessageHandlerTracker.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Tapeti.Helpers;
+
+namespace Tapeti.Default
+{
+ ///
+ public class MessageHandlerTracker : IMessageHandlerTracker
+ {
+ private volatile int runningCount;
+ private readonly ManualResetEventSlim idleEvent = new(true);
+
+
+ ///
+ public void Enter()
+ {
+ if (Interlocked.Increment(ref runningCount) == 1)
+ idleEvent.Reset();
+ }
+
+
+ ///
+ public void Exit()
+ {
+ if (Interlocked.Decrement(ref runningCount) == 0)
+ idleEvent.Set();
+ }
+
+
+ ///
+ /// Waits for the amount of currently running message handlers to reach zero.
+ ///
+ /// The timeout after which an OperationCanceledException is thrown.
+ public Task WaitForIdle(int timeoutMilliseconds)
+ {
+ return idleEvent.WaitHandle.WaitOneAsync(CancellationToken.None, timeoutMilliseconds);
+ }
+ }
+}
diff --git a/Tapeti/Helpers/WaitHandleExtensions.cs b/Tapeti/Helpers/WaitHandleExtensions.cs
new file mode 100644
index 0000000..b0e8d19
--- /dev/null
+++ b/Tapeti/Helpers/WaitHandleExtensions.cs
@@ -0,0 +1,52 @@
+using System.Threading.Tasks;
+using System.Threading;
+using System;
+
+namespace Tapeti.Helpers
+{
+ ///
+ /// Provides a WaitOneAsync method for .
+ ///
+ public static class WaitHandleExtensions
+ {
+ ///
+ /// Provides a way to wait for a WaitHandle asynchronously.
+ ///
+ ///
+ /// Credit:
+ ///
+ public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
+ {
+ if (waitHandle == null)
+ throw new ArgumentNullException(nameof(waitHandle));
+
+ var tcs = new TaskCompletionSource();
+ var ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
+ var timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;
+
+ var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
+ (_, timedOut) =>
+ {
+ if (timedOut)
+ {
+ tcs.TrySetCanceled();
+ }
+ else
+ {
+ tcs.TrySetResult(true);
+ }
+ },
+ null, timeout, true);
+
+ var task = tcs.Task;
+
+ _ = task.ContinueWith(_ =>
+ {
+ rwh.Unregister(null);
+ return ctr.Unregister();
+ }, CancellationToken.None);
+
+ return task;
+ }
+ }
+}
diff --git a/Tapeti/IMessageHandlerTracker.cs b/Tapeti/IMessageHandlerTracker.cs
new file mode 100644
index 0000000..fd43979
--- /dev/null
+++ b/Tapeti/IMessageHandlerTracker.cs
@@ -0,0 +1,18 @@
+namespace Tapeti
+{
+ ///
+ /// Tracks the number of currently running message handlers.
+ ///
+ public interface IMessageHandlerTracker
+ {
+ ///
+ /// Registers the start of a message handler.
+ ///
+ void Enter();
+
+ ///
+ /// Registers the end of a message handler.
+ ///
+ void Exit();
+ }
+}