From f1a4ab1c67eb05e9e0307f9c8019a35df34dd430 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 15 Jan 2021 09:57:46 +0100 Subject: [PATCH] Added Stop method to ISubscriber --- Tapeti/Connection/ITapetiClient.cs | 9 +++++-- Tapeti/Connection/TapetiClient.cs | 34 ++++++++++++++++++++------- Tapeti/Connection/TapetiSubscriber.cs | 26 +++++++++++++++++--- Tapeti/ISubscriber.cs | 5 ++++ 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs index 4add519..4a9e4de 100644 --- a/Tapeti/Connection/ITapetiClient.cs +++ b/Tapeti/Connection/ITapetiClient.cs @@ -77,8 +77,14 @@ namespace Tapeti.Connection /// Cancelled when the connection is lost /// /// The consumer implementation which will receive the messages from the queue - Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); + /// The consumer tag as returned by BasicConsume. + Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); + /// + /// Stops the consumer with the specified tag. + /// + /// The consumer tag as returned by Consume. + Task Cancel(string consumerTag); /// /// Creates a durable queue if it does not already exist, and updates the bindings. @@ -118,7 +124,6 @@ namespace Tapeti.Connection /// The binding to add to the dynamic queue Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding); - /// /// Closes the connection to RabbitMQ gracefully. /// diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 1719dbc..71a49d1 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -72,7 +72,6 @@ namespace Tapeti.Connection } - /// public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams) { this.config = config; @@ -184,29 +183,48 @@ namespace Tapeti.Connection /// - public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) + public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) { if (deletedQueues.Contains(queueName)) - return; + return null; if (string.IsNullOrEmpty(queueName)) throw new ArgumentNullException(nameof(queueName)); + string consumerTag = null; + await QueueWithRetryableChannel(channel => { if (cancellationToken.IsCancellationRequested) return; var basicConsumer = new TapetiBasicConsumer(consumer, Respond); - channel.BasicConsume(queueName, false, basicConsumer); + consumerTag = channel.BasicConsume(queueName, false, basicConsumer); + }); + + return consumerTag; + } + + + /// + public async Task Cancel(string consumerTag) + { + if (isClosing || string.IsNullOrEmpty(consumerTag)) + return; + + // No need for a retryable channel here, if the connection is lost + // so is the consumer. + await Queue(channel => + { + channel.BasicCancel(consumerTag); }); } private async Task Respond(ulong deliveryTag, ConsumeResult result) { - await taskQueue.Value.Add(() => + await Queue(channel => { // No need for a retryable channel here, if the connection is lost we can't // use the deliveryTag anymore. @@ -214,15 +232,15 @@ namespace Tapeti.Connection { case ConsumeResult.Success: case ConsumeResult.ExternalRequeue: - GetChannel().BasicAck(deliveryTag, false); + channel.BasicAck(deliveryTag, false); break; case ConsumeResult.Error: - GetChannel().BasicNack(deliveryTag, false, false); + channel.BasicNack(deliveryTag, false, false); break; case ConsumeResult.Requeue: - GetChannel().BasicNack(deliveryTag, false, true); + channel.BasicNack(deliveryTag, false, true); break; default: diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 69be2d0..9969ae0 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -13,6 +13,7 @@ namespace Tapeti.Connection private readonly Func clientFactory; private readonly ITapetiConfig config; private bool consuming; + private readonly List consumerTags = new List(); private CancellationTokenSource initializeCancellationTokenSource; @@ -50,6 +51,8 @@ namespace Tapeti.Connection { initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = null; + + consumerTags.Clear(); } @@ -65,6 +68,8 @@ namespace Tapeti.Connection initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = new CancellationTokenSource(); + consumerTags.Clear(); + cancellationToken = initializeCancellationTokenSource.Token; // ReSharper disable once MethodSupportsCancellation @@ -91,6 +96,21 @@ namespace Tapeti.Connection } + /// + public async Task Stop() + { + if (!consuming) + return; + + initializeCancellationTokenSource?.Cancel(); + initializeCancellationTokenSource = null; + + await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))); + + consumerTags.Clear(); + consuming = false; + } + private async Task ApplyBindings(CancellationToken cancellationToken) { @@ -115,13 +135,13 @@ namespace Tapeti.Connection { var queues = config.Bindings.GroupBy(binding => binding.QueueName); - await Task.WhenAll(queues.Select(async group => + consumerTags.AddRange(await Task.WhenAll(queues.Select(async group => { var queueName = group.Key; var consumer = new TapetiConsumer(config, queueName, group); - await clientFactory().Consume(cancellationToken, queueName, consumer); - })); + return await clientFactory().Consume(cancellationToken, queueName, consumer); + }))); } diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index f1aaafb..3110f65 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -13,5 +13,10 @@ namespace Tapeti /// Starts consuming from the subscribed queues if not already started. /// Task Resume(); + + /// + /// Stops consuming from the subscribed queues. + /// + Task Stop(); } }