diff --git a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
deleted file mode 100644
index 7efcb1a..0000000
--- a/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
+++ /dev/null
@@ -1,20 +0,0 @@
-
-
-
- Exe
- netcoreapp2.1
- _06_StatelessRequestResponse
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
new file mode 100644
index 0000000..7d6c898
--- /dev/null
+++ b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
@@ -0,0 +1,20 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _06_StatelessRequestResponse
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/06-StatelessRequestResponse/ExampleMessageController.cs b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs
similarity index 95%
rename from 06-StatelessRequestResponse/ExampleMessageController.cs
rename to Examples/06-StatelessRequestResponse/ExampleMessageController.cs
index fe01f2e..bc908ab 100644
--- a/06-StatelessRequestResponse/ExampleMessageController.cs
+++ b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs
@@ -1,28 +1,28 @@
-using System;
-using ExampleLib;
-using Messaging.TapetiExample;
-using Tapeti.Annotations;
-
-namespace _06_StatelessRequestResponse
-{
- [MessageController]
- [DynamicQueue("tapeti.example.06")]
- public class ExampleMessageController
- {
- private readonly IExampleState exampleState;
-
-
- public ExampleMessageController(IExampleState exampleState)
- {
- this.exampleState = exampleState;
- }
-
-
- [ResponseHandler]
- public void HandleQuoteResponse(QuoteResponseMessage message)
- {
- Console.WriteLine("Received response: " + message.Quote);
- exampleState.Done();
- }
- }
-}
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _06_StatelessRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.06")]
+ public class ExampleMessageController
+ {
+ private readonly IExampleState exampleState;
+
+
+ public ExampleMessageController(IExampleState exampleState)
+ {
+ this.exampleState = exampleState;
+ }
+
+
+ [ResponseHandler]
+ public void HandleQuoteResponse(QuoteResponseMessage message)
+ {
+ Console.WriteLine("Received response: " + message.Quote);
+ exampleState.Done();
+ }
+ }
+}
diff --git a/06-StatelessRequestResponse/Program.cs b/Examples/06-StatelessRequestResponse/Program.cs
similarity index 96%
rename from 06-StatelessRequestResponse/Program.cs
rename to Examples/06-StatelessRequestResponse/Program.cs
index 615cb9b..6cc0de3 100644
--- a/06-StatelessRequestResponse/Program.cs
+++ b/Examples/06-StatelessRequestResponse/Program.cs
@@ -1,51 +1,51 @@
-using System;
-using System.Threading.Tasks;
-using ExampleLib;
-using Messaging.TapetiExample;
-using SimpleInjector;
-using Tapeti;
-using Tapeti.DataAnnotations;
-using Tapeti.Default;
-using Tapeti.SimpleInjector;
-
-namespace _06_StatelessRequestResponse
-{
- public class Program
- {
- public static void Main(string[] args)
- {
- var container = new Container();
- var dependencyResolver = new SimpleInjectorDependencyResolver(container);
-
- container.Register();
-
- var helper = new ExampleConsoleApp(dependencyResolver);
- helper.Run(MainAsync);
- }
-
-
- internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
- {
- var config = new TapetiConfig(dependencyResolver)
- .WithDataAnnotations()
- .RegisterAllControllers()
- .Build();
-
-
- using (var connection = new TapetiConnection(config))
- {
- await connection.Subscribe();
-
- var publisher = dependencyResolver.Resolve();
- await publisher.PublishRequest(
- new QuoteRequestMessage
- {
- Amount = 1
- },
- c => c.HandleQuoteResponse);
-
- await waitForDone();
- }
- }
- }
-}
+using System;
+using System.Threading.Tasks;
+using ExampleLib;
+using Messaging.TapetiExample;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.DataAnnotations;
+using Tapeti.Default;
+using Tapeti.SimpleInjector;
+
+namespace _06_StatelessRequestResponse
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
+ .RegisterAllControllers()
+ .Build();
+
+
+ using (var connection = new TapetiConnection(config))
+ {
+ await connection.Subscribe();
+
+ var publisher = dependencyResolver.Resolve();
+ await publisher.PublishRequest(
+ new QuoteRequestMessage
+ {
+ Amount = 1
+ },
+ c => c.HandleQuoteResponse);
+
+ await waitForDone();
+ }
+ }
+ }
+}
diff --git a/06-StatelessRequestResponse/ReceivingMessageController.cs b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
similarity index 96%
rename from 06-StatelessRequestResponse/ReceivingMessageController.cs
rename to Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
index 6684780..ea947c8 100644
--- a/06-StatelessRequestResponse/ReceivingMessageController.cs
+++ b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
@@ -1,38 +1,38 @@
-using Messaging.TapetiExample;
-using Tapeti.Annotations;
-
-namespace _06_StatelessRequestResponse
-{
- [MessageController]
- [DynamicQueue("tapeti.example.06.receiver")]
- public class ReceivingMessageController
- {
- // No publisher required, responses can simply be returned
- public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
- {
- string quote;
-
- switch (message.Amount)
- {
- case 1:
- // Well, they asked for it... :-)
- quote = "'";
- break;
-
- case 2:
- quote = "\"";
- break;
-
- default:
- // We have to return a response.
- quote = null;
- break;
- }
-
- return new QuoteResponseMessage
- {
- Quote = quote
- };
- }
- }
-}
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _06_StatelessRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.06.receiver")]
+ public class ReceivingMessageController
+ {
+ // No publisher required, responses can simply be returned
+ public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
+ {
+ string quote;
+
+ switch (message.Amount)
+ {
+ case 1:
+ // Well, they asked for it... :-)
+ quote = "'";
+ break;
+
+ case 2:
+ quote = "\"";
+ break;
+
+ default:
+ // We have to return a response.
+ quote = null;
+ break;
+ }
+
+ return new QuoteResponseMessage
+ {
+ Quote = quote
+ };
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs
index ccdf308..a981051 100644
--- a/Tapeti.Cmd/Commands/ImportCommand.cs
+++ b/Tapeti.Cmd/Commands/ImportCommand.cs
@@ -1,4 +1,5 @@
using RabbitMQ.Client;
+using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands
@@ -10,17 +11,20 @@ namespace Tapeti.Cmd.Commands
public bool DirectToQueue { get; set; }
- public int Execute(IModel channel)
+ public int Execute(IModel channel, IRateLimiter rateLimiter)
{
var messageCount = 0;
foreach (var message in MessageSerializer.Deserialize())
{
- var exchange = DirectToQueue ? "" : message.Exchange;
- var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
+ rateLimiter.Execute(() =>
+ {
+ var exchange = DirectToQueue ? "" : message.Exchange;
+ var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
- channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
- messageCount++;
+ channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
+ messageCount++;
+ });
}
return messageCount;
diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs
index 9b42a3a..34f11d2 100644
--- a/Tapeti.Cmd/Commands/ShovelCommand.cs
+++ b/Tapeti.Cmd/Commands/ShovelCommand.cs
@@ -1,5 +1,6 @@
using RabbitMQ.Client;
-
+using Tapeti.Cmd.RateLimiter;
+
namespace Tapeti.Cmd.Commands
{
public class ShovelCommand
@@ -10,7 +11,7 @@ namespace Tapeti.Cmd.Commands
public int? MaxCount { get; set; }
- public int Execute(IModel sourceChannel, IModel targetChannel)
+ public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
{
var messageCount = 0;
@@ -22,13 +23,14 @@ namespace Tapeti.Cmd.Commands
break;
- targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
+ rateLimiter.Execute(() =>
+ {
+ targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
+ messageCount++;
-
- messageCount++;
-
- if (RemoveMessages)
- sourceChannel.BasicAck(result.DeliveryTag, false);
+ if (RemoveMessages)
+ sourceChannel.BasicAck(result.DeliveryTag, false);
+ });
}
return messageCount;
diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs
index 02fec70..f1bd5fe 100644
--- a/Tapeti.Cmd/Program.cs
+++ b/Tapeti.Cmd/Program.cs
@@ -4,10 +4,10 @@ using System.Diagnostics;
using System.IO;
using System.Text;
using CommandLine;
-using CommandLine.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using Tapeti.Cmd.Commands;
+using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd
@@ -74,11 +74,14 @@ namespace Tapeti.Cmd
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; }
- [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
+ [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; }
+
+ [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
+ public int? MaxRate { get; set; }
}
@@ -111,6 +114,9 @@ namespace Tapeti.Cmd
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; }
+
+ [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
+ public int? MaxRate { get; set; }
}
@@ -236,6 +242,15 @@ namespace Tapeti.Cmd
}
+ private static IRateLimiter GetRateLimiter(int? maxRate)
+ {
+ if (maxRate.GetValueOrDefault() <= 0)
+ return new NoRateLimiter();
+
+ return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
+ }
+
+
private static void RunExport(ExportOptions options)
{
int messageCount;
@@ -271,7 +286,7 @@ namespace Tapeti.Cmd
MessageSerializer = messageSerializer,
DirectToQueue = !options.PublishToExchange
- }.Execute(channel);
+ }.Execute(channel, GetRateLimiter(options.MaxRate));
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
@@ -299,11 +314,11 @@ namespace Tapeti.Cmd
using (var targetConnection = GetTargetConnection(options))
using (var targetChannel = targetConnection.CreateModel())
{
- messageCount = shovelCommand.Execute(sourceChannel, targetChannel);
+ messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate));
}
}
else
- messageCount = shovelCommand.Execute(sourceChannel, sourceChannel);
+ messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate));
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");
diff --git a/Tapeti.Cmd/RateLimiter/IRateLimiter.cs b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs
new file mode 100644
index 0000000..6357d2b
--- /dev/null
+++ b/Tapeti.Cmd/RateLimiter/IRateLimiter.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Tapeti.Cmd.RateLimiter
+{
+ public interface IRateLimiter
+ {
+ void Execute(Action action);
+ }
+}
diff --git a/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs
new file mode 100644
index 0000000..2cdce24
--- /dev/null
+++ b/Tapeti.Cmd/RateLimiter/NoRateLimiter.cs
@@ -0,0 +1,12 @@
+using System;
+
+namespace Tapeti.Cmd.RateLimiter
+{
+ public class NoRateLimiter : IRateLimiter
+ {
+ public void Execute(Action action)
+ {
+ action();
+ }
+ }
+}
diff --git a/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs
new file mode 100644
index 0000000..d03ad6c
--- /dev/null
+++ b/Tapeti.Cmd/RateLimiter/SpreadRateLimiter.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Threading;
+
+namespace Tapeti.Cmd.RateLimiter
+{
+ public class SpreadRateLimiter : IRateLimiter
+ {
+ private readonly TimeSpan delay;
+ private DateTime lastExecute = DateTime.MinValue;
+
+ public SpreadRateLimiter(int amount, TimeSpan perTimespan)
+ {
+ delay = TimeSpan.FromMilliseconds(perTimespan.TotalMilliseconds / amount);
+ }
+
+
+ public void Execute(Action action)
+ {
+ // Very simple implementation; the time between actions must be at least the delay.
+ // This prevents bursts followed by nothing which are common with normal rate limiter implementations.
+ var remainingWaitTime = delay - (DateTime.Now - lastExecute);
+
+ if (remainingWaitTime.TotalMilliseconds > 0)
+ Thread.Sleep(remainingWaitTime);
+
+ action();
+ lastExecute = DateTime.Now;
+ }
+ }
+}
diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat
index 9903ae7..a65fdab 100644
--- a/Tapeti.Cmd/build-release.bat
+++ b/Tapeti.Cmd/build-release.bat
@@ -1,3 +1,4 @@
+@Echo Off
mkdir publish
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".
diff --git a/Tapeti.sln b/Tapeti.sln
index 016fd47..63a14c7 100644
--- a/Tapeti.sln
+++ b/Tapeti.sln
@@ -57,7 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs
index 4add519..4a9e4de 100644
--- a/Tapeti/Connection/ITapetiClient.cs
+++ b/Tapeti/Connection/ITapetiClient.cs
@@ -77,8 +77,14 @@ namespace Tapeti.Connection
/// Cancelled when the connection is lost
///
/// The consumer implementation which will receive the messages from the queue
- Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
+ /// The consumer tag as returned by BasicConsume.
+ Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
+ ///
+ /// Stops the consumer with the specified tag.
+ ///
+ /// The consumer tag as returned by Consume.
+ Task Cancel(string consumerTag);
///
/// Creates a durable queue if it does not already exist, and updates the bindings.
@@ -118,7 +124,6 @@ namespace Tapeti.Connection
/// The binding to add to the dynamic queue
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
-
///
/// Closes the connection to RabbitMQ gracefully.
///
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 1719dbc..71a49d1 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -72,7 +72,6 @@ namespace Tapeti.Connection
}
- ///
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
{
this.config = config;
@@ -184,29 +183,48 @@ namespace Tapeti.Connection
///
- public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
+ public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{
if (deletedQueues.Contains(queueName))
- return;
+ return null;
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
+ string consumerTag = null;
+
await QueueWithRetryableChannel(channel =>
{
if (cancellationToken.IsCancellationRequested)
return;
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
- channel.BasicConsume(queueName, false, basicConsumer);
+ consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
+ });
+
+ return consumerTag;
+ }
+
+
+ ///
+ public async Task Cancel(string consumerTag)
+ {
+ if (isClosing || string.IsNullOrEmpty(consumerTag))
+ return;
+
+ // No need for a retryable channel here, if the connection is lost
+ // so is the consumer.
+ await Queue(channel =>
+ {
+ channel.BasicCancel(consumerTag);
});
}
private async Task Respond(ulong deliveryTag, ConsumeResult result)
{
- await taskQueue.Value.Add(() =>
+ await Queue(channel =>
{
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
@@ -214,15 +232,15 @@ namespace Tapeti.Connection
{
case ConsumeResult.Success:
case ConsumeResult.ExternalRequeue:
- GetChannel().BasicAck(deliveryTag, false);
+ channel.BasicAck(deliveryTag, false);
break;
case ConsumeResult.Error:
- GetChannel().BasicNack(deliveryTag, false, false);
+ channel.BasicNack(deliveryTag, false, false);
break;
case ConsumeResult.Requeue:
- GetChannel().BasicNack(deliveryTag, false, true);
+ channel.BasicNack(deliveryTag, false, true);
break;
default:
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 69be2d0..9969ae0 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -13,6 +13,7 @@ namespace Tapeti.Connection
private readonly Func clientFactory;
private readonly ITapetiConfig config;
private bool consuming;
+ private readonly List consumerTags = new List();
private CancellationTokenSource initializeCancellationTokenSource;
@@ -50,6 +51,8 @@ namespace Tapeti.Connection
{
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
+
+ consumerTags.Clear();
}
@@ -65,6 +68,8 @@ namespace Tapeti.Connection
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = new CancellationTokenSource();
+ consumerTags.Clear();
+
cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation
@@ -91,6 +96,21 @@ namespace Tapeti.Connection
}
+ ///
+ public async Task Stop()
+ {
+ if (!consuming)
+ return;
+
+ initializeCancellationTokenSource?.Cancel();
+ initializeCancellationTokenSource = null;
+
+ await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag)));
+
+ consumerTags.Clear();
+ consuming = false;
+ }
+
private async Task ApplyBindings(CancellationToken cancellationToken)
{
@@ -115,13 +135,13 @@ namespace Tapeti.Connection
{
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
- await Task.WhenAll(queues.Select(async group =>
+ consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
- await clientFactory().Consume(cancellationToken, queueName, consumer);
- }));
+ return await clientFactory().Consume(cancellationToken, queueName, consumer);
+ })));
}
diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs
index f1aaafb..3110f65 100644
--- a/Tapeti/ISubscriber.cs
+++ b/Tapeti/ISubscriber.cs
@@ -13,5 +13,10 @@ namespace Tapeti
/// Starts consuming from the subscribed queues if not already started.
///
Task Resume();
+
+ ///
+ /// Stops consuming from the subscribed queues.
+ ///
+ Task Stop();
}
}
diff --git a/appveyor.yml b/appveyor.yml
index ad72b07..f9ccb36 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -63,7 +63,7 @@ configuration:
deploy:
provider: NuGet
api_key:
- secure: 6/t8qnIiVuBCYb2TaOAHPbcQNb3g9EH++6okxqDjSaWMGoBrUEDXNCDvLVijTafZ
+ secure: 3WCSZAzan66vEmHZ1q3XzfOfucuAQiA+SiCDChO/gswbxfIXUpiM1eMNASDa3qWH
skip_symbols: false
artifact: /.*\.nupkg/
\ No newline at end of file
diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst
index 6a81c14..af3c77b 100644
--- a/docs/tapeticmd.rst
+++ b/docs/tapeticmd.rst
@@ -80,6 +80,10 @@ Read messages from disk as previously exported and publish them to a queue.
-s , --serialization
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
+--maxrate
+ The maximum amount of messages per second to import.
+
+
Either input, message or pipe is required.
Example:
@@ -121,6 +125,8 @@ Reads messages from a queue and publishes them to another queue, optionally to a
--targetpassword
Password used to connect to the target RabbitMQ server. Defaults to the source password.
+--maxrate
+ The maximum amount of messages per second to shovel.
Example:
@@ -138,7 +144,7 @@ SingleFileJSON
''''''''''''''
The default serialization method. All messages are contained in a single file, where each line is a JSON document describing the message properties and it's content.
-An example message (formatted as multi-line to be more readable, but keep in mind that it must be a single line in the export file to be imported properly):
+An example message (formatted as multi-line to be more readable, but keep in mind that it **must be a single line** in the export file to be imported properly):
::
@@ -176,6 +182,29 @@ The properties correspond to the RabbitMQ client's IBasicProperties and can be o
Either Body or RawBody is present. Body is used if the ContentType is set to application/json, and will contain the original message as an inline JSON object for easy manipulation. For other content types, the RawBody contains the original encoded body.
+Below is a bare minimum example, assuming Tapeti style messages and the default direct-to-queue import (no --exchange parameter). Again, keep in mind that it **must be a single line** in the export file to be imported properly.
+
+::
+
+ {
+ "Queue": "tapeti.example.01",
+ "Properties": {
+ "ContentType": "application/json",
+ "Headers": {
+ "classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample"
+ }
+ },
+ "Body": {
+ "Amount": 2
+ }
+ }
+
+Actual file contents will thus look like:
+
+::
+
+ { "Queue": "tapeti.example.01", "Properties": { "ContentType": "application/json", "Headers": { "classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample" } }, "Body": { "Amount": 2 } }
+
EasyNetQHosepipe
''''''''''''''''