Implemented skip parameter for export and shovel commands

This commit is contained in:
Mark van Renswoude 2021-09-15 20:16:15 +02:00
parent f887cd8b78
commit b3ea711c3b
3 changed files with 52 additions and 14 deletions

View File

@ -23,6 +23,9 @@ namespace Tapeti.Cmd.Verbs
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
public bool RemoveMessages { get; set; } public bool RemoveMessages { get; set; }
[Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing export was interrupted.", Default = 0)]
public int Skip { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; } public int? MaxCount { get; set; }
} }
@ -52,6 +55,11 @@ namespace Tapeti.Cmd.Verbs
using var channel = connection.CreateModel(); using var channel = connection.CreateModel();
var totalCount = (int)channel.MessageCount(options.QueueName); var totalCount = (int)channel.MessageCount(options.QueueName);
var skip = Math.Max(options.Skip, 0);
if (skip > 0)
totalCount -= Math.Min(skip, totalCount);
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
totalCount = options.MaxCount.Value; totalCount = options.MaxCount.Value;
@ -67,24 +75,28 @@ namespace Tapeti.Cmd.Verbs
// No more messages on the queue // No more messages on the queue
break; break;
messageCount++; if (skip > 0)
skip--;
messageSerializer.Serialize(new Message else
{ {
DeliveryTag = result.DeliveryTag, messageCount++;
Redelivered = result.Redelivered,
Exchange = result.Exchange,
RoutingKey = result.RoutingKey,
Queue = options.QueueName,
Properties = result.BasicProperties,
Body = result.Body.ToArray()
});
if (options.RemoveMessages) messageSerializer.Serialize(new Message
channel.BasicAck(result.DeliveryTag, false); {
DeliveryTag = result.DeliveryTag,
Redelivered = result.Redelivered,
Exchange = result.Exchange,
RoutingKey = result.RoutingKey,
Queue = options.QueueName,
Properties = result.BasicProperties,
Body = result.Body.ToArray()
});
if (options.RemoveMessages)
channel.BasicAck(result.DeliveryTag, false);
progressBar.Report(messageCount); progressBar.Report(messageCount);
}
} }
} }

View File

@ -37,6 +37,9 @@ namespace Tapeti.Cmd.Verbs
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; } public string TargetPassword { get; set; }
[Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing shovel was interrupted.", Default = 0)]
public int Skip { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
public int? MaxRate { get; set; } public int? MaxRate { get; set; }
@ -93,6 +96,11 @@ namespace Tapeti.Cmd.Verbs
var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName;
var totalCount = (int)sourceChannel.MessageCount(options.QueueName); var totalCount = (int)sourceChannel.MessageCount(options.QueueName);
var skip = Math.Max(options.Skip, 0);
if (skip > 0)
totalCount -= Math.Min(skip, totalCount);
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
totalCount = options.MaxCount.Value; totalCount = options.MaxCount.Value;
@ -101,6 +109,18 @@ namespace Tapeti.Cmd.Verbs
using (var progressBar = new ProgressBar(console, totalCount)) using (var progressBar = new ProgressBar(console, totalCount))
{ {
// Perform the skips outside of the rate limiter
while (skip > 0 && !console.Cancelled)
{
var result = sourceChannel.BasicGet(options.QueueName, false);
if (result == null)
// No more messages on the queue
return;
skip--;
}
var hasMessage = true; var hasMessage = true;
while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value))

View File

@ -51,6 +51,9 @@ Fetches messages from a queue and writes them to disk.
-r, --remove -r, --remove
If specified messages are acknowledged and removed from the queue. If not messages are kept. If specified messages are acknowledged and removed from the queue. If not messages are kept.
--skip <count>
Number of messages in the input to skip. Useful if a previous non-removing export was interrupted.
-n <count>, --maxcount <count> -n <count>, --maxcount <count>
Maximum number of messages to retrieve from the queue. If not specified all messages are exported. Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
@ -124,6 +127,9 @@ Reads messages from a queue and publishes them to another queue, optionally to a
-r, --remove -r, --remove
If specified messages are acknowledged and removed from the queue. If not messages are kept. If specified messages are acknowledged and removed from the queue. If not messages are kept.
--skip <count>
Number of messages in the input to skip. Useful if a previous non-removing shovel was interrupted.
-n <count>, --maxcount <count> -n <count>, --maxcount <count>
Maximum number of messages to retrieve from the queue. If not specified all messages are exported. Maximum number of messages to retrieve from the queue. If not specified all messages are exported.