From e31d638a11724ac4bcebe28bec396084f515d76a Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 3 Jul 2020 15:51:41 +0200 Subject: [PATCH] Implemented a rate limiting option for Tapeti.Cmd Moved example 06 to the "examples" sub-folder --- .../06-StatelessRequestResponse.csproj | 20 ---- .../06-StatelessRequestResponse.csproj | 20 ++++ .../ExampleMessageController.cs | 56 +++++----- .../06-StatelessRequestResponse}/Program.cs | 102 +++++++++--------- .../ReceivingMessageController.cs | 76 ++++++------- Tapeti.Cmd/Commands/ImportCommand.cs | 14 ++- Tapeti.Cmd/Commands/ShovelCommand.cs | 18 ++-- Tapeti.Cmd/Program.cs | 23 +++- Tapeti.Cmd/RateLimiter/IRateLimiter.cs | 9 ++ Tapeti.Cmd/RateLimiter/NoRateLimiter.cs | 12 +++ Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs | 30 ++++++ Tapeti.Cmd/build-release.bat | 1 + Tapeti.sln | 2 +- docs/tapeticmd.rst | 6 ++ 14 files changed, 234 insertions(+), 155 deletions(-) delete mode 100644 06-StatelessRequestResponse/06-StatelessRequestResponse.csproj create mode 100644 Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/ExampleMessageController.cs (95%) rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/Program.cs (96%) rename {06-StatelessRequestResponse => Examples/06-StatelessRequestResponse}/ReceivingMessageController.cs (96%) create mode 100644 Tapeti.Cmd/RateLimiter/IRateLimiter.cs create mode 100644 Tapeti.Cmd/RateLimiter/NoRateLimiter.cs create mode 100644 Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs diff --git a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj deleted file mode 100644 index 7efcb1a..0000000 --- a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj +++ /dev/null @@ -1,20 +0,0 @@ - - - - Exe - netcoreapp2.1 - _06_StatelessRequestResponse - - - - - - - - - - - - - - diff --git a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj new file mode 100644 index 0000000..7d6c898 --- /dev/null +++ b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj @@ -0,0 +1,20 @@ + + + + Exe + netcoreapp2.1 + _06_StatelessRequestResponse + + + + + + + + + + + + + + diff --git a/06-StatelessRequestResponse/ExampleMessageController.cs b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs similarity index 95% rename from 06-StatelessRequestResponse/ExampleMessageController.cs rename to Examples/06-StatelessRequestResponse/ExampleMessageController.cs index fe01f2e..bc908ab 100644 --- a/06-StatelessRequestResponse/ExampleMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs @@ -1,28 +1,28 @@ -using System; -using ExampleLib; -using Messaging.TapetiExample; -using Tapeti.Annotations; - -namespace _06_StatelessRequestResponse -{ - [MessageController] - [DynamicQueue("tapeti.example.06")] - public class ExampleMessageController - { - private readonly IExampleState exampleState; - - - public ExampleMessageController(IExampleState exampleState) - { - this.exampleState = exampleState; - } - - - [ResponseHandler] - public void HandleQuoteResponse(QuoteResponseMessage message) - { - Console.WriteLine("Received response: " + message.Quote); - exampleState.Done(); - } - } -} +using System; +using ExampleLib; +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _06_StatelessRequestResponse +{ + [MessageController] + [DynamicQueue("tapeti.example.06")] + public class ExampleMessageController + { + private readonly IExampleState exampleState; + + + public ExampleMessageController(IExampleState exampleState) + { + this.exampleState = exampleState; + } + + + [ResponseHandler] + public void HandleQuoteResponse(QuoteResponseMessage message) + { + Console.WriteLine("Received response: " + message.Quote); + exampleState.Done(); + } + } +} diff --git a/06-StatelessRequestResponse/Program.cs b/Examples/06-StatelessRequestResponse/Program.cs similarity index 96% rename from 06-StatelessRequestResponse/Program.cs rename to Examples/06-StatelessRequestResponse/Program.cs index 615cb9b..6cc0de3 100644 --- a/06-StatelessRequestResponse/Program.cs +++ b/Examples/06-StatelessRequestResponse/Program.cs @@ -1,51 +1,51 @@ -using System; -using System.Threading.Tasks; -using ExampleLib; -using Messaging.TapetiExample; -using SimpleInjector; -using Tapeti; -using Tapeti.DataAnnotations; -using Tapeti.Default; -using Tapeti.SimpleInjector; - -namespace _06_StatelessRequestResponse -{ - public class Program - { - public static void Main(string[] args) - { - var container = new Container(); - var dependencyResolver = new SimpleInjectorDependencyResolver(container); - - container.Register(); - - var helper = new ExampleConsoleApp(dependencyResolver); - helper.Run(MainAsync); - } - - - internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) - { - var config = new TapetiConfig(dependencyResolver) - .WithDataAnnotations() - .RegisterAllControllers() - .Build(); - - - using (var connection = new TapetiConnection(config)) - { - await connection.Subscribe(); - - var publisher = dependencyResolver.Resolve(); - await publisher.PublishRequest( - new QuoteRequestMessage - { - Amount = 1 - }, - c => c.HandleQuoteResponse); - - await waitForDone(); - } - } - } -} +using System; +using System.Threading.Tasks; +using ExampleLib; +using Messaging.TapetiExample; +using SimpleInjector; +using Tapeti; +using Tapeti.DataAnnotations; +using Tapeti.Default; +using Tapeti.SimpleInjector; + +namespace _06_StatelessRequestResponse +{ + public class Program + { + public static void Main(string[] args) + { + var container = new Container(); + var dependencyResolver = new SimpleInjectorDependencyResolver(container); + + container.Register(); + + var helper = new ExampleConsoleApp(dependencyResolver); + helper.Run(MainAsync); + } + + + internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) + { + var config = new TapetiConfig(dependencyResolver) + .WithDataAnnotations() + .RegisterAllControllers() + .Build(); + + + using (var connection = new TapetiConnection(config)) + { + await connection.Subscribe(); + + var publisher = dependencyResolver.Resolve(); + await publisher.PublishRequest( + new QuoteRequestMessage + { + Amount = 1 + }, + c => c.HandleQuoteResponse); + + await waitForDone(); + } + } + } +} diff --git a/06-StatelessRequestResponse/ReceivingMessageController.cs b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs similarity index 96% rename from 06-StatelessRequestResponse/ReceivingMessageController.cs rename to Examples/06-StatelessRequestResponse/ReceivingMessageController.cs index 6684780..ea947c8 100644 --- a/06-StatelessRequestResponse/ReceivingMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs @@ -1,38 +1,38 @@ -using Messaging.TapetiExample; -using Tapeti.Annotations; - -namespace _06_StatelessRequestResponse -{ - [MessageController] - [DynamicQueue("tapeti.example.06.receiver")] - public class ReceivingMessageController - { - // No publisher required, responses can simply be returned - public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) - { - string quote; - - switch (message.Amount) - { - case 1: - // Well, they asked for it... :-) - quote = "'"; - break; - - case 2: - quote = "\""; - break; - - default: - // We have to return a response. - quote = null; - break; - } - - return new QuoteResponseMessage - { - Quote = quote - }; - } - } -} +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _06_StatelessRequestResponse +{ + [MessageController] + [DynamicQueue("tapeti.example.06.receiver")] + public class ReceivingMessageController + { + // No publisher required, responses can simply be returned + public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) + { + string quote; + + switch (message.Amount) + { + case 1: + // Well, they asked for it... :-) + quote = "'"; + break; + + case 2: + quote = "\""; + break; + + default: + // We have to return a response. + quote = null; + break; + } + + return new QuoteResponseMessage + { + Quote = quote + }; + } + } +} diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs index ccdf308..a981051 100644 --- a/Tapeti.Cmd/Commands/ImportCommand.cs +++ b/Tapeti.Cmd/Commands/ImportCommand.cs @@ -1,4 +1,5 @@ using RabbitMQ.Client; +using Tapeti.Cmd.RateLimiter; using Tapeti.Cmd.Serialization; namespace Tapeti.Cmd.Commands @@ -10,17 +11,20 @@ namespace Tapeti.Cmd.Commands public bool DirectToQueue { get; set; } - public int Execute(IModel channel) + public int Execute(IModel channel, IRateLimiter rateLimiter) { var messageCount = 0; foreach (var message in MessageSerializer.Deserialize()) { - var exchange = DirectToQueue ? "" : message.Exchange; - var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; + rateLimiter.Execute(() => + { + var exchange = DirectToQueue ? "" : message.Exchange; + var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + }); } return messageCount; diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs index 9b42a3a..34f11d2 100644 --- a/Tapeti.Cmd/Commands/ShovelCommand.cs +++ b/Tapeti.Cmd/Commands/ShovelCommand.cs @@ -1,5 +1,6 @@ using RabbitMQ.Client; - +using Tapeti.Cmd.RateLimiter; + namespace Tapeti.Cmd.Commands { public class ShovelCommand @@ -10,7 +11,7 @@ namespace Tapeti.Cmd.Commands public int? MaxCount { get; set; } - public int Execute(IModel sourceChannel, IModel targetChannel) + public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter) { var messageCount = 0; @@ -22,13 +23,14 @@ namespace Tapeti.Cmd.Commands break; - targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); + rateLimiter.Execute(() => + { + targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); + messageCount++; - - messageCount++; - - if (RemoveMessages) - sourceChannel.BasicAck(result.DeliveryTag, false); + if (RemoveMessages) + sourceChannel.BasicAck(result.DeliveryTag, false); + }); } return messageCount; diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 879e1af..f1bd5fe 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -4,10 +4,10 @@ using System.Diagnostics; using System.IO; using System.Text; using CommandLine; -using CommandLine.Text; using RabbitMQ.Client; using RabbitMQ.Client.Framing; using Tapeti.Cmd.Commands; +using Tapeti.Cmd.RateLimiter; using Tapeti.Cmd.Serialization; namespace Tapeti.Cmd @@ -79,6 +79,9 @@ namespace Tapeti.Cmd [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] public bool PublishToExchange { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] + public int? MaxRate { get; set; } } @@ -111,6 +114,9 @@ namespace Tapeti.Cmd [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] public string TargetPassword { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] + public int? MaxRate { get; set; } } @@ -236,6 +242,15 @@ namespace Tapeti.Cmd } + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (maxRate.GetValueOrDefault() <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + + private static void RunExport(ExportOptions options) { int messageCount; @@ -271,7 +286,7 @@ namespace Tapeti.Cmd MessageSerializer = messageSerializer, DirectToQueue = !options.PublishToExchange - }.Execute(channel); + }.Execute(channel, GetRateLimiter(options.MaxRate)); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); @@ -299,11 +314,11 @@ namespace Tapeti.Cmd using (var targetConnection = GetTargetConnection(options)) using (var targetChannel = targetConnection.CreateModel()) { - messageCount = shovelCommand.Execute(sourceChannel, targetChannel); + messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate)); } } else - messageCount = shovelCommand.Execute(sourceChannel, sourceChannel); + messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate)); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); diff --git a/Tapeti.Cmd/RateLimiter/IRateLimiter.cs b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs new file mode 100644 index 0000000..6357d2b --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Cmd.RateLimiter +{ + public interface IRateLimiter + { + void Execute(Action action); + } +} diff --git a/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs new file mode 100644 index 0000000..2cdce24 --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs @@ -0,0 +1,12 @@ +using System; + +namespace Tapeti.Cmd.RateLimiter +{ + public class NoRateLimiter : IRateLimiter + { + public void Execute(Action action) + { + action(); + } + } +} diff --git a/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs new file mode 100644 index 0000000..d03ad6c --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading; + +namespace Tapeti.Cmd.RateLimiter +{ + public class SpreadRateLimiter : IRateLimiter + { + private readonly TimeSpan delay; + private DateTime lastExecute = DateTime.MinValue; + + public SpreadRateLimiter(int amount, TimeSpan perTimespan) + { + delay = TimeSpan.FromMilliseconds(perTimespan.TotalMilliseconds / amount); + } + + + public void Execute(Action action) + { + // Very simple implementation; the time between actions must be at least the delay. + // This prevents bursts followed by nothing which are common with normal rate limiter implementations. + var remainingWaitTime = delay - (DateTime.Now - lastExecute); + + if (remainingWaitTime.TotalMilliseconds > 0) + Thread.Sleep(remainingWaitTime); + + action(); + lastExecute = DateTime.Now; + } + } +} diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat index 9903ae7..a65fdab 100644 --- a/Tapeti.Cmd/build-release.bat +++ b/Tapeti.Cmd/build-release.bat @@ -1,3 +1,4 @@ +@Echo Off mkdir publish REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll". diff --git a/Tapeti.sln b/Tapeti.sln index 016fd47..63a14c7 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -57,7 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index ee0b366..af3c77b 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -80,6 +80,10 @@ Read messages from disk as previously exported and publish them to a queue. -s , --serialization The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. +--maxrate + The maximum amount of messages per second to import. + + Either input, message or pipe is required. Example: @@ -121,6 +125,8 @@ Reads messages from a queue and publishes them to another queue, optionally to a --targetpassword Password used to connect to the target RabbitMQ server. Defaults to the source password. +--maxrate + The maximum amount of messages per second to shovel. Example: