1
0
mirror of synced 2024-11-05 02:59:16 +00:00

Possible fix for #18 Two consumers from same connection after reconnect

- Consume calls still in the TapetiClient task queue while it is reconnecting would not be cancelled, but new calls were added as well after the reconnect
- Unrelated but useful: added LocalPort and Disconnect event to logging
This commit is contained in:
Mark van Renswoude 2019-10-10 16:03:12 +02:00
parent 9a85702ed8
commit 2e2a77a7ae
14 changed files with 364 additions and 132 deletions

View File

@ -23,34 +23,44 @@ namespace Tapeti.Serilog
/// <inheritdoc /> /// <inheritdoc />
public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) public void Connect(IConnectContext connectContext)
{ {
seriLogger seriLogger
.ForContext("isReconnect", isReconnect) .ForContext("isReconnect", connectContext.IsReconnect)
.Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}", .Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}",
connectionParams.HostName, connectContext.ConnectionParams.HostName,
connectionParams.Port, connectContext.ConnectionParams.Port,
connectionParams.VirtualHost); connectContext.ConnectionParams.VirtualHost);
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) public void ConnectFailed(IConnectFailedContext connectContext)
{ {
seriLogger.Error(exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}", seriLogger.Error(connectContext.Exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}",
connectionParams.HostName, connectContext.ConnectionParams.HostName,
connectionParams.Port, connectContext.ConnectionParams.Port,
connectionParams.VirtualHost); connectContext.ConnectionParams.VirtualHost);
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) public void ConnectSuccess(IConnectSuccessContext connectContext)
{ {
seriLogger seriLogger
.ForContext("isReconnect", isReconnect) .ForContext("isReconnect", connectContext.IsReconnect)
.Information("Tapeti: successfully connected to {host}:{port}/{virtualHost}", .Information("Tapeti: successfully connected to {host}:{port}/{virtualHost} on local port {localPort}",
connectionParams.HostName, connectContext.ConnectionParams.HostName,
connectionParams.Port, connectContext.ConnectionParams.Port,
connectionParams.VirtualHost); connectContext.ConnectionParams.VirtualHost,
connectContext.LocalPort);
}
/// <inheritdoc />
public void Disconnect(IDisconnectContext disconnectContext)
{
seriLogger
.Information("Tapeti: connection closed, reply text = {replyText}, reply code = {replyCode}",
disconnectContext.ReplyText,
disconnectContext.ReplyCode);
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -1,5 +1,6 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation"> <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_LINES/@EntryValue">False</s:Boolean> <s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_LINES/@EntryValue">False</s:Boolean>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=API/@EntryIndexedValue">API</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Config namespace Tapeti.Config

View File

@ -8,13 +8,13 @@
/// <summary> /// <summary>
/// Called when a connection to RabbitMQ has been established. /// Called when a connection to RabbitMQ has been established.
/// </summary> /// </summary>
void Connected(); void Connected(ConnectedEventArgs e);
/// <summary> /// <summary>
/// Called when the connection to RabbitMQ has been lost. /// Called when the connection to RabbitMQ has been lost.
/// </summary> /// </summary>
void Reconnected(); void Reconnected(ConnectedEventArgs e);
/// <summary> /// <summary>

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
@ -73,43 +74,49 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Starts a consumer for the specified queue, using the provided bindings to handle messages. /// Starts a consumer for the specified queue, using the provided bindings to handle messages.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName"></param> /// <param name="queueName"></param>
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param> /// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
Task Consume(string queueName, IConsumer consumer); Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
/// <summary> /// <summary>
/// Creates a durable queue if it does not already exist, and updates the bindings. /// Creates a durable queue if it does not already exist, and updates the bindings.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName">The name of the queue to create</param> /// <param name="queueName">The name of the queue to create</param>
/// <param name="bindings">A list of bindings. Any bindings already on the queue which are not in this list will be removed</param> /// <param name="bindings">A list of bindings. Any bindings already on the queue which are not in this list will be removed</param>
Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings); Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable<QueueBinding> bindings);
/// <summary> /// <summary>
/// Verifies a durable queue exists. Will raise an exception if it does not. /// Verifies a durable queue exists. Will raise an exception if it does not.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName">The name of the queue to verify</param> /// <param name="queueName">The name of the queue to verify</param>
Task DurableQueueVerify(string queueName); Task DurableQueueVerify(CancellationToken cancellationToken, string queueName);
/// <summary> /// <summary>
/// Deletes a durable queue. /// Deletes a durable queue.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName">The name of the queue to delete</param> /// <param name="queueName">The name of the queue to delete</param>
/// <param name="onlyIfEmpty">If true, the queue will only be deleted if it is empty otherwise all bindings will be removed. If false, the queue is deleted even if there are queued messages.</param> /// <param name="onlyIfEmpty">If true, the queue will only be deleted if it is empty otherwise all bindings will be removed. If false, the queue is deleted even if there are queued messages.</param>
Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true); Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true);
/// <summary> /// <summary>
/// Creates a dynamic queue. /// Creates a dynamic queue.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param> /// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
Task<string> DynamicQueueDeclare(string queuePrefix = null); Task<string> DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null);
/// <summary> /// <summary>
/// Add a binding to a dynamic queue. /// Add a binding to a dynamic queue.
/// </summary> /// </summary>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName">The name of the dynamic queue previously created using DynamicQueueDeclare</param> /// <param name="queueName">The name of the dynamic queue previously created using DynamicQueueDeclare</param>
/// <param name="binding">The binding to add to the dynamic queue</param> /// <param name="binding">The binding to add to the dynamic queue</param>
Task DynamicQueueBind(string queueName, QueueBinding binding); Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
/// <summary> /// <summary>

View File

@ -1,4 +1,4 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Default; using Tapeti.Default;

View File

@ -184,7 +184,7 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task Consume(string queueName, IConsumer consumer) public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{ {
if (deletedQueues.Contains(queueName)) if (deletedQueues.Contains(queueName))
return; return;
@ -192,8 +192,12 @@ namespace Tapeti.Connection
if (string.IsNullOrEmpty(queueName)) if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName)); throw new ArgumentNullException(nameof(queueName));
await QueueWithRetryableChannel(channel => await QueueWithRetryableChannel(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
var basicConsumer = new TapetiBasicConsumer(consumer, Respond); var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
channel.BasicConsume(queueName, false, basicConsumer); channel.BasicConsume(queueName, false, basicConsumer);
}); });
@ -229,15 +233,16 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings) public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable<QueueBinding> bindings)
{
await taskQueue.Value.Add(async () =>
{ {
var existingBindings = (await GetQueueBindings(queueName)).ToList(); var existingBindings = (await GetQueueBindings(queueName)).ToList();
var currentBindings = bindings.ToList(); var currentBindings = bindings.ToList();
WithRetryableChannel(channel => await Queue(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
channel.QueueDeclare(queueName, true, false, false); channel.QueueDeclare(queueName, true, false, false);
foreach (var binding in currentBindings.Except(existingBindings)) foreach (var binding in currentBindings.Except(existingBindings))
@ -249,28 +254,33 @@ namespace Tapeti.Connection
foreach (var deletedBinding in existingBindings.Except(currentBindings)) foreach (var deletedBinding in existingBindings.Except(currentBindings))
channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
}); });
});
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task DurableQueueVerify(string queueName) public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName)
{ {
await QueueWithRetryableChannel(channel => await Queue(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
channel.QueueDeclarePassive(queueName); channel.QueueDeclarePassive(queueName);
}); });
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true) public async Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true)
{ {
if (!onlyIfEmpty) if (!onlyIfEmpty)
{ {
uint deletedMessages = 0; uint deletedMessages = 0;
await QueueWithRetryableChannel(channel => await Queue(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
deletedMessages = channel.QueueDelete(queueName); deletedMessages = channel.QueueDelete(queueName);
}); });
@ -285,6 +295,9 @@ namespace Tapeti.Connection
bool retry; bool retry;
do do
{ {
if (cancellationToken.IsCancellationRequested)
break;
retry = false; retry = false;
// Get queue information from the Management API, since the AMQP operations will // Get queue information from the Management API, since the AMQP operations will
@ -304,10 +317,8 @@ namespace Tapeti.Connection
// includes the GetQueueInfo, the next time around it should have Messages > 0 // includes the GetQueueInfo, the next time around it should have Messages > 0
try try
{ {
WithRetryableChannel(channel => var channel = GetChannel();
{
channel.QueueDelete(queueName, false, true); channel.QueueDelete(queueName, false, true);
});
deletedQueues.Add(queueName); deletedQueues.Add(queueName);
logger.QueueObsolete(queueName, true, 0); logger.QueueObsolete(queueName, true, 0);
@ -327,11 +338,10 @@ namespace Tapeti.Connection
if (existingBindings.Count > 0) if (existingBindings.Count > 0)
{ {
WithRetryableChannel(channel => var channel = GetChannel();
{
foreach (var binding in existingBindings) foreach (var binding in existingBindings)
channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey); channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
});
} }
logger.QueueObsolete(queueName, false, queueInfo.Messages); logger.QueueObsolete(queueName, false, queueInfo.Messages);
@ -342,12 +352,15 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task<string> DynamicQueueDeclare(string queuePrefix = null) public async Task<string> DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null)
{ {
string queueName = null; string queueName = null;
await QueueWithRetryableChannel(channel => await Queue(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
if (!string.IsNullOrEmpty(queuePrefix)) if (!string.IsNullOrEmpty(queuePrefix))
{ {
queueName = queuePrefix + "." + Guid.NewGuid().ToString("N"); queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
@ -361,10 +374,13 @@ namespace Tapeti.Connection
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task DynamicQueueBind(string queueName, QueueBinding binding) public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding)
{ {
await QueueWithRetryableChannel(channel => await Queue(channel =>
{ {
if (cancellationToken.IsCancellationRequested)
return;
DeclareExchange(channel, binding.Exchange); DeclareExchange(channel, binding.Exchange);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
}); });
@ -543,6 +559,16 @@ namespace Tapeti.Connection
} }
private async Task Queue(Action<IModel> operation)
{
await taskQueue.Value.Add(() =>
{
var channel = GetChannel();
operation(channel);
});
}
private async Task QueueWithRetryableChannel(Action<IModel> operation) private async Task QueueWithRetryableChannel(Action<IModel> operation)
{ {
await taskQueue.Value.Add(() => await taskQueue.Value.Add(() =>
@ -613,7 +639,7 @@ namespace Tapeti.Connection
{ {
try try
{ {
logger.Connect(connectionParams, isReconnect); logger.Connect(new ConnectContext(connectionParams, isReconnect));
connection = connectionFactory.CreateConnection(); connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel(); channelInstance = connection.CreateModel();
@ -652,6 +678,8 @@ namespace Tapeti.Connection
ReplyText = e.ReplyText ReplyText = e.ReplyText
}); });
logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
channelInstance = null; channelInstance = null;
if (!isClosing) if (!isClosing)
@ -664,19 +692,25 @@ namespace Tapeti.Connection
connectedDateTime = DateTime.UtcNow; connectedDateTime = DateTime.UtcNow;
if (isReconnect) var connectedEventArgs = new ConnectedEventArgs
ConnectionEventListener?.Reconnected(); {
else ConnectionParams = connectionParams,
ConnectionEventListener?.Connected(); LocalPort = connection.LocalPort
};
logger.ConnectSuccess(connectionParams, isReconnect); if (isReconnect)
ConnectionEventListener?.Reconnected(connectedEventArgs);
else
ConnectionEventListener?.Connected(connectedEventArgs);
logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, connection.LocalPort));
isReconnect = true; isReconnect = true;
break; break;
} }
catch (BrokerUnreachableException e) catch (BrokerUnreachableException e)
{ {
logger.ConnectFailed(connectionParams, e); logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e));
Thread.Sleep(ReconnectDelay); Thread.Sleep(ReconnectDelay);
} }
} }
@ -786,5 +820,40 @@ namespace Tapeti.Connection
{ {
return exchange + ':' + routingKey; return exchange + ':' + routingKey;
} }
private class ConnectContext : IConnectSuccessContext, IConnectFailedContext
{
public TapetiConnectionParams ConnectionParams { get; }
public bool IsReconnect { get; }
public int LocalPort { get; }
public Exception Exception { get; }
public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception exception = null)
{
ConnectionParams = connectionParams;
IsReconnect = isReconnect;
LocalPort = localPort;
Exception = exception;
}
}
private class DisconnectContext : IDisconnectContext
{
public TapetiConnectionParams ConnectionParams { get; }
public ushort ReplyCode { get; }
public string ReplyText { get; }
public DisconnectContext(TapetiConnectionParams connectionParams, ushort replyCode, string replyText)
{
ConnectionParams = connectionParams;
ReplyCode = replyCode;
ReplyText = replyText;
}
}
} }
} }

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
@ -13,6 +14,8 @@ namespace Tapeti.Connection
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private bool consuming; private bool consuming;
private CancellationTokenSource initializeCancellationTokenSource;
/// <inheritdoc /> /// <inheritdoc />
public TapetiSubscriber(Func<ITapetiClient> clientFactory, ITapetiConfig config) public TapetiSubscriber(Func<ITapetiClient> clientFactory, ITapetiConfig config)
@ -22,35 +25,56 @@ namespace Tapeti.Connection
} }
public void Dispose()
{
}
/// <summary> /// <summary>
/// Applies the configured bindings and declares the queues in RabbitMQ. For internal use only. /// Applies the configured bindings and declares the queues in RabbitMQ. For internal use only.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
public async Task ApplyBindings() public async Task ApplyBindings()
{ {
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>(); initializeCancellationTokenSource = new CancellationTokenSource();
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>(); await ApplyBindings(initializeCancellationTokenSource.Token);
}
var bindingTarget = config.Features.DeclareDurableQueues
? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy)
: new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy);
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget))); /// <summary>
await bindingTarget.Apply(); /// Called after the connection is lost. For internal use only.
/// Guaranteed to be called from within the taskQueue thread.
/// </summary>
public void Disconnect()
{
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
} }
/// <summary> /// <summary>
/// Called after the connection is lost and regained. Reapplies the bindings and if Resume /// Called after the connection is lost and regained. Reapplies the bindings and if Resume
/// has already been called, restarts the consumers. For internal use only. /// has already been called, restarts the consumers. For internal use only.
/// Guaranteed to be called from within the taskQueue thread.
/// </summary> /// </summary>
/// <returns></returns> public void Reconnect()
public async Task Reconnect()
{ {
await ApplyBindings(); CancellationToken cancellationToken;
if (consuming) initializeCancellationTokenSource?.Cancel();
await ConsumeQueues(); initializeCancellationTokenSource = new CancellationTokenSource();
cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation
Task.Run(async () =>
{
await ApplyBindings(cancellationToken);
if (consuming && !cancellationToken.IsCancellationRequested)
await ConsumeQueues(cancellationToken);
});
} }
@ -61,11 +85,28 @@ namespace Tapeti.Connection
return; return;
consuming = true; consuming = true;
await ConsumeQueues(); initializeCancellationTokenSource = new CancellationTokenSource();
await ConsumeQueues(initializeCancellationTokenSource.Token);
} }
private async Task ConsumeQueues()
private async Task ApplyBindings(CancellationToken cancellationToken)
{
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
var bindingTarget = config.Features.DeclareDurableQueues
? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
: new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
await bindingTarget.Apply();
}
private async Task ConsumeQueues(CancellationToken cancellationToken)
{ {
var queues = config.Bindings.GroupBy(binding => binding.QueueName); var queues = config.Bindings.GroupBy(binding => binding.QueueName);
@ -74,7 +115,7 @@ namespace Tapeti.Connection
var queueName = group.Key; var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group); var consumer = new TapetiConsumer(config, queueName, group);
await clientFactory().Consume(queueName, consumer); await clientFactory().Consume(cancellationToken, queueName, consumer);
})); }));
} }
@ -84,6 +125,7 @@ namespace Tapeti.Connection
protected readonly Func<ITapetiClient> ClientFactory; protected readonly Func<ITapetiClient> ClientFactory;
protected readonly IRoutingKeyStrategy RoutingKeyStrategy; protected readonly IRoutingKeyStrategy RoutingKeyStrategy;
protected readonly IExchangeStrategy ExchangeStrategy; protected readonly IExchangeStrategy ExchangeStrategy;
protected readonly CancellationToken CancellationToken;
private struct DynamicQueueInfo private struct DynamicQueueInfo
{ {
@ -94,11 +136,12 @@ namespace Tapeti.Connection
private readonly Dictionary<string, List<DynamicQueueInfo>> dynamicQueues = new Dictionary<string, List<DynamicQueueInfo>>(); private readonly Dictionary<string, List<DynamicQueueInfo>> dynamicQueues = new Dictionary<string, List<DynamicQueueInfo>>();
protected CustomBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) protected CustomBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken)
{ {
ClientFactory = clientFactory; ClientFactory = clientFactory;
RoutingKeyStrategy = routingKeyStrategy; RoutingKeyStrategy = routingKeyStrategy;
ExchangeStrategy = exchangeStrategy; ExchangeStrategy = exchangeStrategy;
CancellationToken = cancellationToken;
} }
@ -122,7 +165,7 @@ namespace Tapeti.Connection
var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass);
var exchange = ExchangeStrategy.GetExchange(messageClass); var exchange = ExchangeStrategy.GetExchange(messageClass);
await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey)); await ClientFactory().DynamicQueueBind(CancellationToken, result.QueueName, new QueueBinding(exchange, routingKey));
return result.QueueName; return result.QueueName;
} }
@ -139,7 +182,7 @@ namespace Tapeti.Connection
{ {
// If we don't know the routing key, always create a new queue to ensure there is no overlap. // If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
return await ClientFactory().DynamicQueueDeclare(queuePrefix); return await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix);
} }
@ -181,7 +224,7 @@ namespace Tapeti.Connection
} }
// Declare a new queue // Declare a new queue
var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix); var queueName = await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix);
var queueInfo = new DynamicQueueInfo var queueInfo = new DynamicQueueInfo
{ {
QueueName = queueName, QueueName = queueName,
@ -205,7 +248,7 @@ namespace Tapeti.Connection
private readonly HashSet<string> obsoleteDurableQueues = new HashSet<string>(); private readonly HashSet<string> obsoleteDurableQueues = new HashSet<string>();
public DeclareDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy) public DeclareDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{ {
} }
@ -247,13 +290,13 @@ namespace Tapeti.Connection
public override async Task Apply() public override async Task Apply()
{ {
var worker = ClientFactory(); var client = ClientFactory();
await DeclareQueues(worker); await DeclareQueues(client);
await DeleteObsoleteQueues(worker); await DeleteObsoleteQueues(client);
} }
private async Task DeclareQueues(ITapetiClient worker) private async Task DeclareQueues(ITapetiClient client)
{ {
await Task.WhenAll(durableQueues.Select(async queue => await Task.WhenAll(durableQueues.Select(async queue =>
{ {
@ -265,16 +308,16 @@ namespace Tapeti.Connection
return new QueueBinding(exchange, routingKey); return new QueueBinding(exchange, routingKey);
}); });
await worker.DurableQueueDeclare(queue.Key, bindings); await client.DurableQueueDeclare(CancellationToken, queue.Key, bindings);
})); }));
} }
private async Task DeleteObsoleteQueues(ITapetiClient worker) private async Task DeleteObsoleteQueues(ITapetiClient client)
{ {
await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue => await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue =>
{ {
await worker.DurableQueueDelete(queue); await client.DurableQueueDelete(CancellationToken, queue);
})); }));
} }
} }
@ -285,7 +328,7 @@ namespace Tapeti.Connection
private readonly List<string> durableQueues = new List<string>(); private readonly List<string> durableQueues = new List<string>();
public PassiveDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy) public PassiveDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{ {
} }
@ -310,7 +353,7 @@ namespace Tapeti.Connection
{ {
if (!durableQueues.Contains(queueName)) if (!durableQueues.Contains(queueName))
{ {
await ClientFactory().DurableQueueVerify(queueName); await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
durableQueues.Add(queueName); durableQueues.Add(queueName);
} }
} }

View File

@ -10,21 +10,27 @@ namespace Tapeti.Default
public class ConsoleLogger : ILogger public class ConsoleLogger : ILogger
{ {
/// <inheritdoc /> /// <inheritdoc />
public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) public void Connect(IConnectContext connectContext)
{ {
Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnecting" : "Connecting")} to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}"); Console.WriteLine($"[Tapeti] {(connectContext.IsReconnect ? "Reconnecting" : "Connecting")} to {connectContext.ConnectionParams.HostName}:{connectContext.ConnectionParams.Port}{connectContext.ConnectionParams.VirtualHost}");
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) public void ConnectFailed(IConnectFailedContext connectContext)
{ {
Console.WriteLine($"[Tapeti] Connection failed: {exception}"); Console.WriteLine($"[Tapeti] Connection failed: {connectContext.Exception}");
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) public void ConnectSuccess(IConnectSuccessContext connectContext)
{ {
Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnected" : "Connected")}"); Console.WriteLine($"[Tapeti] {(connectContext.IsReconnect ? "Reconnected" : "Connected")} using local port {connectContext.LocalPort}");
}
/// <inheritdoc />
public void Disconnect(IDisconnectContext disconnectContext)
{
Console.WriteLine($"[Tapeti] Connection closed: {(!string.IsNullOrEmpty(disconnectContext.ReplyText) ? disconnectContext.ReplyText : "<no reply text>")} (reply code: {disconnectContext.ReplyCode})");
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -10,17 +10,22 @@ namespace Tapeti.Default
public class DevNullLogger : ILogger public class DevNullLogger : ILogger
{ {
/// <inheritdoc /> /// <inheritdoc />
public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) public void Connect(IConnectContext connectContext)
{ {
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) public void ConnectFailed(IConnectFailedContext connectContext)
{ {
} }
/// <inheritdoc /> /// <inheritdoc />
public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) public void ConnectSuccess(IConnectSuccessContext connectContext)
{
}
/// <inheritdoc />
public void Disconnect(IDisconnectContext disconnectContext)
{ {
} }

View File

@ -5,6 +5,23 @@ using System.Threading.Tasks;
namespace Tapeti namespace Tapeti
{ {
/// <summary>
///
/// </summary>
public class ConnectedEventArgs
{
/// <summary>
/// The connection parameters used to establish the connection.
/// </summary>
public TapetiConnectionParams ConnectionParams;
/// <summary>
/// The local port for the connection. Useful for identifying the connection in the management interface.
/// </summary>
public int LocalPort;
}
/// <summary> /// <summary>
/// Contains information about the reason for a lost connection. /// Contains information about the reason for a lost connection.
/// </summary> /// </summary>
@ -22,6 +39,9 @@ namespace Tapeti
} }
/// <inheritdoc />
public delegate void ConnectedEventHandler(object sender, ConnectedEventArgs e);
/// <inheritdoc /> /// <inheritdoc />
public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e); public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e);
@ -64,7 +84,7 @@ namespace Tapeti
/// <summary> /// <summary>
/// Fired when a connection to RabbitMQ has been established. /// Fired when a connection to RabbitMQ has been established.
/// </summary> /// </summary>
event EventHandler Connected; event ConnectedEventHandler Connected;
/// <summary> /// <summary>
/// Fired when the connection to RabbitMQ has been lost. /// Fired when the connection to RabbitMQ has been lost.
@ -74,7 +94,7 @@ namespace Tapeti
/// <summary> /// <summary>
/// Fired when the connection to RabbitMQ has been recovered after an unexpected disconnect. /// Fired when the connection to RabbitMQ has been recovered after an unexpected disconnect.
/// </summary> /// </summary>
event EventHandler Reconnected; event ConnectedEventHandler Reconnected;
} }
} }

View File

@ -5,6 +5,71 @@ using Tapeti.Config;
namespace Tapeti namespace Tapeti
{ {
/// <summary>
/// Contains information about the connection being established.
/// </summary>
public interface IConnectContext
{
/// <summary>
/// The connection parameters used to establish the connection.
/// </summary>
TapetiConnectionParams ConnectionParams { get; }
/// <summary>
/// Indicates whether this is an automatic reconnect or an initial connection.
/// </summary>
bool IsReconnect { get; }
}
/// <inheritdoc />
/// <summary>
/// Contains information about the failed connection.
/// </summary>
public interface IConnectFailedContext : IConnectContext
{
/// <summary>
/// The exception that caused the connection to fail.
/// </summary>
Exception Exception { get; }
}
/// <inheritdoc />
/// <summary>
/// Contains information about the established connection.
/// </summary>
public interface IConnectSuccessContext : IConnectContext
{
/// <summary>
/// The local port for the connection. Useful for identifying the connection in the management interface.
/// </summary>
int LocalPort { get; }
}
/// <summary>
/// Contains information about the disconnection.
/// </summary>
public interface IDisconnectContext
{
/// <summary>
/// The connection parameters used to establish the connection.
/// </summary>
TapetiConnectionParams ConnectionParams { get; }
/// <summary>
/// The reply code as provided by RabbitMQ, if the connection was closed by a protocol message.
/// </summary>
ushort ReplyCode { get; }
/// <summary>
/// The reply text as provided by RabbitMQ, if the connection was closed by a protocol message.
/// </summary>
string ReplyText { get; }
}
/// <summary> /// <summary>
/// Handles the logging of various events in Tapeti /// Handles the logging of various events in Tapeti
/// </summary> /// </summary>
@ -17,23 +82,26 @@ namespace Tapeti
/// <summary> /// <summary>
/// Called before a connection to RabbitMQ is attempted. /// Called before a connection to RabbitMQ is attempted.
/// </summary> /// </summary>
/// <param name="connectionParams"></param> /// <param name="connectContext">Contains information about the connection being established.</param>
/// <param name="isReconnect">Indicates whether this is the initial connection or a reconnect</param> void Connect(IConnectContext connectContext);
void Connect(TapetiConnectionParams connectionParams, bool isReconnect);
/// <summary> /// <summary>
/// Called when the connection has failed or is lost. /// Called when the connection has failed.
/// </summary> /// </summary>
/// <param name="connectionParams"></param> /// <param name="connectContext">Contains information about the connection that has failed.</param>
/// <param name="exception"></param> void ConnectFailed(IConnectFailedContext connectContext);
void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception);
/// <summary> /// <summary>
/// Called when a connection to RabbitMQ has been succesfully established. /// Called when a connection to RabbitMQ has been succesfully established.
/// </summary> /// </summary>
/// <param name="connectionParams"></param> /// <param name="connectContext">Contains information about the established connection.</param>
/// <param name="isReconnect">Indicates whether this is the initial connection or a reconnect</param> void ConnectSuccess(IConnectSuccessContext connectContext);
void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect);
/// <summary>
/// Called when the connection to RabbitMQ is lost.
/// </summary>
/// <param name="disconnectContext">Contains information about the disconnect event.</param>
void Disconnect(IDisconnectContext disconnectContext);
/// <summary> /// <summary>
/// Called when an exception occurs in a consumer. /// Called when an exception occurs in a consumer.

View File

@ -1,11 +1,13 @@
using System.Threading.Tasks; using System;
using System.Threading.Tasks;
namespace Tapeti namespace Tapeti
{ {
/// <inheritdoc />
/// <summary> /// <summary>
/// Manages subscriptions to queues as configured by the bindings. /// Manages subscriptions to queues as configured by the bindings.
/// </summary> /// </summary>
public interface ISubscriber public interface ISubscriber : IDisposable
{ {
/// <summary> /// <summary>
/// Starts consuming from the subscribed queues if not already started. /// Starts consuming from the subscribed queues if not already started.

View File

@ -47,13 +47,13 @@ namespace Tapeti
} }
/// <inheritdoc /> /// <inheritdoc />
public event EventHandler Connected; public event ConnectedEventHandler Connected;
/// <inheritdoc /> /// <inheritdoc />
public event DisconnectedEventHandler Disconnected; public event DisconnectedEventHandler Disconnected;
/// <inheritdoc /> /// <inheritdoc />
public event EventHandler Reconnected; public event ConnectedEventHandler Reconnected;
/// <inheritdoc /> /// <inheritdoc />
@ -98,6 +98,8 @@ namespace Tapeti
public void Dispose() public void Dispose()
{ {
Close().Wait(); Close().Wait();
subscriber?.Dispose();
} }
@ -110,9 +112,9 @@ namespace Tapeti
this.owner = owner; this.owner = owner;
} }
public void Connected() public void Connected(ConnectedEventArgs e)
{ {
owner.OnConnected(new EventArgs()); owner.OnConnected(e);
} }
public void Disconnected(DisconnectedEventArgs e) public void Disconnected(DisconnectedEventArgs e)
@ -120,9 +122,9 @@ namespace Tapeti
owner.OnDisconnected(e); owner.OnDisconnected(e);
} }
public void Reconnected() public void Reconnected(ConnectedEventArgs e)
{ {
owner.OnReconnected(new EventArgs()); owner.OnReconnected(e);
} }
} }
@ -130,7 +132,7 @@ namespace Tapeti
/// <summary> /// <summary>
/// Called when a connection to RabbitMQ has been established. /// Called when a connection to RabbitMQ has been established.
/// </summary> /// </summary>
protected virtual void OnConnected(EventArgs e) protected virtual void OnConnected(ConnectedEventArgs e)
{ {
var connectedEvent = Connected; var connectedEvent = Connected;
if (connectedEvent == null) if (connectedEvent == null)
@ -142,19 +144,15 @@ namespace Tapeti
/// <summary> /// <summary>
/// Called when the connection to RabbitMQ has been lost. /// Called when the connection to RabbitMQ has been lost.
/// </summary> /// </summary>
protected virtual void OnReconnected(EventArgs e) protected virtual void OnReconnected(ConnectedEventArgs e)
{ {
var reconnectedEvent = Reconnected; var reconnectedEvent = Reconnected;
if (reconnectedEvent == null && subscriber == null) if (reconnectedEvent == null && subscriber == null)
return; return;
Task.Run(async () => subscriber?.Reconnect();
{
if (subscriber != null)
await subscriber.Reconnect();
reconnectedEvent?.Invoke(this, e); Task.Run(() => reconnectedEvent?.Invoke(this, e));
});
} }
/// <summary> /// <summary>
@ -166,6 +164,8 @@ namespace Tapeti
if (disconnectedEvent == null) if (disconnectedEvent == null)
return; return;
subscriber?.Disconnect();
Task.Run(() => disconnectedEvent.Invoke(this, e)); Task.Run(() => disconnectedEvent.Invoke(this, e));
} }
} }