diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index f9d577a..df0f3d7 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -37,6 +38,9 @@ namespace Tapeti.Connection private IModel channelInstance; private ulong lastDeliveryTag; private DateTime connectedDateTime; + + // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread + private readonly object confirmLock = new Object(); private readonly Dictionary confirmMessages = new Dictionary(); private readonly Dictionary returnRoutingKeys = new Dictionary(); @@ -208,6 +212,9 @@ namespace Tapeti.Connection async (handler, next) => await handler.Handle(context, next), () => taskQueue.Value.Add(async () => { + if (Thread.CurrentThread.ManagedThreadId != 3) + Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); + var body = messageSerializer.Serialize(context.Message, context.Properties); Task publishResultTask = null; @@ -225,7 +232,16 @@ namespace Tapeti.Connection { lastDeliveryTag++; - confirmMessages.Add(lastDeliveryTag, messageInfo); + Monitor.Enter(confirmLock); + try + { + confirmMessages.Add(lastDeliveryTag, messageInfo); + } + finally + { + Monitor.Exit(confirmLock); + } + publishResultTask = messageInfo.CompletionSource.Task; } else @@ -326,7 +342,17 @@ namespace Tapeti.Connection if (config.UsePublisherConfirms) { lastDeliveryTag = 0; - confirmMessages.Clear(); + + Monitor.Enter(confirmLock); + try + { + confirmMessages.Clear(); + } + finally + { + Monitor.Exit(confirmLock); + } + channelInstance.ConfirmSelect(); } @@ -403,35 +429,51 @@ namespace Tapeti.Connection private void HandleBasicAck(object sender, BasicAckEventArgs e) { - foreach (var deliveryTag in GetDeliveryTags(e)) + Monitor.Enter(confirmLock); + try { - if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) - continue; - - if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo)) + foreach (var deliveryTag in GetDeliveryTags(e)) { - messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode); + if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) + continue; - returnInfo.RefCount--; - if (returnInfo.RefCount == 0) - returnRoutingKeys.Remove(messageInfo.ReturnKey); + if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo)) + { + messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode); + + returnInfo.RefCount--; + if (returnInfo.RefCount == 0) + returnRoutingKeys.Remove(messageInfo.ReturnKey); + } + + messageInfo.CompletionSource.SetResult(0); + confirmMessages.Remove(deliveryTag); } - - messageInfo.CompletionSource.SetResult(0); - confirmMessages.Remove(deliveryTag); + } + finally + { + Monitor.Exit(confirmLock); } } private void HandleBasicNack(object sender, BasicNackEventArgs e) { - foreach (var deliveryTag in GetDeliveryTags(e)) + Monitor.Enter(confirmLock); + try { - if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) - continue; + foreach (var deliveryTag in GetDeliveryTags(e)) + { + if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) + continue; - messageInfo.CompletionSource.SetCanceled(); - confirmMessages.Remove(e.DeliveryTag); + messageInfo.CompletionSource.SetCanceled(); + confirmMessages.Remove(e.DeliveryTag); + } + } + finally + { + Monitor.Exit(confirmLock); } }