1
0
mirror of synced 2025-01-31 20:33:08 +01:00

Wait for running message handlers when closing the connection

This commit is contained in:
Mark van Renswoude 2025-01-27 10:47:26 +01:00
parent c14938649c
commit f9d1f7e0de
7 changed files with 143 additions and 16 deletions

View File

@ -132,7 +132,7 @@ namespace Tapeti.Flow.Default
internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
string? convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue);

View File

@ -8,7 +8,9 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=UTF/@EntryIndexedValue">UTF</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=4a98fdf6_002D7d98_002D4f5a_002Dafeb_002Dea44ad98c70c/@EntryIndexedValue">&lt;Policy&gt;&lt;Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"&gt;&lt;ElementKinds&gt;&lt;Kind Name="FIELD" /&gt;&lt;Kind Name="READONLY_FIELD" /&gt;&lt;/ElementKinds&gt;&lt;/Descriptor&gt;&lt;Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /&gt;&lt;/Policy&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EPredefinedNamingRulesToUserRulesUpgrade/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

View File

@ -20,14 +20,16 @@ namespace Tapeti.Connection
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly IMessageHandlerTracker messageHandlerTracker;
private readonly long connectionReference;
private readonly ResponseFunc onRespond;
/// <inheritdoc />
public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond)
public TapetiBasicConsumer(IConsumer consumer, IMessageHandlerTracker messageHandlerTracker, long connectionReference, ResponseFunc onRespond)
{
this.consumer = consumer;
this.messageHandlerTracker = messageHandlerTracker;
this.connectionReference = connectionReference;
this.onRespond = onRespond;
}
@ -41,6 +43,9 @@ namespace Tapeti.Connection
string routingKey,
IBasicProperties properties,
ReadOnlyMemory<byte> body)
{
messageHandlerTracker.Enter();
try
{
// RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
// from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
@ -60,5 +65,10 @@ namespace Tapeti.Connection
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false);
}
}
finally
{
messageHandlerTracker.Exit();
}
}
}
}

View File

@ -33,6 +33,7 @@ namespace Tapeti.Connection
private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 300000;
private const int MinimumConnectedReconnectDelay = 1000;
private const int CloseMessageHandlersTimeout = 30000;
private readonly TapetiConnectionParams connectionParams;
@ -49,6 +50,7 @@ namespace Tapeti.Connection
private readonly TapetiChannel consumeChannel;
private readonly TapetiChannel publishChannel;
private readonly HttpClient managementClient;
private readonly MessageHandlerTracker messageHandlerTracker = new();
// These fields must be locked using connectionLock
private readonly object connectionLock = new();
@ -224,7 +226,7 @@ namespace Tapeti.Connection
return;
capturedConnectionReference = Interlocked.Read(ref connectionReference);
var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond);
var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond);
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
}).ConfigureAwait(false);
@ -570,6 +572,9 @@ namespace Tapeti.Connection
capturedConnection.Dispose();
}
}
// Wait for message handlers to finish
await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout);
}

View File

@ -0,0 +1,40 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Tapeti.Helpers;
namespace Tapeti.Default
{
/// <inheritdoc />
public class MessageHandlerTracker : IMessageHandlerTracker
{
private volatile int runningCount;
private readonly ManualResetEventSlim idleEvent = new(true);
/// <inheritdoc />
public void Enter()
{
if (Interlocked.Increment(ref runningCount) == 1)
idleEvent.Reset();
}
/// <inheritdoc />
public void Exit()
{
if (Interlocked.Decrement(ref runningCount) == 0)
idleEvent.Set();
}
/// <summary>
/// Waits for the amount of currently running message handlers to reach zero.
/// </summary>
/// <param name="timeoutMilliseconds">The timeout after which an OperationCanceledException is thrown.</param>
public Task WaitForIdle(int timeoutMilliseconds)
{
return idleEvent.WaitHandle.WaitOneAsync(CancellationToken.None, timeoutMilliseconds);
}
}
}

View File

@ -0,0 +1,52 @@
using System.Threading.Tasks;
using System.Threading;
using System;
namespace Tapeti.Helpers
{
/// <summary>
/// Provides a WaitOneAsync method for <see cref="WaitHandle"/>.
/// </summary>
public static class WaitHandleExtensions
{
/// <summary>
/// Provides a way to wait for a WaitHandle asynchronously.
/// </summary>
/// <remarks>
/// Credit: <see href="https://stackoverflow.com/a/68632819"/>
/// </remarks>
public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
if (waitHandle == null)
throw new ArgumentNullException(nameof(waitHandle));
var tcs = new TaskCompletionSource<bool>();
var ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
var timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;
var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
(_, timedOut) =>
{
if (timedOut)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(true);
}
},
null, timeout, true);
var task = tcs.Task;
_ = task.ContinueWith(_ =>
{
rwh.Unregister(null);
return ctr.Unregister();
}, CancellationToken.None);
return task;
}
}
}

View File

@ -0,0 +1,18 @@
namespace Tapeti
{
/// <summary>
/// Tracks the number of currently running message handlers.
/// </summary>
public interface IMessageHandlerTracker
{
/// <summary>
/// Registers the start of a message handler.
/// </summary>
void Enter();
/// <summary>
/// Registers the end of a message handler.
/// </summary>
void Exit();
}
}