diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs index 9e62c9d..db7b023 100644 --- a/Tapeti.Cmd/Verbs/ExportVerb.cs +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -23,6 +23,9 @@ namespace Tapeti.Cmd.Verbs [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] public bool RemoveMessages { get; set; } + [Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing export was interrupted.", Default = 0)] + public int Skip { get; set; } + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] public int? MaxCount { get; set; } } @@ -52,6 +55,11 @@ namespace Tapeti.Cmd.Verbs using var channel = connection.CreateModel(); var totalCount = (int)channel.MessageCount(options.QueueName); + + var skip = Math.Max(options.Skip, 0); + if (skip > 0) + totalCount -= Math.Min(skip, totalCount); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; @@ -67,24 +75,28 @@ namespace Tapeti.Cmd.Verbs // No more messages on the queue break; - messageCount++; - - messageSerializer.Serialize(new Message + if (skip > 0) + skip--; + else { - DeliveryTag = result.DeliveryTag, - Redelivered = result.Redelivered, - Exchange = result.Exchange, - RoutingKey = result.RoutingKey, - Queue = options.QueueName, - Properties = result.BasicProperties, - Body = result.Body.ToArray() - }); + messageCount++; - if (options.RemoveMessages) - channel.BasicAck(result.DeliveryTag, false); + messageSerializer.Serialize(new Message + { + DeliveryTag = result.DeliveryTag, + Redelivered = result.Redelivered, + Exchange = result.Exchange, + RoutingKey = result.RoutingKey, + Queue = options.QueueName, + Properties = result.BasicProperties, + Body = result.Body.ToArray() + }); + if (options.RemoveMessages) + channel.BasicAck(result.DeliveryTag, false); - progressBar.Report(messageCount); + progressBar.Report(messageCount); + } } } diff --git a/Tapeti.Cmd/Verbs/ShovelVerb.cs b/Tapeti.Cmd/Verbs/ShovelVerb.cs index 891abe8..18e4c4e 100644 --- a/Tapeti.Cmd/Verbs/ShovelVerb.cs +++ b/Tapeti.Cmd/Verbs/ShovelVerb.cs @@ -37,6 +37,9 @@ namespace Tapeti.Cmd.Verbs [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] public string TargetPassword { get; set; } + [Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing shovel was interrupted.", Default = 0)] + public int Skip { get; set; } + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] public int? MaxRate { get; set; } @@ -93,6 +96,11 @@ namespace Tapeti.Cmd.Verbs var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; var totalCount = (int)sourceChannel.MessageCount(options.QueueName); + + var skip = Math.Max(options.Skip, 0); + if (skip > 0) + totalCount -= Math.Min(skip, totalCount); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; @@ -101,6 +109,18 @@ namespace Tapeti.Cmd.Verbs using (var progressBar = new ProgressBar(console, totalCount)) { + // Perform the skips outside of the rate limiter + while (skip > 0 && !console.Cancelled) + { + var result = sourceChannel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + return; + + skip--; + } + + var hasMessage = true; while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index adb2de5..b271c4b 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -51,6 +51,9 @@ Fetches messages from a queue and writes them to disk. -r, --remove If specified messages are acknowledged and removed from the queue. If not messages are kept. +--skip + Number of messages in the input to skip. Useful if a previous non-removing export was interrupted. + -n , --maxcount Maximum number of messages to retrieve from the queue. If not specified all messages are exported. @@ -124,6 +127,9 @@ Reads messages from a queue and publishes them to another queue, optionally to a -r, --remove If specified messages are acknowledged and removed from the queue. If not messages are kept. +--skip + Number of messages in the input to skip. Useful if a previous non-removing shovel was interrupted. + -n , --maxcount Maximum number of messages to retrieve from the queue. If not specified all messages are exported.