1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Merge branch 'release/2.3'

This commit is contained in:
Mark van Renswoude 2021-01-15 10:39:42 +01:00
commit 8a94223d37
19 changed files with 321 additions and 171 deletions

View File

@ -1,20 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,5 @@
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization; using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands namespace Tapeti.Cmd.Commands
@ -10,17 +11,20 @@ namespace Tapeti.Cmd.Commands
public bool DirectToQueue { get; set; } public bool DirectToQueue { get; set; }
public int Execute(IModel channel) public int Execute(IModel channel, IRateLimiter rateLimiter)
{ {
var messageCount = 0; var messageCount = 0;
foreach (var message in MessageSerializer.Deserialize()) foreach (var message in MessageSerializer.Deserialize())
{ {
var exchange = DirectToQueue ? "" : message.Exchange; rateLimiter.Execute(() =>
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; {
var exchange = DirectToQueue ? "" : message.Exchange;
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++; messageCount++;
});
} }
return messageCount; return messageCount;

View File

@ -1,4 +1,5 @@
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
namespace Tapeti.Cmd.Commands namespace Tapeti.Cmd.Commands
{ {
@ -10,7 +11,7 @@ namespace Tapeti.Cmd.Commands
public int? MaxCount { get; set; } public int? MaxCount { get; set; }
public int Execute(IModel sourceChannel, IModel targetChannel) public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
{ {
var messageCount = 0; var messageCount = 0;
@ -22,13 +23,14 @@ namespace Tapeti.Cmd.Commands
break; break;
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); rateLimiter.Execute(() =>
{
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
messageCount++;
if (RemoveMessages)
messageCount++; sourceChannel.BasicAck(result.DeliveryTag, false);
});
if (RemoveMessages)
sourceChannel.BasicAck(result.DeliveryTag, false);
} }
return messageCount; return messageCount;

View File

@ -4,10 +4,10 @@ using System.Diagnostics;
using System.IO; using System.IO;
using System.Text; using System.Text;
using CommandLine; using CommandLine;
using CommandLine.Text;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
using Tapeti.Cmd.Commands; using Tapeti.Cmd.Commands;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization; using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd namespace Tapeti.Cmd
@ -74,11 +74,14 @@ namespace Tapeti.Cmd
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; } public string InputMessage { get; set; }
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; } public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; } public bool PublishToExchange { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
public int? MaxRate { get; set; }
} }
@ -111,6 +114,9 @@ namespace Tapeti.Cmd
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; } public string TargetPassword { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
public int? MaxRate { get; set; }
} }
@ -236,6 +242,15 @@ namespace Tapeti.Cmd
} }
private static IRateLimiter GetRateLimiter(int? maxRate)
{
if (maxRate.GetValueOrDefault() <= 0)
return new NoRateLimiter();
return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
}
private static void RunExport(ExportOptions options) private static void RunExport(ExportOptions options)
{ {
int messageCount; int messageCount;
@ -271,7 +286,7 @@ namespace Tapeti.Cmd
MessageSerializer = messageSerializer, MessageSerializer = messageSerializer,
DirectToQueue = !options.PublishToExchange DirectToQueue = !options.PublishToExchange
}.Execute(channel); }.Execute(channel, GetRateLimiter(options.MaxRate));
} }
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
@ -299,11 +314,11 @@ namespace Tapeti.Cmd
using (var targetConnection = GetTargetConnection(options)) using (var targetConnection = GetTargetConnection(options))
using (var targetChannel = targetConnection.CreateModel()) using (var targetChannel = targetConnection.CreateModel())
{ {
messageCount = shovelCommand.Execute(sourceChannel, targetChannel); messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate));
} }
} }
else else
messageCount = shovelCommand.Execute(sourceChannel, sourceChannel); messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate));
} }
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Cmd.RateLimiter
{
public interface IRateLimiter
{
void Execute(Action action);
}
}

View File

@ -0,0 +1,12 @@
using System;
namespace Tapeti.Cmd.RateLimiter
{
public class NoRateLimiter : IRateLimiter
{
public void Execute(Action action)
{
action();
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Threading;
namespace Tapeti.Cmd.RateLimiter
{
public class SpreadRateLimiter : IRateLimiter
{
private readonly TimeSpan delay;
private DateTime lastExecute = DateTime.MinValue;
public SpreadRateLimiter(int amount, TimeSpan perTimespan)
{
delay = TimeSpan.FromMilliseconds(perTimespan.TotalMilliseconds / amount);
}
public void Execute(Action action)
{
// Very simple implementation; the time between actions must be at least the delay.
// This prevents bursts followed by nothing which are common with normal rate limiter implementations.
var remainingWaitTime = delay - (DateTime.Now - lastExecute);
if (remainingWaitTime.TotalMilliseconds > 0)
Thread.Sleep(remainingWaitTime);
action();
lastExecute = DateTime.Now;
}
}
}

View File

@ -1,3 +1,4 @@
@Echo Off
mkdir publish mkdir publish
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll". REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".

View File

@ -57,7 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution

View File

@ -77,8 +77,14 @@ namespace Tapeti.Connection
/// <param name="cancellationToken">Cancelled when the connection is lost</param> /// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName"></param> /// <param name="queueName"></param>
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param> /// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); /// <returns>The consumer tag as returned by BasicConsume.</returns>
Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
/// <summary>
/// Stops the consumer with the specified tag.
/// </summary>
/// <param name="consumerTag">The consumer tag as returned by Consume.</param>
Task Cancel(string consumerTag);
/// <summary> /// <summary>
/// Creates a durable queue if it does not already exist, and updates the bindings. /// Creates a durable queue if it does not already exist, and updates the bindings.
@ -118,7 +124,6 @@ namespace Tapeti.Connection
/// <param name="binding">The binding to add to the dynamic queue</param> /// <param name="binding">The binding to add to the dynamic queue</param>
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding); Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
/// <summary> /// <summary>
/// Closes the connection to RabbitMQ gracefully. /// Closes the connection to RabbitMQ gracefully.
/// </summary> /// </summary>

View File

@ -72,7 +72,6 @@ namespace Tapeti.Connection
} }
/// <inheritdoc />
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams) public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
{ {
this.config = config; this.config = config;
@ -184,29 +183,48 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) public async Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{ {
if (deletedQueues.Contains(queueName)) if (deletedQueues.Contains(queueName))
return; return null;
if (string.IsNullOrEmpty(queueName)) if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName)); throw new ArgumentNullException(nameof(queueName));
string consumerTag = null;
await QueueWithRetryableChannel(channel => await QueueWithRetryableChannel(channel =>
{ {
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested)
return; return;
var basicConsumer = new TapetiBasicConsumer(consumer, Respond); var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
channel.BasicConsume(queueName, false, basicConsumer); consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
});
return consumerTag;
}
/// <inheritdoc />
public async Task Cancel(string consumerTag)
{
if (isClosing || string.IsNullOrEmpty(consumerTag))
return;
// No need for a retryable channel here, if the connection is lost
// so is the consumer.
await Queue(channel =>
{
channel.BasicCancel(consumerTag);
}); });
} }
private async Task Respond(ulong deliveryTag, ConsumeResult result) private async Task Respond(ulong deliveryTag, ConsumeResult result)
{ {
await taskQueue.Value.Add(() => await Queue(channel =>
{ {
// No need for a retryable channel here, if the connection is lost we can't // No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore. // use the deliveryTag anymore.
@ -214,15 +232,15 @@ namespace Tapeti.Connection
{ {
case ConsumeResult.Success: case ConsumeResult.Success:
case ConsumeResult.ExternalRequeue: case ConsumeResult.ExternalRequeue:
GetChannel().BasicAck(deliveryTag, false); channel.BasicAck(deliveryTag, false);
break; break;
case ConsumeResult.Error: case ConsumeResult.Error:
GetChannel().BasicNack(deliveryTag, false, false); channel.BasicNack(deliveryTag, false, false);
break; break;
case ConsumeResult.Requeue: case ConsumeResult.Requeue:
GetChannel().BasicNack(deliveryTag, false, true); channel.BasicNack(deliveryTag, false, true);
break; break;
default: default:

View File

@ -13,6 +13,7 @@ namespace Tapeti.Connection
private readonly Func<ITapetiClient> clientFactory; private readonly Func<ITapetiClient> clientFactory;
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private bool consuming; private bool consuming;
private readonly List<string> consumerTags = new List<string>();
private CancellationTokenSource initializeCancellationTokenSource; private CancellationTokenSource initializeCancellationTokenSource;
@ -50,6 +51,8 @@ namespace Tapeti.Connection
{ {
initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null; initializeCancellationTokenSource = null;
consumerTags.Clear();
} }
@ -65,6 +68,8 @@ namespace Tapeti.Connection
initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = new CancellationTokenSource(); initializeCancellationTokenSource = new CancellationTokenSource();
consumerTags.Clear();
cancellationToken = initializeCancellationTokenSource.Token; cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation // ReSharper disable once MethodSupportsCancellation
@ -91,6 +96,21 @@ namespace Tapeti.Connection
} }
/// <inheritdoc />
public async Task Stop()
{
if (!consuming)
return;
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag)));
consumerTags.Clear();
consuming = false;
}
private async Task ApplyBindings(CancellationToken cancellationToken) private async Task ApplyBindings(CancellationToken cancellationToken)
{ {
@ -115,13 +135,13 @@ namespace Tapeti.Connection
{ {
var queues = config.Bindings.GroupBy(binding => binding.QueueName); var queues = config.Bindings.GroupBy(binding => binding.QueueName);
await Task.WhenAll(queues.Select(async group => consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{ {
var queueName = group.Key; var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group); var consumer = new TapetiConsumer(config, queueName, group);
await clientFactory().Consume(cancellationToken, queueName, consumer); return await clientFactory().Consume(cancellationToken, queueName, consumer);
})); })));
} }

View File

@ -13,5 +13,10 @@ namespace Tapeti
/// Starts consuming from the subscribed queues if not already started. /// Starts consuming from the subscribed queues if not already started.
/// </summary> /// </summary>
Task Resume(); Task Resume();
/// <summary>
/// Stops consuming from the subscribed queues.
/// </summary>
Task Stop();
} }
} }

View File

@ -63,7 +63,7 @@ configuration:
deploy: deploy:
provider: NuGet provider: NuGet
api_key: api_key:
secure: 6/t8qnIiVuBCYb2TaOAHPbcQNb3g9EH++6okxqDjSaWMGoBrUEDXNCDvLVijTafZ secure: 3WCSZAzan66vEmHZ1q3XzfOfucuAQiA+SiCDChO/gswbxfIXUpiM1eMNASDa3qWH
skip_symbols: false skip_symbols: false
artifact: /.*\.nupkg/ artifact: /.*\.nupkg/

View File

@ -80,6 +80,10 @@ Read messages from disk as previously exported and publish them to a queue.
-s <method>, --serialization <method> -s <method>, --serialization <method>
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
--maxrate <messages per second>
The maximum amount of messages per second to import.
Either input, message or pipe is required. Either input, message or pipe is required.
Example: Example:
@ -121,6 +125,8 @@ Reads messages from a queue and publishes them to another queue, optionally to a
--targetpassword <password> --targetpassword <password>
Password used to connect to the target RabbitMQ server. Defaults to the source password. Password used to connect to the target RabbitMQ server. Defaults to the source password.
--maxrate <messages per second>
The maximum amount of messages per second to shovel.
Example: Example:
@ -138,7 +144,7 @@ SingleFileJSON
'''''''''''''' ''''''''''''''
The default serialization method. All messages are contained in a single file, where each line is a JSON document describing the message properties and it's content. The default serialization method. All messages are contained in a single file, where each line is a JSON document describing the message properties and it's content.
An example message (formatted as multi-line to be more readable, but keep in mind that it must be a single line in the export file to be imported properly): An example message (formatted as multi-line to be more readable, but keep in mind that it **must be a single line** in the export file to be imported properly):
:: ::
@ -176,6 +182,29 @@ The properties correspond to the RabbitMQ client's IBasicProperties and can be o
Either Body or RawBody is present. Body is used if the ContentType is set to application/json, and will contain the original message as an inline JSON object for easy manipulation. For other content types, the RawBody contains the original encoded body. Either Body or RawBody is present. Body is used if the ContentType is set to application/json, and will contain the original message as an inline JSON object for easy manipulation. For other content types, the RawBody contains the original encoded body.
Below is a bare minimum example, assuming Tapeti style messages and the default direct-to-queue import (no --exchange parameter). Again, keep in mind that it **must be a single line** in the export file to be imported properly.
::
{
"Queue": "tapeti.example.01",
"Properties": {
"ContentType": "application/json",
"Headers": {
"classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample"
}
},
"Body": {
"Amount": 2
}
}
Actual file contents will thus look like:
::
{ "Queue": "tapeti.example.01", "Properties": { "ContentType": "application/json", "Headers": { "classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample" } }, "Body": { "Amount": 2 } }
EasyNetQHosepipe EasyNetQHosepipe
'''''''''''''''' ''''''''''''''''