Implemented a rate limiting option for Tapeti.Cmd

Moved example 06 to the "examples" sub-folder
This commit is contained in:
Mark van Renswoude 2020-07-03 15:51:41 +02:00
parent 266c4475a6
commit e31d638a11
14 changed files with 234 additions and 155 deletions

View File

@ -1,20 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
</ItemGroup>
</Project>

View File

@ -1,28 +1,28 @@
using System; using System;
using ExampleLib; using ExampleLib;
using Messaging.TapetiExample; using Messaging.TapetiExample;
using Tapeti.Annotations; using Tapeti.Annotations;
namespace _06_StatelessRequestResponse namespace _06_StatelessRequestResponse
{ {
[MessageController] [MessageController]
[DynamicQueue("tapeti.example.06")] [DynamicQueue("tapeti.example.06")]
public class ExampleMessageController public class ExampleMessageController
{ {
private readonly IExampleState exampleState; private readonly IExampleState exampleState;
public ExampleMessageController(IExampleState exampleState) public ExampleMessageController(IExampleState exampleState)
{ {
this.exampleState = exampleState; this.exampleState = exampleState;
} }
[ResponseHandler] [ResponseHandler]
public void HandleQuoteResponse(QuoteResponseMessage message) public void HandleQuoteResponse(QuoteResponseMessage message)
{ {
Console.WriteLine("Received response: " + message.Quote); Console.WriteLine("Received response: " + message.Quote);
exampleState.Done(); exampleState.Done();
} }
} }
} }

View File

@ -1,51 +1,51 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using ExampleLib; using ExampleLib;
using Messaging.TapetiExample; using Messaging.TapetiExample;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.DataAnnotations; using Tapeti.DataAnnotations;
using Tapeti.Default; using Tapeti.Default;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
namespace _06_StatelessRequestResponse namespace _06_StatelessRequestResponse
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main(string[] args)
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>(); container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver); var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync); helper.Run(MainAsync);
} }
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone) internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{ {
var config = new TapetiConfig(dependencyResolver) var config = new TapetiConfig(dependencyResolver)
.WithDataAnnotations() .WithDataAnnotations()
.RegisterAllControllers() .RegisterAllControllers()
.Build(); .Build();
using (var connection = new TapetiConnection(config)) using (var connection = new TapetiConnection(config))
{ {
await connection.Subscribe(); await connection.Subscribe();
var publisher = dependencyResolver.Resolve<IPublisher>(); var publisher = dependencyResolver.Resolve<IPublisher>();
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>( await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage new QuoteRequestMessage
{ {
Amount = 1 Amount = 1
}, },
c => c.HandleQuoteResponse); c => c.HandleQuoteResponse);
await waitForDone(); await waitForDone();
} }
} }
} }
} }

View File

@ -1,38 +1,38 @@
using Messaging.TapetiExample; using Messaging.TapetiExample;
using Tapeti.Annotations; using Tapeti.Annotations;
namespace _06_StatelessRequestResponse namespace _06_StatelessRequestResponse
{ {
[MessageController] [MessageController]
[DynamicQueue("tapeti.example.06.receiver")] [DynamicQueue("tapeti.example.06.receiver")]
public class ReceivingMessageController public class ReceivingMessageController
{ {
// No publisher required, responses can simply be returned // No publisher required, responses can simply be returned
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
{ {
string quote; string quote;
switch (message.Amount) switch (message.Amount)
{ {
case 1: case 1:
// Well, they asked for it... :-) // Well, they asked for it... :-)
quote = "'"; quote = "'";
break; break;
case 2: case 2:
quote = "\""; quote = "\"";
break; break;
default: default:
// We have to return a response. // We have to return a response.
quote = null; quote = null;
break; break;
} }
return new QuoteResponseMessage return new QuoteResponseMessage
{ {
Quote = quote Quote = quote
}; };
} }
} }
} }

View File

@ -1,4 +1,5 @@
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization; using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands namespace Tapeti.Cmd.Commands
@ -10,17 +11,20 @@ namespace Tapeti.Cmd.Commands
public bool DirectToQueue { get; set; } public bool DirectToQueue { get; set; }
public int Execute(IModel channel) public int Execute(IModel channel, IRateLimiter rateLimiter)
{ {
var messageCount = 0; var messageCount = 0;
foreach (var message in MessageSerializer.Deserialize()) foreach (var message in MessageSerializer.Deserialize())
{ {
var exchange = DirectToQueue ? "" : message.Exchange; rateLimiter.Execute(() =>
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; {
var exchange = DirectToQueue ? "" : message.Exchange;
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++; messageCount++;
});
} }
return messageCount; return messageCount;

View File

@ -1,5 +1,6 @@
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
namespace Tapeti.Cmd.Commands namespace Tapeti.Cmd.Commands
{ {
public class ShovelCommand public class ShovelCommand
@ -10,7 +11,7 @@ namespace Tapeti.Cmd.Commands
public int? MaxCount { get; set; } public int? MaxCount { get; set; }
public int Execute(IModel sourceChannel, IModel targetChannel) public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
{ {
var messageCount = 0; var messageCount = 0;
@ -22,13 +23,14 @@ namespace Tapeti.Cmd.Commands
break; break;
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); rateLimiter.Execute(() =>
{
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
messageCount++;
if (RemoveMessages)
messageCount++; sourceChannel.BasicAck(result.DeliveryTag, false);
});
if (RemoveMessages)
sourceChannel.BasicAck(result.DeliveryTag, false);
} }
return messageCount; return messageCount;

View File

@ -4,10 +4,10 @@ using System.Diagnostics;
using System.IO; using System.IO;
using System.Text; using System.Text;
using CommandLine; using CommandLine;
using CommandLine.Text;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
using Tapeti.Cmd.Commands; using Tapeti.Cmd.Commands;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization; using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd namespace Tapeti.Cmd
@ -79,6 +79,9 @@ namespace Tapeti.Cmd
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; } public bool PublishToExchange { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
public int? MaxRate { get; set; }
} }
@ -111,6 +114,9 @@ namespace Tapeti.Cmd
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; } public string TargetPassword { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
public int? MaxRate { get; set; }
} }
@ -236,6 +242,15 @@ namespace Tapeti.Cmd
} }
private static IRateLimiter GetRateLimiter(int? maxRate)
{
if (maxRate.GetValueOrDefault() <= 0)
return new NoRateLimiter();
return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
}
private static void RunExport(ExportOptions options) private static void RunExport(ExportOptions options)
{ {
int messageCount; int messageCount;
@ -271,7 +286,7 @@ namespace Tapeti.Cmd
MessageSerializer = messageSerializer, MessageSerializer = messageSerializer,
DirectToQueue = !options.PublishToExchange DirectToQueue = !options.PublishToExchange
}.Execute(channel); }.Execute(channel, GetRateLimiter(options.MaxRate));
} }
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
@ -299,11 +314,11 @@ namespace Tapeti.Cmd
using (var targetConnection = GetTargetConnection(options)) using (var targetConnection = GetTargetConnection(options))
using (var targetChannel = targetConnection.CreateModel()) using (var targetChannel = targetConnection.CreateModel())
{ {
messageCount = shovelCommand.Execute(sourceChannel, targetChannel); messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate));
} }
} }
else else
messageCount = shovelCommand.Execute(sourceChannel, sourceChannel); messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate));
} }
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Cmd.RateLimiter
{
public interface IRateLimiter
{
void Execute(Action action);
}
}

View File

@ -0,0 +1,12 @@
using System;
namespace Tapeti.Cmd.RateLimiter
{
public class NoRateLimiter : IRateLimiter
{
public void Execute(Action action)
{
action();
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Threading;
namespace Tapeti.Cmd.RateLimiter
{
public class SpreadRateLimiter : IRateLimiter
{
private readonly TimeSpan delay;
private DateTime lastExecute = DateTime.MinValue;
public SpreadRateLimiter(int amount, TimeSpan perTimespan)
{
delay = TimeSpan.FromMilliseconds(perTimespan.TotalMilliseconds / amount);
}
public void Execute(Action action)
{
// Very simple implementation; the time between actions must be at least the delay.
// This prevents bursts followed by nothing which are common with normal rate limiter implementations.
var remainingWaitTime = delay - (DateTime.Now - lastExecute);
if (remainingWaitTime.TotalMilliseconds > 0)
Thread.Sleep(remainingWaitTime);
action();
lastExecute = DateTime.Now;
}
}
}

View File

@ -1,3 +1,4 @@
@Echo Off
mkdir publish mkdir publish
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll". REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".

View File

@ -57,7 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution

View File

@ -80,6 +80,10 @@ Read messages from disk as previously exported and publish them to a queue.
-s <method>, --serialization <method> -s <method>, --serialization <method>
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
--maxrate <messages per second>
The maximum amount of messages per second to import.
Either input, message or pipe is required. Either input, message or pipe is required.
Example: Example:
@ -121,6 +125,8 @@ Reads messages from a queue and publishes them to another queue, optionally to a
--targetpassword <password> --targetpassword <password>
Password used to connect to the target RabbitMQ server. Defaults to the source password. Password used to connect to the target RabbitMQ server. Defaults to the source password.
--maxrate <messages per second>
The maximum amount of messages per second to shovel.
Example: Example: