From f887cd8b7899578295b684883beae42ca4f10f35 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Sep 2021 20:07:03 +0200 Subject: [PATCH] Implemented skip and maxcount parameters for Tapeti.Cmd import --- Tapeti.Cmd/Verbs/ImportVerb.cs | 38 +++++++++++++++++++++------------- docs/tapeticmd.rst | 6 ++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/Tapeti.Cmd/Verbs/ImportVerb.cs b/Tapeti.Cmd/Verbs/ImportVerb.cs index 87fa434..441d812 100644 --- a/Tapeti.Cmd/Verbs/ImportVerb.cs +++ b/Tapeti.Cmd/Verbs/ImportVerb.cs @@ -28,6 +28,12 @@ namespace Tapeti.Cmd.Verbs [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] public bool PublishToExchange { get; set; } + + [Option("skip", HelpText = "(Default: 0) Number of messages in the input to skip. Useful if a previous import was interrupted.", Default = 0)] + public int Skip { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to import.")] + public int? MaxCount { get; set; } [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] public int? MaxRate { get; set; } @@ -61,6 +67,7 @@ namespace Tapeti.Cmd.Verbs var totalCount = messageSerializer.GetMessageCount(); var messageCount = 0; + var skip = Math.Max(options.Skip, 0); ProgressBar progress = null; @@ -70,24 +77,27 @@ namespace Tapeti.Cmd.Verbs { foreach (var message in messageSerializer.Deserialize(channel)) { - if (console.Cancelled) + if (console.Cancelled || (options.MaxCount.HasValue && messageCount >= options.MaxCount.Value)) break; - rateLimiter.Execute(() => - { - if (console.Cancelled) - return; + if (skip > 0) + skip--; + else + rateLimiter.Execute(() => + { + if (console.Cancelled) + return; - var exchange = options.PublishToExchange ? message.Exchange : ""; - var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; + var exchange = options.PublishToExchange ? message.Exchange : ""; + var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; - // ReSharper disable AccessToDisposedClosure - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; - - progress?.Report(messageCount); - // ReSharper restore AccessToDisposedClosure - }); + // ReSharper disable AccessToDisposedClosure + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + + progress?.Report(messageCount); + // ReSharper restore AccessToDisposedClosure + }); } } finally diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index 5b9b876..adb2de5 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -82,6 +82,12 @@ Read messages from disk as previously exported and publish them to a queue. -e, --exchange If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue. +--skip + Number of messages in the input to skip. Useful if a previous import was interrupted. + +-n , --maxcount + Maximum number of messages to import. If not specified all messages are imported. + -s , --serialization The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.