diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs
index 4add519..4a9e4de 100644
--- a/Tapeti/Connection/ITapetiClient.cs
+++ b/Tapeti/Connection/ITapetiClient.cs
@@ -77,8 +77,14 @@ namespace Tapeti.Connection
/// Cancelled when the connection is lost
///
/// The consumer implementation which will receive the messages from the queue
- Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
+ /// The consumer tag as returned by BasicConsume.
+ Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
+ ///
+ /// Stops the consumer with the specified tag.
+ ///
+ /// The consumer tag as returned by Consume.
+ Task Cancel(string consumerTag);
///
/// Creates a durable queue if it does not already exist, and updates the bindings.
@@ -118,7 +124,6 @@ namespace Tapeti.Connection
/// The binding to add to the dynamic queue
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
-
///
/// Closes the connection to RabbitMQ gracefully.
///
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 1719dbc..71a49d1 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -72,7 +72,6 @@ namespace Tapeti.Connection
}
- ///
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
{
this.config = config;
@@ -184,29 +183,48 @@ namespace Tapeti.Connection
///
- public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
+ public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{
if (deletedQueues.Contains(queueName))
- return;
+ return null;
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
+ string consumerTag = null;
+
await QueueWithRetryableChannel(channel =>
{
if (cancellationToken.IsCancellationRequested)
return;
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
- channel.BasicConsume(queueName, false, basicConsumer);
+ consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
+ });
+
+ return consumerTag;
+ }
+
+
+ ///
+ public async Task Cancel(string consumerTag)
+ {
+ if (isClosing || string.IsNullOrEmpty(consumerTag))
+ return;
+
+ // No need for a retryable channel here, if the connection is lost
+ // so is the consumer.
+ await Queue(channel =>
+ {
+ channel.BasicCancel(consumerTag);
});
}
private async Task Respond(ulong deliveryTag, ConsumeResult result)
{
- await taskQueue.Value.Add(() =>
+ await Queue(channel =>
{
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
@@ -214,15 +232,15 @@ namespace Tapeti.Connection
{
case ConsumeResult.Success:
case ConsumeResult.ExternalRequeue:
- GetChannel().BasicAck(deliveryTag, false);
+ channel.BasicAck(deliveryTag, false);
break;
case ConsumeResult.Error:
- GetChannel().BasicNack(deliveryTag, false, false);
+ channel.BasicNack(deliveryTag, false, false);
break;
case ConsumeResult.Requeue:
- GetChannel().BasicNack(deliveryTag, false, true);
+ channel.BasicNack(deliveryTag, false, true);
break;
default:
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 69be2d0..9969ae0 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -13,6 +13,7 @@ namespace Tapeti.Connection
private readonly Func clientFactory;
private readonly ITapetiConfig config;
private bool consuming;
+ private readonly List consumerTags = new List();
private CancellationTokenSource initializeCancellationTokenSource;
@@ -50,6 +51,8 @@ namespace Tapeti.Connection
{
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
+
+ consumerTags.Clear();
}
@@ -65,6 +68,8 @@ namespace Tapeti.Connection
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = new CancellationTokenSource();
+ consumerTags.Clear();
+
cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation
@@ -91,6 +96,21 @@ namespace Tapeti.Connection
}
+ ///
+ public async Task Stop()
+ {
+ if (!consuming)
+ return;
+
+ initializeCancellationTokenSource?.Cancel();
+ initializeCancellationTokenSource = null;
+
+ await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag)));
+
+ consumerTags.Clear();
+ consuming = false;
+ }
+
private async Task ApplyBindings(CancellationToken cancellationToken)
{
@@ -115,13 +135,13 @@ namespace Tapeti.Connection
{
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
- await Task.WhenAll(queues.Select(async group =>
+ consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
- await clientFactory().Consume(cancellationToken, queueName, consumer);
- }));
+ return await clientFactory().Consume(cancellationToken, queueName, consumer);
+ })));
}
diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs
index f1aaafb..3110f65 100644
--- a/Tapeti/ISubscriber.cs
+++ b/Tapeti/ISubscriber.cs
@@ -13,5 +13,10 @@ namespace Tapeti
/// Starts consuming from the subscribed queues if not already started.
///
Task Resume();
+
+ ///
+ /// Stops consuming from the subscribed queues.
+ ///
+ Task Stop();
}
}