diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs
index 3c55330..af5690b 100644
--- a/Tapeti.Serilog/TapetiSeriLogger.cs
+++ b/Tapeti.Serilog/TapetiSeriLogger.cs
@@ -23,34 +23,44 @@ namespace Tapeti.Serilog
///
- public void Connect(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void Connect(IConnectContext connectContext)
{
seriLogger
- .ForContext("isReconnect", isReconnect)
+ .ForContext("isReconnect", connectContext.IsReconnect)
.Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}",
- connectionParams.HostName,
- connectionParams.Port,
- connectionParams.VirtualHost);
+ connectContext.ConnectionParams.HostName,
+ connectContext.ConnectionParams.Port,
+ connectContext.ConnectionParams.VirtualHost);
}
///
- public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
+ public void ConnectFailed(IConnectFailedContext connectContext)
{
- seriLogger.Error(exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}",
- connectionParams.HostName,
- connectionParams.Port,
- connectionParams.VirtualHost);
+ seriLogger.Error(connectContext.Exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}",
+ connectContext.ConnectionParams.HostName,
+ connectContext.ConnectionParams.Port,
+ connectContext.ConnectionParams.VirtualHost);
}
///
- public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void ConnectSuccess(IConnectSuccessContext connectContext)
{
seriLogger
- .ForContext("isReconnect", isReconnect)
- .Information("Tapeti: successfully connected to {host}:{port}/{virtualHost}",
- connectionParams.HostName,
- connectionParams.Port,
- connectionParams.VirtualHost);
+ .ForContext("isReconnect", connectContext.IsReconnect)
+ .Information("Tapeti: successfully connected to {host}:{port}/{virtualHost} on local port {localPort}",
+ connectContext.ConnectionParams.HostName,
+ connectContext.ConnectionParams.Port,
+ connectContext.ConnectionParams.VirtualHost,
+ connectContext.LocalPort);
+ }
+
+ ///
+ public void Disconnect(IDisconnectContext disconnectContext)
+ {
+ seriLogger
+ .Information("Tapeti: connection closed, reply text = {replyText}, reply code = {replyCode}",
+ disconnectContext.ReplyText,
+ disconnectContext.ReplyCode);
}
///
diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings
index a951efb..c1f05d7 100644
--- a/Tapeti.sln.DotSettings
+++ b/Tapeti.sln.DotSettings
@@ -1,5 +1,6 @@
False
+ API
ID
KV
<Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs
index 8cbb45f..7d924fa 100644
--- a/Tapeti/Config/IBinding.cs
+++ b/Tapeti/Config/IBinding.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Config
diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs
index 395b393..db602ad 100644
--- a/Tapeti/Connection/IConnectionEventListener.cs
+++ b/Tapeti/Connection/IConnectionEventListener.cs
@@ -8,13 +8,13 @@
///
/// Called when a connection to RabbitMQ has been established.
///
- void Connected();
+ void Connected(ConnectedEventArgs e);
///
/// Called when the connection to RabbitMQ has been lost.
///
- void Reconnected();
+ void Reconnected(ConnectedEventArgs e);
///
diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs
index 3a28053..4add519 100644
--- a/Tapeti/Connection/ITapetiClient.cs
+++ b/Tapeti/Connection/ITapetiClient.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using Tapeti.Config;
@@ -73,43 +74,49 @@ namespace Tapeti.Connection
///
/// Starts a consumer for the specified queue, using the provided bindings to handle messages.
///
+ /// Cancelled when the connection is lost
///
/// The consumer implementation which will receive the messages from the queue
- Task Consume(string queueName, IConsumer consumer);
+ Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
///
/// Creates a durable queue if it does not already exist, and updates the bindings.
///
+ /// Cancelled when the connection is lost
/// The name of the queue to create
/// A list of bindings. Any bindings already on the queue which are not in this list will be removed
- Task DurableQueueDeclare(string queueName, IEnumerable bindings);
+ Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable bindings);
///
/// Verifies a durable queue exists. Will raise an exception if it does not.
///
+ /// Cancelled when the connection is lost
/// The name of the queue to verify
- Task DurableQueueVerify(string queueName);
+ Task DurableQueueVerify(CancellationToken cancellationToken, string queueName);
///
/// Deletes a durable queue.
///
+ /// Cancelled when the connection is lost
/// The name of the queue to delete
/// If true, the queue will only be deleted if it is empty otherwise all bindings will be removed. If false, the queue is deleted even if there are queued messages.
- Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true);
+ Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true);
///
/// Creates a dynamic queue.
///
+ /// Cancelled when the connection is lost
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
- Task DynamicQueueDeclare(string queuePrefix = null);
+ Task DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null);
///
/// Add a binding to a dynamic queue.
///
+ /// Cancelled when the connection is lost
/// The name of the dynamic queue previously created using DynamicQueueDeclare
/// The binding to add to the dynamic queue
- Task DynamicQueueBind(string queueName, QueueBinding binding);
+ Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
///
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 66d3856..92fec5a 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Tapeti.Default;
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index ba1c00e..9a91ed0 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -184,7 +184,7 @@ namespace Tapeti.Connection
///
- public async Task Consume(string queueName, IConsumer consumer)
+ public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{
if (deletedQueues.Contains(queueName))
return;
@@ -192,8 +192,12 @@ namespace Tapeti.Connection
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
+
await QueueWithRetryableChannel(channel =>
- {
+ {
+ if (cancellationToken.IsCancellationRequested)
+ return;
+
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
channel.BasicConsume(queueName, false, basicConsumer);
});
@@ -229,48 +233,54 @@ namespace Tapeti.Connection
///
- public async Task DurableQueueDeclare(string queueName, IEnumerable bindings)
+ public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable bindings)
{
- await taskQueue.Value.Add(async () =>
+ var existingBindings = (await GetQueueBindings(queueName)).ToList();
+ var currentBindings = bindings.ToList();
+
+ await Queue(channel =>
{
- var existingBindings = (await GetQueueBindings(queueName)).ToList();
- var currentBindings = bindings.ToList();
+ if (cancellationToken.IsCancellationRequested)
+ return;
- WithRetryableChannel(channel =>
+ channel.QueueDeclare(queueName, true, false, false);
+
+ foreach (var binding in currentBindings.Except(existingBindings))
{
- channel.QueueDeclare(queueName, true, false, false);
+ DeclareExchange(channel, binding.Exchange);
+ channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
+ }
- foreach (var binding in currentBindings.Except(existingBindings))
- {
- DeclareExchange(channel, binding.Exchange);
- channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
- }
-
- foreach (var deletedBinding in existingBindings.Except(currentBindings))
- channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
- });
+ foreach (var deletedBinding in existingBindings.Except(currentBindings))
+ channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
});
}
///
- public async Task DurableQueueVerify(string queueName)
+ public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName)
{
- await QueueWithRetryableChannel(channel =>
+ await Queue(channel =>
{
+ if (cancellationToken.IsCancellationRequested)
+ return;
+
channel.QueueDeclarePassive(queueName);
});
}
///
- public async Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true)
+ public async Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true)
{
if (!onlyIfEmpty)
{
uint deletedMessages = 0;
- await QueueWithRetryableChannel(channel =>
+ await Queue(channel =>
{
+ if (cancellationToken.IsCancellationRequested)
+ return;
+
deletedMessages = channel.QueueDelete(queueName);
});
@@ -285,6 +295,9 @@ namespace Tapeti.Connection
bool retry;
do
{
+ if (cancellationToken.IsCancellationRequested)
+ break;
+
retry = false;
// Get queue information from the Management API, since the AMQP operations will
@@ -304,10 +317,8 @@ namespace Tapeti.Connection
// includes the GetQueueInfo, the next time around it should have Messages > 0
try
{
- WithRetryableChannel(channel =>
- {
- channel.QueueDelete(queueName, false, true);
- });
+ var channel = GetChannel();
+ channel.QueueDelete(queueName, false, true);
deletedQueues.Add(queueName);
logger.QueueObsolete(queueName, true, 0);
@@ -327,11 +338,10 @@ namespace Tapeti.Connection
if (existingBindings.Count > 0)
{
- WithRetryableChannel(channel =>
- {
- foreach (var binding in existingBindings)
- channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
- });
+ var channel = GetChannel();
+
+ foreach (var binding in existingBindings)
+ channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
}
logger.QueueObsolete(queueName, false, queueInfo.Messages);
@@ -342,12 +352,15 @@ namespace Tapeti.Connection
///
- public async Task DynamicQueueDeclare(string queuePrefix = null)
+ public async Task DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null)
{
string queueName = null;
- await QueueWithRetryableChannel(channel =>
+ await Queue(channel =>
{
+ if (cancellationToken.IsCancellationRequested)
+ return;
+
if (!string.IsNullOrEmpty(queuePrefix))
{
queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
@@ -361,10 +374,13 @@ namespace Tapeti.Connection
}
///
- public async Task DynamicQueueBind(string queueName, QueueBinding binding)
+ public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding)
{
- await QueueWithRetryableChannel(channel =>
+ await Queue(channel =>
{
+ if (cancellationToken.IsCancellationRequested)
+ return;
+
DeclareExchange(channel, binding.Exchange);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
});
@@ -543,6 +559,16 @@ namespace Tapeti.Connection
}
+ private async Task Queue(Action operation)
+ {
+ await taskQueue.Value.Add(() =>
+ {
+ var channel = GetChannel();
+ operation(channel);
+ });
+ }
+
+
private async Task QueueWithRetryableChannel(Action operation)
{
await taskQueue.Value.Add(() =>
@@ -613,7 +639,7 @@ namespace Tapeti.Connection
{
try
{
- logger.Connect(connectionParams, isReconnect);
+ logger.Connect(new ConnectContext(connectionParams, isReconnect));
connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel();
@@ -652,6 +678,8 @@ namespace Tapeti.Connection
ReplyText = e.ReplyText
});
+ logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
+
channelInstance = null;
if (!isClosing)
@@ -664,19 +692,25 @@ namespace Tapeti.Connection
connectedDateTime = DateTime.UtcNow;
- if (isReconnect)
- ConnectionEventListener?.Reconnected();
- else
- ConnectionEventListener?.Connected();
+ var connectedEventArgs = new ConnectedEventArgs
+ {
+ ConnectionParams = connectionParams,
+ LocalPort = connection.LocalPort
+ };
- logger.ConnectSuccess(connectionParams, isReconnect);
+ if (isReconnect)
+ ConnectionEventListener?.Reconnected(connectedEventArgs);
+ else
+ ConnectionEventListener?.Connected(connectedEventArgs);
+
+ logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, connection.LocalPort));
isReconnect = true;
break;
}
catch (BrokerUnreachableException e)
{
- logger.ConnectFailed(connectionParams, e);
+ logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e));
Thread.Sleep(ReconnectDelay);
}
}
@@ -786,5 +820,40 @@ namespace Tapeti.Connection
{
return exchange + ':' + routingKey;
}
+
+
+
+ private class ConnectContext : IConnectSuccessContext, IConnectFailedContext
+ {
+ public TapetiConnectionParams ConnectionParams { get; }
+ public bool IsReconnect { get; }
+ public int LocalPort { get; }
+ public Exception Exception { get; }
+
+
+ public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception exception = null)
+ {
+ ConnectionParams = connectionParams;
+ IsReconnect = isReconnect;
+ LocalPort = localPort;
+ Exception = exception;
+ }
+ }
+
+
+ private class DisconnectContext : IDisconnectContext
+ {
+ public TapetiConnectionParams ConnectionParams { get; }
+ public ushort ReplyCode { get; }
+ public string ReplyText { get; }
+
+
+ public DisconnectContext(TapetiConnectionParams connectionParams, ushort replyCode, string replyText)
+ {
+ ConnectionParams = connectionParams;
+ ReplyCode = replyCode;
+ ReplyText = replyText;
+ }
+ }
}
}
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index e91ff34..c57a424 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Tapeti.Config;
@@ -13,6 +14,8 @@ namespace Tapeti.Connection
private readonly ITapetiConfig config;
private bool consuming;
+ private CancellationTokenSource initializeCancellationTokenSource;
+
///
public TapetiSubscriber(Func clientFactory, ITapetiConfig config)
@@ -22,35 +25,56 @@ namespace Tapeti.Connection
}
+ public void Dispose()
+ {
+ }
+
+
+
///
/// Applies the configured bindings and declares the queues in RabbitMQ. For internal use only.
///
///
public async Task ApplyBindings()
{
- var routingKeyStrategy = config.DependencyResolver.Resolve();
- var exchangeStrategy = config.DependencyResolver.Resolve();
+ initializeCancellationTokenSource = new CancellationTokenSource();
+ await ApplyBindings(initializeCancellationTokenSource.Token);
+ }
- var bindingTarget = config.Features.DeclareDurableQueues
- ? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy)
- : new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy);
- await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
- await bindingTarget.Apply();
+ ///
+ /// Called after the connection is lost. For internal use only.
+ /// Guaranteed to be called from within the taskQueue thread.
+ ///
+ public void Disconnect()
+ {
+ initializeCancellationTokenSource?.Cancel();
+ initializeCancellationTokenSource = null;
}
///
/// Called after the connection is lost and regained. Reapplies the bindings and if Resume
/// has already been called, restarts the consumers. For internal use only.
+ /// Guaranteed to be called from within the taskQueue thread.
///
- ///
- public async Task Reconnect()
+ public void Reconnect()
{
- await ApplyBindings();
+ CancellationToken cancellationToken;
- if (consuming)
- await ConsumeQueues();
+ initializeCancellationTokenSource?.Cancel();
+ initializeCancellationTokenSource = new CancellationTokenSource();
+
+ cancellationToken = initializeCancellationTokenSource.Token;
+
+ // ReSharper disable once MethodSupportsCancellation
+ Task.Run(async () =>
+ {
+ await ApplyBindings(cancellationToken);
+
+ if (consuming && !cancellationToken.IsCancellationRequested)
+ await ConsumeQueues(cancellationToken);
+ });
}
@@ -61,11 +85,28 @@ namespace Tapeti.Connection
return;
consuming = true;
- await ConsumeQueues();
+ initializeCancellationTokenSource = new CancellationTokenSource();
+
+ await ConsumeQueues(initializeCancellationTokenSource.Token);
}
- private async Task ConsumeQueues()
+
+ private async Task ApplyBindings(CancellationToken cancellationToken)
+ {
+ var routingKeyStrategy = config.DependencyResolver.Resolve();
+ var exchangeStrategy = config.DependencyResolver.Resolve();
+
+ var bindingTarget = config.Features.DeclareDurableQueues
+ ? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
+ : new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
+
+ await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
+ await bindingTarget.Apply();
+ }
+
+
+ private async Task ConsumeQueues(CancellationToken cancellationToken)
{
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
@@ -74,7 +115,7 @@ namespace Tapeti.Connection
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
- await clientFactory().Consume(queueName, consumer);
+ await clientFactory().Consume(cancellationToken, queueName, consumer);
}));
}
@@ -84,6 +125,7 @@ namespace Tapeti.Connection
protected readonly Func ClientFactory;
protected readonly IRoutingKeyStrategy RoutingKeyStrategy;
protected readonly IExchangeStrategy ExchangeStrategy;
+ protected readonly CancellationToken CancellationToken;
private struct DynamicQueueInfo
{
@@ -94,11 +136,12 @@ namespace Tapeti.Connection
private readonly Dictionary> dynamicQueues = new Dictionary>();
- protected CustomBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy)
+ protected CustomBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken)
{
ClientFactory = clientFactory;
RoutingKeyStrategy = routingKeyStrategy;
ExchangeStrategy = exchangeStrategy;
+ CancellationToken = cancellationToken;
}
@@ -122,7 +165,7 @@ namespace Tapeti.Connection
var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass);
var exchange = ExchangeStrategy.GetExchange(messageClass);
- await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey));
+ await ClientFactory().DynamicQueueBind(CancellationToken, result.QueueName, new QueueBinding(exchange, routingKey));
return result.QueueName;
}
@@ -139,7 +182,7 @@ namespace Tapeti.Connection
{
// If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
- return await ClientFactory().DynamicQueueDeclare(queuePrefix);
+ return await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix);
}
@@ -181,7 +224,7 @@ namespace Tapeti.Connection
}
// Declare a new queue
- var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix);
+ var queueName = await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix);
var queueInfo = new DynamicQueueInfo
{
QueueName = queueName,
@@ -205,7 +248,7 @@ namespace Tapeti.Connection
private readonly HashSet obsoleteDurableQueues = new HashSet();
- public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy)
+ public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{
}
@@ -247,13 +290,13 @@ namespace Tapeti.Connection
public override async Task Apply()
{
- var worker = ClientFactory();
- await DeclareQueues(worker);
- await DeleteObsoleteQueues(worker);
+ var client = ClientFactory();
+ await DeclareQueues(client);
+ await DeleteObsoleteQueues(client);
}
- private async Task DeclareQueues(ITapetiClient worker)
+ private async Task DeclareQueues(ITapetiClient client)
{
await Task.WhenAll(durableQueues.Select(async queue =>
{
@@ -265,16 +308,16 @@ namespace Tapeti.Connection
return new QueueBinding(exchange, routingKey);
});
- await worker.DurableQueueDeclare(queue.Key, bindings);
+ await client.DurableQueueDeclare(CancellationToken, queue.Key, bindings);
}));
}
- private async Task DeleteObsoleteQueues(ITapetiClient worker)
+ private async Task DeleteObsoleteQueues(ITapetiClient client)
{
await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue =>
{
- await worker.DurableQueueDelete(queue);
+ await client.DurableQueueDelete(CancellationToken, queue);
}));
}
}
@@ -285,7 +328,7 @@ namespace Tapeti.Connection
private readonly List durableQueues = new List();
- public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy)
+ public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{
}
@@ -310,7 +353,7 @@ namespace Tapeti.Connection
{
if (!durableQueues.Contains(queueName))
{
- await ClientFactory().DurableQueueVerify(queueName);
+ await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
durableQueues.Add(queueName);
}
}
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index abfc92d..e942ae0 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -10,21 +10,27 @@ namespace Tapeti.Default
public class ConsoleLogger : ILogger
{
///
- public void Connect(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void Connect(IConnectContext connectContext)
{
- Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnecting" : "Connecting")} to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}");
+ Console.WriteLine($"[Tapeti] {(connectContext.IsReconnect ? "Reconnecting" : "Connecting")} to {connectContext.ConnectionParams.HostName}:{connectContext.ConnectionParams.Port}{connectContext.ConnectionParams.VirtualHost}");
}
///
- public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
+ public void ConnectFailed(IConnectFailedContext connectContext)
{
- Console.WriteLine($"[Tapeti] Connection failed: {exception}");
+ Console.WriteLine($"[Tapeti] Connection failed: {connectContext.Exception}");
}
///
- public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void ConnectSuccess(IConnectSuccessContext connectContext)
{
- Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnected" : "Connected")}");
+ Console.WriteLine($"[Tapeti] {(connectContext.IsReconnect ? "Reconnected" : "Connected")} using local port {connectContext.LocalPort}");
+ }
+
+ ///
+ public void Disconnect(IDisconnectContext disconnectContext)
+ {
+ Console.WriteLine($"[Tapeti] Connection closed: {(!string.IsNullOrEmpty(disconnectContext.ReplyText) ? disconnectContext.ReplyText : "")} (reply code: {disconnectContext.ReplyCode})");
}
///
diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs
index aae6a27..bbaf911 100644
--- a/Tapeti/Default/DevNullLogger.cs
+++ b/Tapeti/Default/DevNullLogger.cs
@@ -10,17 +10,22 @@ namespace Tapeti.Default
public class DevNullLogger : ILogger
{
///
- public void Connect(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void Connect(IConnectContext connectContext)
{
}
///
- public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
+ public void ConnectFailed(IConnectFailedContext connectContext)
{
}
///
- public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect)
+ public void ConnectSuccess(IConnectSuccessContext connectContext)
+ {
+ }
+
+ ///
+ public void Disconnect(IDisconnectContext disconnectContext)
{
}
diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs
index 364efd8..eedc765 100644
--- a/Tapeti/IConnection.cs
+++ b/Tapeti/IConnection.cs
@@ -5,6 +5,23 @@ using System.Threading.Tasks;
namespace Tapeti
{
+ ///
+ ///
+ ///
+ public class ConnectedEventArgs
+ {
+ ///
+ /// The connection parameters used to establish the connection.
+ ///
+ public TapetiConnectionParams ConnectionParams;
+
+ ///
+ /// The local port for the connection. Useful for identifying the connection in the management interface.
+ ///
+ public int LocalPort;
+ }
+
+
///
/// Contains information about the reason for a lost connection.
///
@@ -22,6 +39,9 @@ namespace Tapeti
}
+ ///
+ public delegate void ConnectedEventHandler(object sender, ConnectedEventArgs e);
+
///
public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e);
@@ -64,7 +84,7 @@ namespace Tapeti
///
/// Fired when a connection to RabbitMQ has been established.
///
- event EventHandler Connected;
+ event ConnectedEventHandler Connected;
///
/// Fired when the connection to RabbitMQ has been lost.
@@ -74,7 +94,7 @@ namespace Tapeti
///
/// Fired when the connection to RabbitMQ has been recovered after an unexpected disconnect.
///
- event EventHandler Reconnected;
+ event ConnectedEventHandler Reconnected;
}
}
diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs
index c02e123..0a16ba0 100644
--- a/Tapeti/ILogger.cs
+++ b/Tapeti/ILogger.cs
@@ -5,6 +5,71 @@ using Tapeti.Config;
namespace Tapeti
{
+ ///
+ /// Contains information about the connection being established.
+ ///
+ public interface IConnectContext
+ {
+ ///
+ /// The connection parameters used to establish the connection.
+ ///
+ TapetiConnectionParams ConnectionParams { get; }
+
+ ///
+ /// Indicates whether this is an automatic reconnect or an initial connection.
+ ///
+ bool IsReconnect { get; }
+ }
+
+
+ ///
+ ///
+ /// Contains information about the failed connection.
+ ///
+ public interface IConnectFailedContext : IConnectContext
+ {
+ ///
+ /// The exception that caused the connection to fail.
+ ///
+ Exception Exception { get; }
+ }
+
+
+ ///
+ ///
+ /// Contains information about the established connection.
+ ///
+ public interface IConnectSuccessContext : IConnectContext
+ {
+ ///
+ /// The local port for the connection. Useful for identifying the connection in the management interface.
+ ///
+ int LocalPort { get; }
+ }
+
+
+ ///
+ /// Contains information about the disconnection.
+ ///
+ public interface IDisconnectContext
+ {
+ ///
+ /// The connection parameters used to establish the connection.
+ ///
+ TapetiConnectionParams ConnectionParams { get; }
+
+ ///
+ /// The reply code as provided by RabbitMQ, if the connection was closed by a protocol message.
+ ///
+ ushort ReplyCode { get; }
+
+ ///
+ /// The reply text as provided by RabbitMQ, if the connection was closed by a protocol message.
+ ///
+ string ReplyText { get; }
+ }
+
+
///
/// Handles the logging of various events in Tapeti
///
@@ -17,23 +82,26 @@ namespace Tapeti
///
/// Called before a connection to RabbitMQ is attempted.
///
- ///
- /// Indicates whether this is the initial connection or a reconnect
- void Connect(TapetiConnectionParams connectionParams, bool isReconnect);
+ /// Contains information about the connection being established.
+ void Connect(IConnectContext connectContext);
///
- /// Called when the connection has failed or is lost.
+ /// Called when the connection has failed.
///
- ///
- ///
- void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception);
+ /// Contains information about the connection that has failed.
+ void ConnectFailed(IConnectFailedContext connectContext);
///
/// Called when a connection to RabbitMQ has been succesfully established.
///
- ///
- /// Indicates whether this is the initial connection or a reconnect
- void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect);
+ /// Contains information about the established connection.
+ void ConnectSuccess(IConnectSuccessContext connectContext);
+
+ ///
+ /// Called when the connection to RabbitMQ is lost.
+ ///
+ /// Contains information about the disconnect event.
+ void Disconnect(IDisconnectContext disconnectContext);
///
/// Called when an exception occurs in a consumer.
diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs
index 6b54bbf..f1aaafb 100644
--- a/Tapeti/ISubscriber.cs
+++ b/Tapeti/ISubscriber.cs
@@ -1,11 +1,13 @@
-using System.Threading.Tasks;
+using System;
+using System.Threading.Tasks;
namespace Tapeti
{
+ ///
///
/// Manages subscriptions to queues as configured by the bindings.
///
- public interface ISubscriber
+ public interface ISubscriber : IDisposable
{
///
/// Starts consuming from the subscribed queues if not already started.
diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs
index 75ef8c9..ff72c3f 100644
--- a/Tapeti/TapetiConnection.cs
+++ b/Tapeti/TapetiConnection.cs
@@ -47,13 +47,13 @@ namespace Tapeti
}
///
- public event EventHandler Connected;
+ public event ConnectedEventHandler Connected;
///
public event DisconnectedEventHandler Disconnected;
///
- public event EventHandler Reconnected;
+ public event ConnectedEventHandler Reconnected;
///
@@ -98,6 +98,8 @@ namespace Tapeti
public void Dispose()
{
Close().Wait();
+
+ subscriber?.Dispose();
}
@@ -110,9 +112,9 @@ namespace Tapeti
this.owner = owner;
}
- public void Connected()
+ public void Connected(ConnectedEventArgs e)
{
- owner.OnConnected(new EventArgs());
+ owner.OnConnected(e);
}
public void Disconnected(DisconnectedEventArgs e)
@@ -120,9 +122,9 @@ namespace Tapeti
owner.OnDisconnected(e);
}
- public void Reconnected()
+ public void Reconnected(ConnectedEventArgs e)
{
- owner.OnReconnected(new EventArgs());
+ owner.OnReconnected(e);
}
}
@@ -130,7 +132,7 @@ namespace Tapeti
///
/// Called when a connection to RabbitMQ has been established.
///
- protected virtual void OnConnected(EventArgs e)
+ protected virtual void OnConnected(ConnectedEventArgs e)
{
var connectedEvent = Connected;
if (connectedEvent == null)
@@ -142,19 +144,15 @@ namespace Tapeti
///
/// Called when the connection to RabbitMQ has been lost.
///
- protected virtual void OnReconnected(EventArgs e)
+ protected virtual void OnReconnected(ConnectedEventArgs e)
{
var reconnectedEvent = Reconnected;
if (reconnectedEvent == null && subscriber == null)
return;
- Task.Run(async () =>
- {
- if (subscriber != null)
- await subscriber.Reconnect();
+ subscriber?.Reconnect();
- reconnectedEvent?.Invoke(this, e);
- });
+ Task.Run(() => reconnectedEvent?.Invoke(this, e));
}
///
@@ -166,6 +164,8 @@ namespace Tapeti
if (disconnectedEvent == null)
return;
+ subscriber?.Disconnect();
+
Task.Run(() => disconnectedEvent.Invoke(this, e));
}
}