1
0
mirror of synced 2024-11-05 02:59:16 +00:00

Fixed "Index out of range" when publishing from multiple thread (or from a message handler with prefetchcount > 1)

This commit is contained in:
Mark van Renswoude 2019-05-20 15:22:40 +02:00
parent 4c08e9b684
commit 0bd9d06795

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -37,6 +38,9 @@ namespace Tapeti.Connection
private IModel channelInstance; private IModel channelInstance;
private ulong lastDeliveryTag; private ulong lastDeliveryTag;
private DateTime connectedDateTime; private DateTime connectedDateTime;
// These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread
private readonly object confirmLock = new Object();
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>(); private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>();
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>(); private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>();
@ -208,6 +212,9 @@ namespace Tapeti.Connection
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next),
() => taskQueue.Value.Add(async () => () => taskQueue.Value.Add(async () =>
{ {
if (Thread.CurrentThread.ManagedThreadId != 3)
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
var body = messageSerializer.Serialize(context.Message, context.Properties); var body = messageSerializer.Serialize(context.Message, context.Properties);
Task<int> publishResultTask = null; Task<int> publishResultTask = null;
@ -225,7 +232,16 @@ namespace Tapeti.Connection
{ {
lastDeliveryTag++; lastDeliveryTag++;
confirmMessages.Add(lastDeliveryTag, messageInfo); Monitor.Enter(confirmLock);
try
{
confirmMessages.Add(lastDeliveryTag, messageInfo);
}
finally
{
Monitor.Exit(confirmLock);
}
publishResultTask = messageInfo.CompletionSource.Task; publishResultTask = messageInfo.CompletionSource.Task;
} }
else else
@ -326,7 +342,17 @@ namespace Tapeti.Connection
if (config.UsePublisherConfirms) if (config.UsePublisherConfirms)
{ {
lastDeliveryTag = 0; lastDeliveryTag = 0;
confirmMessages.Clear();
Monitor.Enter(confirmLock);
try
{
confirmMessages.Clear();
}
finally
{
Monitor.Exit(confirmLock);
}
channelInstance.ConfirmSelect(); channelInstance.ConfirmSelect();
} }
@ -403,35 +429,51 @@ namespace Tapeti.Connection
private void HandleBasicAck(object sender, BasicAckEventArgs e) private void HandleBasicAck(object sender, BasicAckEventArgs e)
{ {
foreach (var deliveryTag in GetDeliveryTags(e)) Monitor.Enter(confirmLock);
try
{ {
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) foreach (var deliveryTag in GetDeliveryTags(e))
continue;
if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo))
{ {
messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode); if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
continue;
returnInfo.RefCount--; if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo))
if (returnInfo.RefCount == 0) {
returnRoutingKeys.Remove(messageInfo.ReturnKey); messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode);
returnInfo.RefCount--;
if (returnInfo.RefCount == 0)
returnRoutingKeys.Remove(messageInfo.ReturnKey);
}
messageInfo.CompletionSource.SetResult(0);
confirmMessages.Remove(deliveryTag);
} }
}
messageInfo.CompletionSource.SetResult(0); finally
confirmMessages.Remove(deliveryTag); {
Monitor.Exit(confirmLock);
} }
} }
private void HandleBasicNack(object sender, BasicNackEventArgs e) private void HandleBasicNack(object sender, BasicNackEventArgs e)
{ {
foreach (var deliveryTag in GetDeliveryTags(e)) Monitor.Enter(confirmLock);
try
{ {
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) foreach (var deliveryTag in GetDeliveryTags(e))
continue; {
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
continue;
messageInfo.CompletionSource.SetCanceled(); messageInfo.CompletionSource.SetCanceled();
confirmMessages.Remove(e.DeliveryTag); confirmMessages.Remove(e.DeliveryTag);
}
}
finally
{
Monitor.Exit(confirmLock);
} }
} }