1
0
mirror of synced 2024-06-28 14:57:40 +00:00

Fixed #6: Use 'mandatory' on replies (review)

Fixed #13: Exception for dynamic queues after reconnect
This commit is contained in:
Mark van Renswoude 2019-02-13 12:00:34 +01:00
parent 45c090d00d
commit 5b3be481e1
7 changed files with 274 additions and 65 deletions

View File

@ -30,6 +30,7 @@ namespace Tapeti.Config
public interface IDynamicQueue : IQueue public interface IDynamicQueue : IQueue
{ {
string GetDeclareQueueName();
void SetName(string name); void SetName(string name);
} }

View File

@ -1,9 +1,16 @@
namespace Tapeti.Connection namespace Tapeti.Connection
{ {
public class DisconnectedEventArgs
{
public ushort ReplyCode;
public string ReplyText;
}
public interface IConnectionEventListener public interface IConnectionEventListener
{ {
void Connected(); void Connected();
void Reconnected(); void Reconnected();
void Disconnected(); void Disconnected(DisconnectedEventArgs e);
} }
} }

View File

@ -1,8 +1,10 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
using Tapeti.Config; using Tapeti.Config;
@ -16,7 +18,7 @@ namespace Tapeti.Connection
{ {
private const int ReconnectDelay = 5000; private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 30000; private const int MandatoryReturnTimeout = 30000;
private const int PublishMaxConnectAttempts = 3; private const int MinimumConnectedReconnectDelay = 1000;
private readonly IConfig config; private readonly IConfig config;
private readonly ILogger logger; private readonly ILogger logger;
@ -28,10 +30,30 @@ namespace Tapeti.Connection
private readonly IExchangeStrategy exchangeStrategy; private readonly IExchangeStrategy exchangeStrategy;
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>(); private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
// These fields are for use in the taskQueue only! // These fields are for use in the taskQueue only!
private RabbitMQ.Client.IConnection connection; private RabbitMQ.Client.IConnection connection;
private bool isReconnect;
private IModel channelInstance; private IModel channelInstance;
private TaskCompletionSource<int> publishResultTaskSource; private ulong lastDeliveryTag;
private DateTime connectedDateTime;
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>();
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>();
private class ConfirmMessageInfo
{
public string ReturnKey;
public TaskCompletionSource<int> CompletionSource;
}
private class ReturnInfo
{
public uint RefCount;
public int FirstReplyCode;
}
public TapetiWorker(IConfig config) public TapetiWorker(IConfig config)
@ -64,7 +86,7 @@ namespace Tapeti.Connection
return taskQueue.Value.Add(() => return taskQueue.Value.Add(() =>
{ {
GetChannel().BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); WithRetryableChannel(channel => channel.BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)));
}); });
} }
@ -73,34 +95,38 @@ namespace Tapeti.Connection
{ {
return taskQueue.Value.Add(() => return taskQueue.Value.Add(() =>
{ {
var channel = GetChannel(); WithRetryableChannel(channel =>
if (queue.Dynamic)
{ {
var dynamicQueue = channel.QueueDeclare(queue.Name); if (queue.Dynamic)
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
foreach (var binding in queue.Bindings)
{ {
if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) if (!(queue is IDynamicQueue dynamicQueue))
throw new NullReferenceException("Queue with Dynamic = true must implement IDynamicQueue");
var declaredQueue = channel.QueueDeclare(dynamicQueue.GetDeclareQueueName());
dynamicQueue.SetName(declaredQueue.QueueName);
foreach (var binding in queue.Bindings)
{ {
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
var exchange = exchangeStrategy.GetExchange(binding.MessageClass); {
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey); channel.QueueBind(declaredQueue.QueueName, exchange, routingKey);
}
(binding as IBuildBinding)?.SetQueueName(declaredQueue.QueueName);
} }
(binding as IBuildBinding)?.SetQueueName(dynamicQueue.QueueName);
} }
} else
else
{
channel.QueueDeclarePassive(queue.Name);
foreach (var binding in queue.Bindings)
{ {
(binding as IBuildBinding)?.SetQueueName(queue.Name); channel.QueueDeclarePassive(queue.Name);
foreach (var binding in queue.Bindings)
{
(binding as IBuildBinding)?.SetQueueName(queue.Name);
}
} }
} });
}); });
} }
@ -109,6 +135,8 @@ namespace Tapeti.Connection
{ {
return taskQueue.Value.Add(() => return taskQueue.Value.Add(() =>
{ {
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
switch (response) switch (response)
{ {
case ConsumeResponse.Ack: case ConsumeResponse.Ack:
@ -122,6 +150,9 @@ namespace Tapeti.Connection
case ConsumeResponse.Requeue: case ConsumeResponse.Requeue:
GetChannel().BasicNack(deliveryTag, false, true); GetChannel().BasicNack(deliveryTag, false, true);
break; break;
default:
throw new ArgumentOutOfRangeException(nameof(response), response, null);
} }
}); });
@ -175,27 +206,48 @@ namespace Tapeti.Connection
return MiddlewareHelper.GoAsync( return MiddlewareHelper.GoAsync(
config.PublishMiddleware, config.PublishMiddleware,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next),
() => taskQueue.Value.Add(() => () => taskQueue.Value.Add(async () =>
{ {
var body = messageSerializer.Serialize(context.Message, context.Properties); var body = messageSerializer.Serialize(context.Message, context.Properties);
Task<int> publishResultTask = null; Task<int> publishResultTask = null;
var messageInfo = new ConfirmMessageInfo
if (config.UsePublisherConfirms)
{ {
publishResultTaskSource = new TaskCompletionSource<int>(); ReturnKey = GetReturnKey(context.Exchange, context.RoutingKey),
publishResultTask = publishResultTaskSource.Task; CompletionSource = new TaskCompletionSource<int>()
} };
else
mandatory = false;
WithRetryableChannel(channel =>
{
// The delivery tag is lost after a reconnect, register under the new tag
if (config.UsePublisherConfirms)
{
lastDeliveryTag++;
confirmMessages.Add(lastDeliveryTag, messageInfo);
publishResultTask = messageInfo.CompletionSource.Task;
}
else
mandatory = false;
channel.BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
});
GetChannel(PublishMaxConnectAttempts).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
if (publishResultTask == null) if (publishResultTask == null)
return; return;
if (!publishResultTask.Wait(MandatoryReturnTimeout)) var delayCancellationTokenSource = new CancellationTokenSource();
var signalledTask = await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token));
if (signalledTask != publishResultTask)
throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
delayCancellationTokenSource.Cancel();
if (publishResultTask.IsCanceled)
throw new NackException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} was nacked");
var replyCode = publishResultTask.Result; var replyCode = publishResultTask.Result;
@ -210,16 +262,43 @@ namespace Tapeti.Connection
// ReSharper restore ImplicitlyCapturedClosure // ReSharper restore ImplicitlyCapturedClosure
} }
/// <remarks> /// <remarks>
/// Only call this from a task in the taskQueue to ensure IModel is only used /// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
/// </remarks> /// </remarks>
private IModel GetChannel(int? maxAttempts = null) private void WithRetryableChannel(Action<IModel> operation)
{ {
if (channelInstance != null) while (true)
{
try
{
operation(GetChannel());
break;
}
catch (AlreadyClosedException e)
{
// TODO log?
}
}
}
/// <remarks>
/// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
/// </remarks>
private IModel GetChannel()
{
if (channelInstance != null && channelInstance.IsOpen)
return channelInstance; return channelInstance;
var attempts = 0; // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
// not related to the connection), wait for a bit to avoid spamming the connection
if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay)
Thread.Sleep(ReconnectDelay);
var connectionFactory = new ConnectionFactory var connectionFactory = new ConnectionFactory
{ {
HostName = ConnectionParams.HostName, HostName = ConnectionParams.HostName,
@ -227,8 +306,8 @@ namespace Tapeti.Connection
VirtualHost = ConnectionParams.VirtualHost, VirtualHost = ConnectionParams.VirtualHost,
UserName = ConnectionParams.Username, UserName = ConnectionParams.Username,
Password = ConnectionParams.Password, Password = ConnectionParams.Password,
AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, // We'll manually redeclare all queues in the Reconnect event to update the internal state for dynamic queues TopologyRecoveryEnabled = false,
RequestedHeartbeat = 30 RequestedHeartbeat = 30
}; };
@ -240,39 +319,50 @@ namespace Tapeti.Connection
connection = connectionFactory.CreateConnection(); connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel(); channelInstance = connection.CreateModel();
channelInstance.ConfirmSelect();
if (channelInstance == null)
throw new BrokerUnreachableException(null);
if (config.UsePublisherConfirms)
{
lastDeliveryTag = 0;
confirmMessages.Clear();
channelInstance.ConfirmSelect();
}
if (ConnectionParams.PrefetchCount > 0) if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected(); channelInstance.ModelShutdown += (sender, e) =>
channelInstance.ModelShutdown += (sender, eventArgs) => ConnectionEventListener?.Disconnected();
channelInstance.BasicReturn += (sender, eventArgs) =>
{ {
publishResultTaskSource?.SetResult(eventArgs.ReplyCode); ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
publishResultTaskSource = null; {
ReplyCode = e.ReplyCode,
ReplyText = e.ReplyText
});
channelInstance = null;
}; };
channelInstance.BasicAcks += (sender, eventArgs) => channelInstance.BasicReturn += HandleBasicReturn;
{ channelInstance.BasicAcks += HandleBasicAck;
publishResultTaskSource?.SetResult(0); channelInstance.BasicNacks += HandleBasicNack;
publishResultTaskSource = null;
}; connectedDateTime = DateTime.UtcNow;
if (isReconnect)
ConnectionEventListener?.Reconnected();
else
ConnectionEventListener?.Connected();
ConnectionEventListener?.Connected();
logger.ConnectSuccess(ConnectionParams); logger.ConnectSuccess(ConnectionParams);
isReconnect = true;
break; break;
} }
catch (BrokerUnreachableException e) catch (BrokerUnreachableException e)
{ {
logger.ConnectFailed(ConnectionParams, e); logger.ConnectFailed(ConnectionParams, e);
attempts++;
if (maxAttempts.HasValue && attempts > maxAttempts.Value)
throw;
Thread.Sleep(ReconnectDelay); Thread.Sleep(ReconnectDelay);
} }
} }
@ -281,6 +371,93 @@ namespace Tapeti.Connection
} }
private void HandleBasicReturn(object sender, BasicReturnEventArgs e)
{
/*
* "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack."
* - https://www.rabbitmq.com/confirms.html
*
* Because there is no delivery tag included in the basic.return message. This solution is modeled after
* user OhJeez' answer on StackOverflow:
*
* "Since all messages with the same routing key are routed the same way. I assumed that once I get a
* basic.return about a specific routing key, all messages with this routing key can be considered undelivered"
* https://stackoverflow.com/questions/21336659/how-to-tell-which-amqp-message-was-not-routed-from-basic-return-response
*/
var key = GetReturnKey(e.Exchange, e.RoutingKey);
if (!returnRoutingKeys.TryGetValue(key, out var returnInfo))
{
returnInfo = new ReturnInfo
{
RefCount = 0,
FirstReplyCode = e.ReplyCode
};
returnRoutingKeys.Add(key, returnInfo);
}
returnInfo.RefCount++;
}
private void HandleBasicAck(object sender, BasicAckEventArgs e)
{
foreach (var deliveryTag in GetDeliveryTags(e))
{
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
continue;
if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo))
{
messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode);
returnInfo.RefCount--;
if (returnInfo.RefCount == 0)
returnRoutingKeys.Remove(messageInfo.ReturnKey);
}
messageInfo.CompletionSource.SetResult(0);
confirmMessages.Remove(deliveryTag);
}
}
private void HandleBasicNack(object sender, BasicNackEventArgs e)
{
foreach (var deliveryTag in GetDeliveryTags(e))
{
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
continue;
messageInfo.CompletionSource.SetCanceled();
confirmMessages.Remove(e.DeliveryTag);
}
}
private IEnumerable<ulong> GetDeliveryTags(BasicAckEventArgs e)
{
return e.Multiple
? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
: new[] { e.DeliveryTag };
}
private IEnumerable<ulong> GetDeliveryTags(BasicNackEventArgs e)
{
return e.Multiple
? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
: new[] { e.DeliveryTag };
}
private static string GetReturnKey(string exchange, string routingKey)
{
return exchange + ':' + routingKey;
}
private class PublishContext : IPublishContext private class PublishContext : IPublishContext
{ {
public IDependencyResolver DependencyResolver { get; set; } public IDependencyResolver DependencyResolver { get; set; }

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Exceptions
{
public class NackException : Exception
{
public NackException(string message) : base(message) { }
}
}

View File

@ -462,6 +462,8 @@ namespace Tapeti
protected class Queue : IDynamicQueue protected class Queue : IDynamicQueue
{ {
private readonly string declareQueueName;
public bool Dynamic { get; } public bool Dynamic { get; }
public string Name { get; set; } public string Name { get; set; }
public IEnumerable<IBinding> Bindings { get; } public IEnumerable<IBinding> Bindings { get; }
@ -469,12 +471,20 @@ namespace Tapeti
public Queue(QueueInfo queue, IEnumerable<IBinding> bindings) public Queue(QueueInfo queue, IEnumerable<IBinding> bindings)
{ {
declareQueueName = queue.Name;
Dynamic = queue.Dynamic.GetValueOrDefault(); Dynamic = queue.Dynamic.GetValueOrDefault();
Name = queue.Name; Name = queue.Name;
Bindings = bindings; Bindings = bindings;
} }
public string GetDeclareQueueName()
{
return declareQueueName;
}
public void SetName(string name) public void SetName(string name)
{ {
Name = name; Name = name;

View File

@ -8,6 +8,8 @@ using Tapeti.Connection;
namespace Tapeti namespace Tapeti
{ {
public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e);
public class TapetiConnection : IDisposable public class TapetiConnection : IDisposable
{ {
private readonly IConfig config; private readonly IConfig config;
@ -29,11 +31,10 @@ namespace Tapeti
} }
public event EventHandler Connected; public event EventHandler Connected;
public event DisconnectedEventHandler Disconnected;
public event EventHandler Disconnected;
public event EventHandler Reconnected; public event EventHandler Reconnected;
public async Task<ISubscriber> Subscribe(bool startConsuming = true) public async Task<ISubscriber> Subscribe(bool startConsuming = true)
{ {
if (subscriber == null) if (subscriber == null)
@ -87,9 +88,9 @@ namespace Tapeti
owner.OnConnected(new EventArgs()); owner.OnConnected(new EventArgs());
} }
public void Disconnected() public void Disconnected(DisconnectedEventArgs e)
{ {
owner.OnDisconnected(new EventArgs()); owner.OnDisconnected(e);
} }
public void Reconnected() public void Reconnected()
@ -114,7 +115,7 @@ namespace Tapeti
}); });
} }
protected virtual void OnDisconnected(EventArgs e) protected virtual void OnDisconnected(DisconnectedEventArgs e)
{ {
Task.Run(() => Disconnected?.Invoke(this, e)); Task.Run(() => Disconnected?.Invoke(this, e));
} }

View File

@ -27,7 +27,7 @@ namespace Tapeti.Tasks
} }
public Task<T> Add<T>(Func<T> func) public Task Add(Func<Task> func)
{ {
lock (previousTaskLock) lock (previousTaskLock)
{ {
@ -36,7 +36,11 @@ namespace Tapeti.Tasks
, singleThreadScheduler.Value); , singleThreadScheduler.Value);
previousTask = task; previousTask = task;
return task;
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
// this is used to chain the next. We return the unwrapped Task however, so that the caller
// awaits until the full task chain has completed.
return task.Unwrap();
} }
} }