From 6d7e7cda922c6d2d1b48b1f8dbea7110352fa989 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 24 Jun 2020 13:24:33 +0200 Subject: [PATCH 1/4] Added bare minimum import example file to documentation --- Tapeti.Cmd/Program.cs | 2 +- docs/tapeticmd.rst | 25 ++++++++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 02fec70..879e1af 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -74,7 +74,7 @@ namespace Tapeti.Cmd [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] public string InputMessage { get; set; } - [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] public bool InputPipe { get; set; } [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index 6a81c14..ee0b366 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -138,7 +138,7 @@ SingleFileJSON '''''''''''''' The default serialization method. All messages are contained in a single file, where each line is a JSON document describing the message properties and it's content. -An example message (formatted as multi-line to be more readable, but keep in mind that it must be a single line in the export file to be imported properly): +An example message (formatted as multi-line to be more readable, but keep in mind that it **must be a single line** in the export file to be imported properly): :: @@ -176,6 +176,29 @@ The properties correspond to the RabbitMQ client's IBasicProperties and can be o Either Body or RawBody is present. Body is used if the ContentType is set to application/json, and will contain the original message as an inline JSON object for easy manipulation. For other content types, the RawBody contains the original encoded body. +Below is a bare minimum example, assuming Tapeti style messages and the default direct-to-queue import (no --exchange parameter). Again, keep in mind that it **must be a single line** in the export file to be imported properly. + +:: + + { + "Queue": "tapeti.example.01", + "Properties": { + "ContentType": "application/json", + "Headers": { + "classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample" + } + }, + "Body": { + "Amount": 2 + } + } + +Actual file contents will thus look like: + +:: + + { "Queue": "tapeti.example.01", "Properties": { "ContentType": "application/json", "Headers": { "classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample" } }, "Body": { "Amount": 2 } } + EasyNetQHosepipe '''''''''''''''' From e31d638a11724ac4bcebe28bec396084f515d76a Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 3 Jul 2020 15:51:41 +0200 Subject: [PATCH 2/4] Implemented a rate limiting option for Tapeti.Cmd Moved example 06 to the "examples" sub-folder --- .../06-StatelessRequestResponse.csproj | 20 ---- .../06-StatelessRequestResponse.csproj | 20 ++++ .../ExampleMessageController.cs | 56 +++++----- .../06-StatelessRequestResponse}/Program.cs | 102 +++++++++--------- .../ReceivingMessageController.cs | 76 ++++++------- Tapeti.Cmd/Commands/ImportCommand.cs | 14 ++- Tapeti.Cmd/Commands/ShovelCommand.cs | 18 ++-- Tapeti.Cmd/Program.cs | 23 +++- Tapeti.Cmd/RateLimiter/IRateLimiter.cs | 9 ++ Tapeti.Cmd/RateLimiter/NoRateLimiter.cs | 12 +++ Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs | 30 ++++++ Tapeti.Cmd/build-release.bat | 1 + Tapeti.sln | 2 +- docs/tapeticmd.rst | 6 ++ 14 files changed, 234 insertions(+), 155 deletions(-) delete mode 100644 06-StatelessRequestResponse/06-StatelessRequestResponse.csproj create mode 100644 Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/ExampleMessageController.cs (95%) rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/Program.cs (96%) rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/ReceivingMessageController.cs (96%) create mode 100644 Tapeti.Cmd/RateLimiter/IRateLimiter.cs create mode 100644 Tapeti.Cmd/RateLimiter/NoRateLimiter.cs create mode 100644 Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs diff --git a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj deleted file mode 100644 index 7efcb1a..0000000 --- a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj +++ /dev/null @@ -1,20 +0,0 @@ - - - - Exe - netcoreapp2.1 - _06_StatelessRequestResponse - - - - - - - - - - - - - - diff --git a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj new file mode 100644 index 0000000..7d6c898 --- /dev/null +++ b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj @@ -0,0 +1,20 @@ + + + + Exe + netcoreapp2.1 + _06_StatelessRequestResponse + + + + + + + + + + + + + + diff --git a/06-StatelessRequestResponse/ExampleMessageController.cs b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs similarity index 95% rename from 06-StatelessRequestResponse/ExampleMessageController.cs rename to Examples/06-StatelessRequestResponse/ExampleMessageController.cs index fe01f2e..bc908ab 100644 --- a/06-StatelessRequestResponse/ExampleMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs @@ -1,28 +1,28 @@ -using System; -using ExampleLib; -using Messaging.TapetiExample; -using Tapeti.Annotations; - -namespace _06_StatelessRequestResponse -{ - [MessageController] - [DynamicQueue("tapeti.example.06")] - public class ExampleMessageController - { - private readonly IExampleState exampleState; - - - public ExampleMessageController(IExampleState exampleState) - { - this.exampleState = exampleState; - } - - - [ResponseHandler] - public void HandleQuoteResponse(QuoteResponseMessage message) - { - Console.WriteLine("Received response: " + message.Quote); - exampleState.Done(); - } - } -} +using System; +using ExampleLib; +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _06_StatelessRequestResponse +{ + [MessageController] + [DynamicQueue("tapeti.example.06")] + public class ExampleMessageController + { + private readonly IExampleState exampleState; + + + public ExampleMessageController(IExampleState exampleState) + { + this.exampleState = exampleState; + } + + + [ResponseHandler] + public void HandleQuoteResponse(QuoteResponseMessage message) + { + Console.WriteLine("Received response: " + message.Quote); + exampleState.Done(); + } + } +} diff --git a/06-StatelessRequestResponse/Program.cs b/Examples/06-StatelessRequestResponse/Program.cs similarity index 96% rename from 06-StatelessRequestResponse/Program.cs rename to Examples/06-StatelessRequestResponse/Program.cs index 615cb9b..6cc0de3 100644 --- a/06-StatelessRequestResponse/Program.cs +++ b/Examples/06-StatelessRequestResponse/Program.cs @@ -1,51 +1,51 @@ -using System; -using System.Threading.Tasks; -using ExampleLib; -using Messaging.TapetiExample; -using SimpleInjector; -using Tapeti; -using Tapeti.DataAnnotations; -using Tapeti.Default; -using Tapeti.SimpleInjector; - -namespace _06_StatelessRequestResponse -{ - public class Program - { - public static void Main(string[] args) - { - var container = new Container(); - var dependencyResolver = new SimpleInjectorDependencyResolver(container); - - container.Register(); - - var helper = new ExampleConsoleApp(dependencyResolver); - helper.Run(MainAsync); - } - - - internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) - { - var config = new TapetiConfig(dependencyResolver) - .WithDataAnnotations() - .RegisterAllControllers() - .Build(); - - - using (var connection = new TapetiConnection(config)) - { - await connection.Subscribe(); - - var publisher = dependencyResolver.Resolve(); - await publisher.PublishRequest( - new QuoteRequestMessage - { - Amount = 1 - }, - c => c.HandleQuoteResponse); - - await waitForDone(); - } - } - } -} +using System; +using System.Threading.Tasks; +using ExampleLib; +using Messaging.TapetiExample; +using SimpleInjector; +using Tapeti; +using Tapeti.DataAnnotations; +using Tapeti.Default; +using Tapeti.SimpleInjector; + +namespace _06_StatelessRequestResponse +{ + public class Program + { + public static void Main(string[] args) + { + var container = new Container(); + var dependencyResolver = new SimpleInjectorDependencyResolver(container); + + container.Register(); + + var helper = new ExampleConsoleApp(dependencyResolver); + helper.Run(MainAsync); + } + + + internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) + { + var config = new TapetiConfig(dependencyResolver) + .WithDataAnnotations() + .RegisterAllControllers() + .Build(); + + + using (var connection = new TapetiConnection(config)) + { + await connection.Subscribe(); + + var publisher = dependencyResolver.Resolve(); + await publisher.PublishRequest( + new QuoteRequestMessage + { + Amount = 1 + }, + c => c.HandleQuoteResponse); + + await waitForDone(); + } + } + } +} diff --git a/06-StatelessRequestResponse/ReceivingMessageController.cs b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs similarity index 96% rename from 06-StatelessRequestResponse/ReceivingMessageController.cs rename to Examples/06-StatelessRequestResponse/ReceivingMessageController.cs index 6684780..ea947c8 100644 --- a/06-StatelessRequestResponse/ReceivingMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs @@ -1,38 +1,38 @@ -using Messaging.TapetiExample; -using Tapeti.Annotations; - -namespace _06_StatelessRequestResponse -{ - [MessageController] - [DynamicQueue("tapeti.example.06.receiver")] - public class ReceivingMessageController - { - // No publisher required, responses can simply be returned - public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) - { - string quote; - - switch (message.Amount) - { - case 1: - // Well, they asked for it... :-) - quote = "'"; - break; - - case 2: - quote = "\""; - break; - - default: - // We have to return a response. - quote = null; - break; - } - - return new QuoteResponseMessage - { - Quote = quote - }; - } - } -} +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _06_StatelessRequestResponse +{ + [MessageController] + [DynamicQueue("tapeti.example.06.receiver")] + public class ReceivingMessageController + { + // No publisher required, responses can simply be returned + public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) + { + string quote; + + switch (message.Amount) + { + case 1: + // Well, they asked for it... :-) + quote = "'"; + break; + + case 2: + quote = "\""; + break; + + default: + // We have to return a response. + quote = null; + break; + } + + return new QuoteResponseMessage + { + Quote = quote + }; + } + } +} diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs index ccdf308..a981051 100644 --- a/Tapeti.Cmd/Commands/ImportCommand.cs +++ b/Tapeti.Cmd/Commands/ImportCommand.cs @@ -1,4 +1,5 @@ using RabbitMQ.Client; +using Tapeti.Cmd.RateLimiter; using Tapeti.Cmd.Serialization; namespace Tapeti.Cmd.Commands @@ -10,17 +11,20 @@ namespace Tapeti.Cmd.Commands public bool DirectToQueue { get; set; } - public int Execute(IModel channel) + public int Execute(IModel channel, IRateLimiter rateLimiter) { var messageCount = 0; foreach (var message in MessageSerializer.Deserialize()) { - var exchange = DirectToQueue ? "" : message.Exchange; - var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; + rateLimiter.Execute(() => + { + var exchange = DirectToQueue ? "" : message.Exchange; + var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + }); } return messageCount; diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs index 9b42a3a..34f11d2 100644 --- a/Tapeti.Cmd/Commands/ShovelCommand.cs +++ b/Tapeti.Cmd/Commands/ShovelCommand.cs @@ -1,5 +1,6 @@ using RabbitMQ.Client; - +using Tapeti.Cmd.RateLimiter; + namespace Tapeti.Cmd.Commands { public class ShovelCommand @@ -10,7 +11,7 @@ namespace Tapeti.Cmd.Commands public int? MaxCount { get; set; } - public int Execute(IModel sourceChannel, IModel targetChannel) + public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter) { var messageCount = 0; @@ -22,13 +23,14 @@ namespace Tapeti.Cmd.Commands break; - targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); + rateLimiter.Execute(() => + { + targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); + messageCount++; - - messageCount++; - - if (RemoveMessages) - sourceChannel.BasicAck(result.DeliveryTag, false); + if (RemoveMessages) + sourceChannel.BasicAck(result.DeliveryTag, false); + }); } return messageCount; diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 879e1af..f1bd5fe 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -4,10 +4,10 @@ using System.Diagnostics; using System.IO; using System.Text; using CommandLine; -using CommandLine.Text; using RabbitMQ.Client; using RabbitMQ.Client.Framing; using Tapeti.Cmd.Commands; +using Tapeti.Cmd.RateLimiter; using Tapeti.Cmd.Serialization; namespace Tapeti.Cmd @@ -79,6 +79,9 @@ namespace Tapeti.Cmd [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] public bool PublishToExchange { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] + public int? MaxRate { get; set; } } @@ -111,6 +114,9 @@ namespace Tapeti.Cmd [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] public string TargetPassword { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] + public int? MaxRate { get; set; } } @@ -236,6 +242,15 @@ namespace Tapeti.Cmd } + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (maxRate.GetValueOrDefault() <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + + private static void RunExport(ExportOptions options) { int messageCount; @@ -271,7 +286,7 @@ namespace Tapeti.Cmd MessageSerializer = messageSerializer, DirectToQueue = !options.PublishToExchange - }.Execute(channel); + }.Execute(channel, GetRateLimiter(options.MaxRate)); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); @@ -299,11 +314,11 @@ namespace Tapeti.Cmd using (var targetConnection = GetTargetConnection(options)) using (var targetChannel = targetConnection.CreateModel()) { - messageCount = shovelCommand.Execute(sourceChannel, targetChannel); + messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate)); } } else - messageCount = shovelCommand.Execute(sourceChannel, sourceChannel); + messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate)); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); diff --git a/Tapeti.Cmd/RateLimiter/IRateLimiter.cs b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs new file mode 100644 index 0000000..6357d2b --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Cmd.RateLimiter +{ + public interface IRateLimiter + { + void Execute(Action action); + } +} diff --git a/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs new file mode 100644 index 0000000..2cdce24 --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs @@ -0,0 +1,12 @@ +using System; + +namespace Tapeti.Cmd.RateLimiter +{ + public class NoRateLimiter : IRateLimiter + { + public void Execute(Action action) + { + action(); + } + } +} diff --git a/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs new file mode 100644 index 0000000..d03ad6c --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading; + +namespace Tapeti.Cmd.RateLimiter +{ + public class SpreadRateLimiter : IRateLimiter + { + private readonly TimeSpan delay; + private DateTime lastExecute = DateTime.MinValue; + + public SpreadRateLimiter(int amount, TimeSpan perTimespan) + { + delay = TimeSpan.FromMilliseconds(perTimespan.TotalMilliseconds / amount); + } + + + public void Execute(Action action) + { + // Very simple implementation; the time between actions must be at least the delay. + // This prevents bursts followed by nothing which are common with normal rate limiter implementations. + var remainingWaitTime = delay - (DateTime.Now - lastExecute); + + if (remainingWaitTime.TotalMilliseconds > 0) + Thread.Sleep(remainingWaitTime); + + action(); + lastExecute = DateTime.Now; + } + } +} diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat index 9903ae7..a65fdab 100644 --- a/Tapeti.Cmd/build-release.bat +++ b/Tapeti.Cmd/build-release.bat @@ -1,3 +1,4 @@ +@Echo Off mkdir publish REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll". diff --git a/Tapeti.sln b/Tapeti.sln index 016fd47..63a14c7 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -57,7 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index ee0b366..af3c77b 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -80,6 +80,10 @@ Read messages from disk as previously exported and publish them to a queue. -s , --serialization The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. +--maxrate + The maximum amount of messages per second to import. + + Either input, message or pipe is required. Example: @@ -121,6 +125,8 @@ Reads messages from a queue and publishes them to another queue, optionally to a --targetpassword Password used to connect to the target RabbitMQ server. Defaults to the source password. +--maxrate + The maximum amount of messages per second to shovel. Example: From 70a9c04fc73dab2958fcdd2ed5f94103702c95c4 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sat, 3 Oct 2020 09:54:36 +0200 Subject: [PATCH 3/4] Updated NuGet API key --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index ad72b07..f9ccb36 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -63,7 +63,7 @@ configuration: deploy: provider: NuGet api_key: - secure: 6/t8qnIiVuBCYb2TaOAHPbcQNb3g9EH++6okxqDjSaWMGoBrUEDXNCDvLVijTafZ + secure: 3WCSZAzan66vEmHZ1q3XzfOfucuAQiA+SiCDChO/gswbxfIXUpiM1eMNASDa3qWH skip_symbols: false artifact: /.*\.nupkg/ \ No newline at end of file From f1a4ab1c67eb05e9e0307f9c8019a35df34dd430 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 15 Jan 2021 09:57:46 +0100 Subject: [PATCH 4/4] Added Stop method to ISubscriber --- Tapeti/Connection/ITapetiClient.cs | 9 +++++-- Tapeti/Connection/TapetiClient.cs | 34 ++++++++++++++++++++------- Tapeti/Connection/TapetiSubscriber.cs | 26 +++++++++++++++++--- Tapeti/ISubscriber.cs | 5 ++++ 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs index 4add519..4a9e4de 100644 --- a/Tapeti/Connection/ITapetiClient.cs +++ b/Tapeti/Connection/ITapetiClient.cs @@ -77,8 +77,14 @@ namespace Tapeti.Connection /// Cancelled when the connection is lost /// /// The consumer implementation which will receive the messages from the queue - Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); + /// The consumer tag as returned by BasicConsume. + Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); + /// + /// Stops the consumer with the specified tag. + /// + /// The consumer tag as returned by Consume. + Task Cancel(string consumerTag); /// /// Creates a durable queue if it does not already exist, and updates the bindings. @@ -118,7 +124,6 @@ namespace Tapeti.Connection /// The binding to add to the dynamic queue Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding); - /// /// Closes the connection to RabbitMQ gracefully. /// diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 1719dbc..71a49d1 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -72,7 +72,6 @@ namespace Tapeti.Connection } - /// public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams) { this.config = config; @@ -184,29 +183,48 @@ namespace Tapeti.Connection /// - public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) + public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) { if (deletedQueues.Contains(queueName)) - return; + return null; if (string.IsNullOrEmpty(queueName)) throw new ArgumentNullException(nameof(queueName)); + string consumerTag = null; + await QueueWithRetryableChannel(channel => { if (cancellationToken.IsCancellationRequested) return; var basicConsumer = new TapetiBasicConsumer(consumer, Respond); - channel.BasicConsume(queueName, false, basicConsumer); + consumerTag = channel.BasicConsume(queueName, false, basicConsumer); + }); + + return consumerTag; + } + + + /// + public async Task Cancel(string consumerTag) + { + if (isClosing || string.IsNullOrEmpty(consumerTag)) + return; + + // No need for a retryable channel here, if the connection is lost + // so is the consumer. + await Queue(channel => + { + channel.BasicCancel(consumerTag); }); } private async Task Respond(ulong deliveryTag, ConsumeResult result) { - await taskQueue.Value.Add(() => + await Queue(channel => { // No need for a retryable channel here, if the connection is lost we can't // use the deliveryTag anymore. @@ -214,15 +232,15 @@ namespace Tapeti.Connection { case ConsumeResult.Success: case ConsumeResult.ExternalRequeue: - GetChannel().BasicAck(deliveryTag, false); + channel.BasicAck(deliveryTag, false); break; case ConsumeResult.Error: - GetChannel().BasicNack(deliveryTag, false, false); + channel.BasicNack(deliveryTag, false, false); break; case ConsumeResult.Requeue: - GetChannel().BasicNack(deliveryTag, false, true); + channel.BasicNack(deliveryTag, false, true); break; default: diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 69be2d0..9969ae0 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -13,6 +13,7 @@ namespace Tapeti.Connection private readonly Func clientFactory; private readonly ITapetiConfig config; private bool consuming; + private readonly List consumerTags = new List(); private CancellationTokenSource initializeCancellationTokenSource; @@ -50,6 +51,8 @@ namespace Tapeti.Connection { initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = null; + + consumerTags.Clear(); } @@ -65,6 +68,8 @@ namespace Tapeti.Connection initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = new CancellationTokenSource(); + consumerTags.Clear(); + cancellationToken = initializeCancellationTokenSource.Token; // ReSharper disable once MethodSupportsCancellation @@ -91,6 +96,21 @@ namespace Tapeti.Connection } + /// + public async Task Stop() + { + if (!consuming) + return; + + initializeCancellationTokenSource?.Cancel(); + initializeCancellationTokenSource = null; + + await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))); + + consumerTags.Clear(); + consuming = false; + } + private async Task ApplyBindings(CancellationToken cancellationToken) { @@ -115,13 +135,13 @@ namespace Tapeti.Connection { var queues = config.Bindings.GroupBy(binding => binding.QueueName); - await Task.WhenAll(queues.Select(async group => + consumerTags.AddRange(await Task.WhenAll(queues.Select(async group => { var queueName = group.Key; var consumer = new TapetiConsumer(config, queueName, group); - await clientFactory().Consume(cancellationToken, queueName, consumer); - })); + return await clientFactory().Consume(cancellationToken, queueName, consumer); + }))); } diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index f1aaafb..3110f65 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -13,5 +13,10 @@ namespace Tapeti /// Starts consuming from the subscribed queues if not already started. /// Task Resume(); + + /// + /// Stops consuming from the subscribed queues. + /// + Task Stop(); } }