diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs
index e8c7654..e71c3fc 100644
--- a/Tapeti/Connection/ITapetiClient.cs
+++ b/Tapeti/Connection/ITapetiClient.cs
@@ -78,13 +78,13 @@ namespace Tapeti.Connection
///
/// The consumer implementation which will receive the messages from the queue
/// The consumer tag as returned by BasicConsume.
- Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
+ Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
///
/// Stops the consumer with the specified tag.
///
/// The consumer tag as returned by Consume.
- Task Cancel(string consumerTag);
+ Task Cancel(TapetiConsumerTag consumerTag);
///
/// Creates a durable queue if it does not already exist, and updates the bindings.
@@ -129,4 +129,31 @@ namespace Tapeti.Connection
///
Task Close();
}
+
+
+ ///
+ /// Represents a consumer for a specific connection.
+ ///
+ public class TapetiConsumerTag
+ {
+ ///
+ /// The consumer tag as determined by the AMQP protocol.
+ ///
+ public string ConsumerTag { get; }
+
+ ///
+ /// An internal reference to the connection on which the consume was started.
+ ///
+ public long ConnectionReference { get;}
+
+
+ ///
+ /// Creates a new instance of the TapetiConsumerTag class.
+ ///
+ public TapetiConsumerTag(long connectionReference, string consumerTag)
+ {
+ ConnectionReference = connectionReference;
+ ConsumerTag = consumerTag;
+ }
+ }
}
\ No newline at end of file
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 3e9934f..8d1d222 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -5,6 +5,9 @@ using Tapeti.Default;
namespace Tapeti.Connection
{
+ public delegate Task ResponseFunc(long expectedConnectionReference, ulong deliveryTag, ConsumeResult result);
+
+
///
///
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
@@ -12,13 +15,15 @@ namespace Tapeti.Connection
internal class TapetiBasicConsumer : DefaultBasicConsumer
{
private readonly IConsumer consumer;
- private readonly Func onRespond;
+ private readonly long connectionReference;
+ private readonly ResponseFunc onRespond;
///
- public TapetiBasicConsumer(IConsumer consumer, Func onRespond)
+ public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond)
{
this.consumer = consumer;
+ this.connectionReference = connectionReference;
this.onRespond = onRespond;
}
@@ -45,11 +50,11 @@ namespace Tapeti.Connection
try
{
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
- await onRespond(deliveryTag, response);
+ await onRespond(connectionReference, deliveryTag, response);
}
catch
{
- await onRespond(deliveryTag, ConsumeResult.Error);
+ await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
});
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 15ddffb..fba46b8 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -51,6 +51,7 @@ namespace Tapeti.Connection
// These fields must be locked using connectionLock
private readonly object connectionLock = new();
+ private long connectionReference;
private RabbitMQ.Client.IConnection connection;
private IModel consumeChannelModel;
private IModel publishChannelModel;
@@ -200,7 +201,7 @@ namespace Tapeti.Connection
///
- public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
+ public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{
if (deletedQueues.Contains(queueName))
return null;
@@ -209,6 +210,7 @@ namespace Tapeti.Connection
throw new ArgumentNullException(nameof(queueName));
+ long capturedConnectionReference = -1;
string consumerTag = null;
await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel =>
@@ -216,33 +218,52 @@ namespace Tapeti.Connection
if (cancellationToken.IsCancellationRequested)
return;
- var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
+ capturedConnectionReference = Interlocked.Read(ref connectionReference);
+ var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond);
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
});
- return consumerTag;
+ return new TapetiConsumerTag(capturedConnectionReference, consumerTag);
}
///
- public async Task Cancel(string consumerTag)
+ public async Task Cancel(TapetiConsumerTag consumerTag)
{
- if (isClosing || string.IsNullOrEmpty(consumerTag))
+ if (isClosing || string.IsNullOrEmpty(consumerTag.ConsumerTag))
+ return;
+
+ var capturedConnectionReference = Interlocked.Read(ref connectionReference);
+
+ // If the connection was re-established in the meantime, don't respond with an
+ // invalid deliveryTag. The message will be requeued.
+ if (capturedConnectionReference != consumerTag.ConnectionReference)
return;
// No need for a retryable channel here, if the connection is lost
// so is the consumer.
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
{
- channel.BasicCancel(consumerTag);
+ // Check again as a reconnect may have occured in the meantime
+ var currentConnectionReference = Interlocked.Read(ref connectionReference);
+ if (currentConnectionReference != consumerTag.ConnectionReference)
+ return;
+
+ channel.BasicCancel(consumerTag.ConsumerTag);
});
}
- private async Task Respond(ulong deliveryTag, ConsumeResult result)
+ private async Task Respond(long expectedConnectionReference, ulong deliveryTag, ConsumeResult result)
{
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
{
+ // If the connection was re-established in the meantime, don't respond with an
+ // invalid deliveryTag. The message will be requeued.
+ var currentConnectionReference = Interlocked.Read(ref connectionReference);
+ if (currentConnectionReference != connectionReference)
+ return;
+
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
switch (result)
@@ -487,8 +508,8 @@ namespace Tapeti.Connection
await publishChannel.Reset();
// No need to close the channels as the connection will be closed
- capturedConsumeModel.Dispose();
- capturedPublishModel.Dispose();
+ capturedConsumeModel?.Dispose();
+ capturedPublishModel?.Dispose();
// ReSharper disable once InvertIf
if (capturedConnection != null)
@@ -695,56 +716,76 @@ namespace Tapeti.Connection
if (channel != null && channel.IsOpen)
return channel;
+ }
- // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
- // not related to the connection), wait for a bit to avoid spamming the connection
- if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay)
- Thread.Sleep(ReconnectDelay);
+ // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
+ // not related to the connection), wait for a bit to avoid spamming the connection
+ if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay)
+ Thread.Sleep(ReconnectDelay);
- var connectionFactory = new ConnectionFactory
+ var connectionFactory = new ConnectionFactory
+ {
+ HostName = connectionParams.HostName,
+ Port = connectionParams.Port,
+ VirtualHost = connectionParams.VirtualHost,
+ UserName = connectionParams.Username,
+ Password = connectionParams.Password,
+ AutomaticRecoveryEnabled = false,
+ TopologyRecoveryEnabled = false,
+ RequestedHeartbeat = TimeSpan.FromSeconds(30)
+ };
+
+ if (connectionParams.ClientProperties != null)
+ foreach (var pair in connectionParams.ClientProperties)
{
- HostName = connectionParams.HostName,
- Port = connectionParams.Port,
- VirtualHost = connectionParams.VirtualHost,
- UserName = connectionParams.Username,
- Password = connectionParams.Password,
- AutomaticRecoveryEnabled = false,
- TopologyRecoveryEnabled = false,
- RequestedHeartbeat = TimeSpan.FromSeconds(30)
- };
+ if (connectionFactory.ClientProperties.ContainsKey(pair.Key))
+ connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value);
+ else
+ connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value));
+ }
- if (connectionParams.ClientProperties != null)
- foreach (var pair in connectionParams.ClientProperties)
+
+ while (true)
+ {
+ try
+ {
+ RabbitMQ.Client.IConnection capturedConnection;
+ IModel capturedConsumeChannelModel;
+ IModel capturedPublishChannelModel;
+
+
+ lock (connectionLock)
{
- if (connectionFactory.ClientProperties.ContainsKey(pair.Key))
- connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value);
- else
- connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value));
+ capturedConnection = connection;
}
-
- while (true)
- {
- try
+ if (capturedConnection != null)
{
- if (connection != null)
+ try
{
- try
- {
+ if (connection.IsOpen)
connection.Close();
- }
- finally
- {
- connection.Dispose();
- }
-
- connection = null;
+ }
+ catch (AlreadyClosedException)
+ {
+ }
+ finally
+ {
+ connection.Dispose();
}
- logger.Connect(new ConnectContext(connectionParams, isReconnect));
+ connection = null;
+ }
+ logger.Connect(new ConnectContext(connectionParams, isReconnect));
+ Interlocked.Increment(ref connectionReference);
+
+ lock (connectionLock)
+ {
connection = connectionFactory.CreateConnection();
+ capturedConnection = connection;
+
consumeChannelModel = connection.CreateModel();
if (consumeChannel == null)
throw new BrokerUnreachableException(null);
@@ -753,98 +794,102 @@ namespace Tapeti.Connection
if (publishChannel == null)
throw new BrokerUnreachableException(null);
+ capturedConsumeChannelModel = consumeChannelModel;
+ capturedPublishChannelModel = publishChannelModel;
+ }
- if (config.Features.PublisherConfirms)
+
+ if (config.Features.PublisherConfirms)
+ {
+ lastDeliveryTag = 0;
+
+ Monitor.Enter(confirmLock);
+ try
{
- lastDeliveryTag = 0;
+ foreach (var pair in confirmMessages)
+ pair.Value.CompletionSource.SetCanceled();
- Monitor.Enter(confirmLock);
- try
- {
- foreach (var pair in confirmMessages)
- pair.Value.CompletionSource.SetCanceled();
-
- confirmMessages.Clear();
- }
- finally
- {
- Monitor.Exit(confirmLock);
- }
-
- publishChannelModel.ConfirmSelect();
+ confirmMessages.Clear();
+ }
+ finally
+ {
+ Monitor.Exit(confirmLock);
}
- if (connectionParams.PrefetchCount > 0)
- consumeChannelModel.BasicQos(0, connectionParams.PrefetchCount, false);
-
- var capturedConsumeChannelModel = consumeChannelModel;
- consumeChannelModel.ModelShutdown += (_, e) =>
- {
- lock (connectionLock)
- {
- if (consumeChannelModel == null || consumeChannelModel != capturedConsumeChannelModel)
- return;
-
- consumeChannelModel = null;
- }
-
- ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
- {
- ReplyCode = e.ReplyCode,
- ReplyText = e.ReplyText
- });
-
- logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
-
- // Reconnect if the disconnect was unexpected
- if (!isClosing)
- GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { });
- };
-
- var capturedPublishChannelModel = publishChannelModel;
- publishChannelModel.ModelShutdown += (_, _) =>
- {
- lock (connectionLock)
- {
- if (publishChannelModel == null || publishChannelModel != capturedPublishChannelModel)
- return;
-
- publishChannelModel = null;
- }
-
- // No need to reconnect, the next Publish will
- };
-
-
- publishChannelModel.BasicReturn += HandleBasicReturn;
- publishChannelModel.BasicAcks += HandleBasicAck;
- publishChannelModel.BasicNacks += HandleBasicNack;
-
- connectedDateTime = DateTime.UtcNow;
-
- var connectedEventArgs = new ConnectedEventArgs
- {
- ConnectionParams = connectionParams,
- LocalPort = connection.LocalPort
- };
-
- if (isReconnect)
- ConnectionEventListener?.Reconnected(connectedEventArgs);
- else
- ConnectionEventListener?.Connected(connectedEventArgs);
-
- logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, connection.LocalPort));
- isReconnect = true;
-
- break;
+ capturedPublishChannelModel.ConfirmSelect();
}
- catch (BrokerUnreachableException e)
+
+ if (connectionParams.PrefetchCount > 0)
+ capturedPublishChannelModel.BasicQos(0, connectionParams.PrefetchCount, false);
+
+ capturedPublishChannelModel.ModelShutdown += (_, e) =>
{
- logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e));
- Thread.Sleep(ReconnectDelay);
- }
+ lock (connectionLock)
+ {
+ if (consumeChannelModel == null || consumeChannelModel != capturedConsumeChannelModel)
+ return;
+
+ consumeChannelModel = null;
+ }
+
+ ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
+ {
+ ReplyCode = e.ReplyCode,
+ ReplyText = e.ReplyText
+ });
+
+ logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
+
+ // Reconnect if the disconnect was unexpected
+ if (!isClosing)
+ GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { });
+ };
+
+ capturedPublishChannelModel.ModelShutdown += (_, _) =>
+ {
+ lock (connectionLock)
+ {
+ if (publishChannelModel == null || publishChannelModel != capturedPublishChannelModel)
+ return;
+
+ publishChannelModel = null;
+ }
+
+ // No need to reconnect, the next Publish will
+ };
+
+
+ capturedPublishChannelModel.BasicReturn += HandleBasicReturn;
+ capturedPublishChannelModel.BasicAcks += HandleBasicAck;
+ capturedPublishChannelModel.BasicNacks += HandleBasicNack;
+
+ connectedDateTime = DateTime.UtcNow;
+
+ var connectedEventArgs = new ConnectedEventArgs
+ {
+ ConnectionParams = connectionParams,
+ LocalPort = capturedConnection.LocalPort
+ };
+
+ if (isReconnect)
+ ConnectionEventListener?.Reconnected(connectedEventArgs);
+ else
+ ConnectionEventListener?.Connected(connectedEventArgs);
+
+ logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, capturedConnection.LocalPort));
+ isReconnect = true;
+
+ break;
}
-
+ catch (BrokerUnreachableException e)
+ {
+ logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e));
+ Thread.Sleep(ReconnectDelay);
+ }
+ }
+
+ lock (connectionLock)
+ {
return channelType == TapetiChannelType.Publish
? publishChannelModel
: consumeChannelModel;
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 89c00da..3b92262 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -13,7 +13,7 @@ namespace Tapeti.Connection
private readonly Func clientFactory;
private readonly ITapetiConfig config;
private bool consuming;
- private readonly List consumerTags = new();
+ private readonly List consumerTags = new();
private CancellationTokenSource initializeCancellationTokenSource;