1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Implemented skip and maxcount parameters for Tapeti.Cmd import

This commit is contained in:
Mark van Renswoude 2021-09-15 20:07:03 +02:00
parent 254af41875
commit f887cd8b78
2 changed files with 30 additions and 14 deletions

View File

@ -29,6 +29,12 @@ namespace Tapeti.Cmd.Verbs
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; } public bool PublishToExchange { get; set; }
[Option("skip", HelpText = "(Default: 0) Number of messages in the input to skip. Useful if a previous import was interrupted.", Default = 0)]
public int Skip { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to import.")]
public int? MaxCount { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
public int? MaxRate { get; set; } public int? MaxRate { get; set; }
@ -61,6 +67,7 @@ namespace Tapeti.Cmd.Verbs
var totalCount = messageSerializer.GetMessageCount(); var totalCount = messageSerializer.GetMessageCount();
var messageCount = 0; var messageCount = 0;
var skip = Math.Max(options.Skip, 0);
ProgressBar progress = null; ProgressBar progress = null;
@ -70,24 +77,27 @@ namespace Tapeti.Cmd.Verbs
{ {
foreach (var message in messageSerializer.Deserialize(channel)) foreach (var message in messageSerializer.Deserialize(channel))
{ {
if (console.Cancelled) if (console.Cancelled || (options.MaxCount.HasValue && messageCount >= options.MaxCount.Value))
break; break;
rateLimiter.Execute(() => if (skip > 0)
{ skip--;
if (console.Cancelled) else
return; rateLimiter.Execute(() =>
{
if (console.Cancelled)
return;
var exchange = options.PublishToExchange ? message.Exchange : ""; var exchange = options.PublishToExchange ? message.Exchange : "";
var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue;
// ReSharper disable AccessToDisposedClosure // ReSharper disable AccessToDisposedClosure
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++; messageCount++;
progress?.Report(messageCount); progress?.Report(messageCount);
// ReSharper restore AccessToDisposedClosure // ReSharper restore AccessToDisposedClosure
}); });
} }
} }
finally finally

View File

@ -82,6 +82,12 @@ Read messages from disk as previously exported and publish them to a queue.
-e, --exchange -e, --exchange
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue. If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
--skip <count>
Number of messages in the input to skip. Useful if a previous import was interrupted.
-n <count>, --maxcount <count>
Maximum number of messages to import. If not specified all messages are imported.
-s <method>, --serialization <method> -s <method>, --serialization <method>
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.