Added progress indicators to Tapeti.Cmd

Refactored Tapeti.Cmd internals
This commit is contained in:
Mark van Renswoude 2021-09-04 11:33:59 +02:00
parent be576a2409
commit 0bed6a8f92
22 changed files with 1111 additions and 728 deletions

View File

@ -0,0 +1,103 @@
using System;
using System.Text;
namespace Tapeti.Cmd.ASCII
{
public class ProgressBar : IDisposable, IProgress<int>
{
private static readonly TimeSpan UpdateInterval = TimeSpan.FromMilliseconds(20);
private readonly int max;
private readonly int width;
private readonly bool showPosition;
private int position;
private readonly bool enabled;
private DateTime lastUpdate = DateTime.MinValue;
private int lastOutputLength;
public ProgressBar(int max, int width = 10, bool showPosition = true)
{
if (width <= 0)
throw new ArgumentOutOfRangeException(nameof(width), "Width must be greater than zero");
if (max <= 0)
throw new ArgumentOutOfRangeException(nameof(max), "Max must be greater than zero");
this.max = max;
this.width = width;
this.showPosition = showPosition;
enabled = !Console.IsOutputRedirected;
if (!enabled)
return;
Console.CursorVisible = false;
Redraw();
}
public void Dispose()
{
if (!enabled || lastOutputLength <= 0)
return;
Console.CursorLeft = 0;
Console.Write(new string(' ', lastOutputLength));
Console.CursorLeft = 0;
Console.CursorVisible = true;
}
public void Report(int value)
{
if (!enabled)
return;
value = Math.Max(0, Math.Min(max, value));
position = value;
var now = DateTime.Now;
if (now - lastUpdate < UpdateInterval)
return;
lastUpdate = now;
Redraw();
}
private void Redraw()
{
var output = new StringBuilder("[");
var blockCount = (int)Math.Truncate((decimal)position / max * width);
if (blockCount > 0)
output.Append(new string('#', blockCount));
if (blockCount < width)
output.Append(new string('.', width - blockCount));
output.Append("] ");
if (showPosition)
{
output
.Append(position.ToString("N0")).Append(" / ").Append(max.ToString("N0"))
.Append(" (").Append((int) Math.Truncate((decimal) position / max * 100)).Append("%)");
}
else
output.Append(" ").Append((int)Math.Truncate((decimal)position / max * 100)).Append("%");
var newLength = output.Length;
if (newLength < lastOutputLength)
output.Append(new string(' ', lastOutputLength - output.Length));
Console.CursorLeft = 0;
Console.Write(output);
lastOutputLength = newLength;
}
}
}

View File

@ -1,46 +0,0 @@
using RabbitMQ.Client;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands
{
public class ExportCommand
{
public IMessageSerializer MessageSerializer { get; set; }
public string QueueName { get; set; }
public bool RemoveMessages { get; set; }
public int? MaxCount { get; set; }
public int Execute(IModel channel)
{
var messageCount = 0;
while (!MaxCount.HasValue || messageCount < MaxCount.Value)
{
var result = channel.BasicGet(QueueName, false);
if (result == null)
// No more messages on the queue
break;
messageCount++;
MessageSerializer.Serialize(new Message
{
DeliveryTag = result.DeliveryTag,
Redelivered = result.Redelivered,
Exchange = result.Exchange,
RoutingKey = result.RoutingKey,
Queue = QueueName,
Properties = result.BasicProperties,
Body = result.Body.ToArray()
});
if (RemoveMessages)
channel.BasicAck(result.DeliveryTag, false);
}
return messageCount;
}
}
}

View File

@ -1,33 +0,0 @@
using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands
{
public class ImportCommand
{
public IMessageSerializer MessageSerializer { get; set; }
public bool DirectToQueue { get; set; }
public int Execute(IModel channel, IRateLimiter rateLimiter)
{
var messageCount = 0;
foreach (var message in MessageSerializer.Deserialize(channel))
{
rateLimiter.Execute(() =>
{
var exchange = DirectToQueue ? "" : message.Exchange;
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++;
});
}
return messageCount;
}
}
}

View File

@ -1,43 +0,0 @@
using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
namespace Tapeti.Cmd.Commands
{
public class ShovelCommand
{
public string QueueName { get; set; }
public string TargetQueueName { get; set; }
public bool RemoveMessages { get; set; }
public int? MaxCount { get; set; }
public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
{
var messageCount = 0;
while (!MaxCount.HasValue || messageCount < MaxCount.Value)
{
var result = sourceChannel.BasicGet(QueueName, false);
if (result == null)
// No more messages on the queue
break;
// Since RabbitMQ client 6 we need to copy the body before calling another channel method
// like BasicPublish, or the published body will be corrupted
var bodyCopy = result.Body.ToArray();
rateLimiter.Execute(() =>
{
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, bodyCopy);
messageCount++;
if (RemoveMessages)
sourceChannel.BasicAck(result.DeliveryTag, false);
});
}
return messageCount;
}
}
}

View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace Tapeti.Cmd.Parser
{
public static class BindingParser
{
public static Tuple<string, string>[] Parse(IEnumerable<string> bindings)
{
return bindings
.Select(b =>
{
var parts = b.Split(':');
if (parts.Length != 2)
throw new InvalidOperationException($"Invalid binding format: {b}");
return new Tuple<string, string>(parts[0], parts[1]);
})
.ToArray();
}
}
}

View File

@ -1,602 +1,57 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Reflection;
using CommandLine;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Tapeti.Cmd.Commands;
using Tapeti.Cmd.Mock;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization;
using Tapeti.Cmd.Verbs;
namespace Tapeti.Cmd
{
public class Program
{
public class CommonOptions
{
[Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")]
public string Host { get; set; }
[Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)]
public int Port { get; set; }
[Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")]
public string VirtualHost { get; set; }
[Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")]
public string Username { get; set; }
[Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")]
public string Password { get; set; }
}
public enum SerializationMethod
{
SingleFileJSON,
EasyNetQHosepipe
}
public class MessageSerializerOptions : CommonOptions
{
[Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)]
public SerializationMethod SerializationMethod { get; set; }
}
[Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")]
public class ExportOptions : MessageSerializerOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
public string OutputPath { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
}
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
public class ImportOptions : MessageSerializerOptions
{
[Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string InputFile { get; set; }
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; }
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
public int? MaxRate { get; set; }
}
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
public class ExampleOptions
{
}
[Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
public class ShovelOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")]
public string TargetQueueName { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
[Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")]
public string TargetHost { get; set; }
[Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")]
public int? TargetPort { get; set; }
[Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")]
public string TargetVirtualHost { get; set; }
[Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")]
public string TargetUsername { get; set; }
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
public int? MaxRate { get; set; }
}
[Verb("purge", HelpText = "Removes all messages from a queue destructively.")]
public class PurgeOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to purge.")]
public string QueueName { get; set; }
[Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
}
[Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")]
public class DeclareQueueOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
[Verb("removequeue", HelpText = "Removes a durable queue.")]
public class RemoveQueueOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")]
public string QueueName { get; set; }
[Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
[Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool ConfirmPurge { get; set; }
}
[Verb("bindqueue", HelpText = "Add a binding to a queue.")]
public class BindQueueOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
[Verb("unbindqueue", HelpText = "Remove a binding from a queue.")]
public class UnbindQueueOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
public static int Main(string[] args)
{
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, PurgeOptions, ExampleOptions,
DeclareQueueOptions, RemoveQueueOptions, BindQueueOptions, UnbindQueueOptions>(args)
.MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport),
(ExampleOptions o) => ExecuteVerb(o, RunExample),
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
(PurgeOptions o) => ExecuteVerb(o, RunPurge),
(DeclareQueueOptions o) => ExecuteVerb(o, RunDeclareQueue),
(RemoveQueueOptions o) => ExecuteVerb(o, RunRemoveQueue),
(BindQueueOptions o) => ExecuteVerb(o, RunBindQueue),
(UnbindQueueOptions o) => ExecuteVerb(o, RunUnbindQueue),
errs =>
{
if (!Debugger.IsAttached)
return 1;
Console.WriteLine("Press any Enter key to continue...");
Console.ReadLine();
return 1;
}
);
}
private static int ExecuteVerb<T>(T options, Action<T> execute) where T : class
{
try
{
execute(options);
return 0;
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return 1;
}
}
private static IConnection GetConnection(CommonOptions options)
{
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
return factory.CreateConnection();
}
private static IMessageSerializer GetMessageSerializer(ImportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.InputFile))
throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.InputFile);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
{
if (options.InputPipe)
{
disposeStream = false;
return Console.OpenStandardInput();
}
if (!string.IsNullOrEmpty(options.InputMessage))
{
disposeStream = true;
return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));
}
disposeStream = true;
return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
}
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.OutputPath))
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.OutputPath);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static Stream GetOutputStream(ExportOptions options, out bool disposeStream)
{
disposeStream = true;
return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read);
}
private static IRateLimiter GetRateLimiter(int? maxRate)
{
if (!maxRate.HasValue || maxRate.Value <= 0)
return new NoRateLimiter();
return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
}
private static void RunExport(ExportOptions options)
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = new ExportCommand
{
MessageSerializer = messageSerializer,
QueueName = options.QueueName,
RemoveMessages = options.RemoveMessages,
MaxCount = options.MaxCount
}.Execute(channel);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported.");
}
private static void RunImport(ImportOptions options)
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = new ImportCommand
{
MessageSerializer = messageSerializer,
DirectToQueue = !options.PublishToExchange
}.Execute(channel, GetRateLimiter(options.MaxRate));
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
}
private static void RunExample(ExampleOptions options)
{
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
{
messageSerializer.Serialize(new Message
{
Exchange = "example",
Queue = "example.queue",
RoutingKey = "example.routing.key",
DeliveryTag = 42,
Properties = new MockBasicProperties
{
ContentType = "application/json",
DeliveryMode = 2,
Headers = new Dictionary<string, object>
{
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
},
ReplyTo = "reply.queue",
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
},
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
});
}
}
private static void RunShovel(ShovelOptions options)
{
int messageCount;
using (var sourceConnection = GetConnection(options))
using (var sourceChannel = sourceConnection.CreateModel())
{
var shovelCommand = new ShovelCommand
{
QueueName = options.QueueName,
TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName,
RemoveMessages = options.RemoveMessages,
MaxCount = options.MaxCount
};
if (RequiresSecondConnection(options))
{
using (var targetConnection = GetTargetConnection(options))
using (var targetChannel = targetConnection.CreateModel())
{
messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate));
}
}
else
messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate));
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");
}
private static bool RequiresSecondConnection(ShovelOptions options)
{
if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host)
return true;
if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port)
return true;
if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost)
return true;
// All relevant target host parameters are either omitted or the same. This means the queue must be different
// to prevent an infinite loop.
if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName)
throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host");
if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username)
return true;
// ReSharper disable once ConvertIfStatementToReturnStatement
if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password)
return true;
// Everything's the same, we can use the same channel
return false;
}
private static IConnection GetTargetConnection(ShovelOptions options)
{
var factory = new ConnectionFactory
{
HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host,
Port = options.TargetPort ?? options.Port,
VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost,
UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username,
Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password
};
return factory.CreateConnection();
}
private static void RunPurge(PurgeOptions options)
{
if (!options.Confirm)
{
Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
uint messageCount;
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = channel.QueuePurge(options.QueueName);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");
}
private static void RunDeclareQueue(DeclareQueueOptions options)
{
// Parse early to fail early
var bindings = ParseBindings(options.Bindings);
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(options.QueueName, true, false, false);
foreach (var (exchange, routingKey) in bindings)
channel.QueueBind(options.QueueName, exchange, routingKey);
}
Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}.");
}
private static void RunRemoveQueue(RemoveQueueOptions options)
{
if (!options.Confirm)
{
Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
uint messageCount;
try
{
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = channel.QueueDelete(options.QueueName, true, true);
}
}
catch (OperationInterruptedException e)
{
if (e.ShutdownReason.ReplyCode == 406)
{
if (!options.ConfirmPurge)
{
Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = channel.QueueDelete(options.QueueName, true, false);
}
}
else
throw;
}
Console.WriteLine(messageCount == 0
? $"Empty or non-existent queue '{options.QueueName}' removed."
: $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'.");
}
private static void RunBindQueue(BindQueueOptions options)
{
var bindings = ParseBindings(options.Bindings);
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
foreach (var (exchange, routingKey) in bindings)
channel.QueueBind(options.QueueName, exchange, routingKey);
}
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}.");
}
private static void RunUnbindQueue(UnbindQueueOptions options)
{
var bindings = ParseBindings(options.Bindings);
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
foreach (var (exchange, routingKey) in bindings)
channel.QueueUnbind(options.QueueName, exchange, routingKey);
}
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}.");
}
private static Tuple<string, string>[] ParseBindings(IEnumerable<string> bindings)
{
return bindings
.Select(b =>
{
var parts = b.Split(':');
if (parts.Length != 2)
throw new InvalidOperationException($"Invalid binding format: {b}");
return new Tuple<string, string>(parts[0], parts[1]);
})
var exitCode = 1;
var verbTypes = Assembly.GetExecutingAssembly().GetTypes()
.Where(t => t.GetCustomAttribute<ExecutableVerbAttribute>() != null)
.ToArray();
CommandLine.Parser.Default.ParseArguments(args, verbTypes.ToArray())
.WithParsed(o =>
{
try
{
var executableVerbAttribute = o.GetType().GetCustomAttribute<ExecutableVerbAttribute>();
var executer = Activator.CreateInstance(executableVerbAttribute.VerbExecuter, o) as IVerbExecuter;
// Should have been validated by the ExecutableVerbAttribute
Debug.Assert(executer != null, nameof(executer) + " != null");
executer.Execute();
exitCode = 0;
}
catch (Exception e)
{
Console.WriteLine(e.Message);
DebugConfirmClose();
}
})
.WithNotParsed(_ =>
{
DebugConfirmClose();
});
return exitCode;
}
private static void DebugConfirmClose()
{
if (!Debugger.IsAttached)
return;
Console.WriteLine("Press any Enter key to continue...");
Console.ReadLine();
}
}
}

View File

@ -11,22 +11,23 @@ namespace Tapeti.Cmd.Serialization
{
public class EasyNetQMessageSerializer : IMessageSerializer
{
private static readonly Regex InvalidCharRegex = new Regex(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled);
private static readonly Regex InvalidCharRegex = new(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled);
private readonly string path;
private readonly Lazy<string> writablePath;
private int messageCount;
private readonly Lazy<string[]> files;
public EasyNetQMessageSerializer(string path)
{
this.path = path;
writablePath = new Lazy<string>(() =>
{
Directory.CreateDirectory(path);
return path;
});
files = new Lazy<string[]>(() => Directory.GetFiles(path, "*.*.message.txt"));
}
@ -60,9 +61,15 @@ namespace Tapeti.Cmd.Serialization
}
public int GetMessageCount()
{
return files.Value.Length;
}
public IEnumerable<Message> Deserialize(IModel channel)
{
foreach (var file in Directory.GetFiles(path, "*.*.message.txt"))
foreach (var file in files.Value)
{
const string messageTag = ".message.";
@ -303,7 +310,7 @@ namespace Tapeti.Cmd.Serialization
public Message ToMessage()
{
return new Message
return new()
{
//ConsumerTag =
DeliveryTag = DeliverTag,

View File

@ -19,6 +19,8 @@ namespace Tapeti.Cmd.Serialization
public interface IMessageSerializer : IDisposable
{
void Serialize(Message message);
int GetMessageCount();
IEnumerable<Message> Deserialize(IModel channel);
}
}

View File

@ -14,8 +14,11 @@ namespace Tapeti.Cmd.Serialization
private readonly bool disposeStream;
private readonly Encoding encoding;
// StreamReader.DefaultBufferSize is private :-/
private const int DefaultBufferSize = 1024;
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
private static readonly JsonSerializerSettings SerializerSettings = new()
{
NullValueHandling = NullValueHandling.Ignore
};
@ -40,23 +43,48 @@ namespace Tapeti.Cmd.Serialization
exportFile.Value.WriteLine(serialized);
}
public int GetMessageCount()
{
if (!stream.CanSeek)
return 0;
var position = stream.Position;
try
{
var lineCount = 0;
using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true);
while (!reader.EndOfStream)
{
if (!string.IsNullOrEmpty(reader.ReadLine()))
lineCount++;
}
return lineCount;
}
finally
{
stream.Position = position;
}
}
public IEnumerable<Message> Deserialize(IModel channel)
{
using (var reader = new StreamReader(stream, encoding))
using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true);
while (!reader.EndOfStream)
{
while (!reader.EndOfStream)
{
var serialized = reader.ReadLine();
if (string.IsNullOrEmpty(serialized))
continue;
var serialized = reader.ReadLine();
if (string.IsNullOrEmpty(serialized))
continue;
var serializableMessage = JsonConvert.DeserializeObject<SerializableMessage>(serialized);
if (serializableMessage == null)
continue;
var serializableMessage = JsonConvert.DeserializeObject<SerializableMessage>(serialized);
if (serializableMessage == null)
continue;
yield return serializableMessage.ToMessage(channel);
}
yield return serializableMessage.ToMessage(channel);
}
}
@ -135,7 +163,7 @@ namespace Tapeti.Cmd.Serialization
public Message ToMessage(IModel channel)
{
return new Message
return new()
{
DeliveryTag = DeliveryTag,
Redelivered = Redelivered,

View File

@ -12,6 +12,7 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<Version>2.0.0</Version>
<Product>Tapeti Command-line Utility</Product>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>

View File

@ -0,0 +1,22 @@
using CommandLine;
namespace Tapeti.Cmd.Verbs
{
public class BaseConnectionOptions
{
[Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")]
public string Host { get; set; }
[Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)]
public int Port { get; set; }
[Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")]
public string VirtualHost { get; set; }
[Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")]
public string Username { get; set; }
[Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")]
public string Password { get; set; }
}
}

View File

@ -0,0 +1,17 @@
using CommandLine;
namespace Tapeti.Cmd.Verbs
{
public enum SerializationMethod
{
SingleFileJSON,
EasyNetQHosepipe
}
public class BaseMessageSerializerOptions : BaseConnectionOptions
{
[Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)]
public SerializationMethod SerializationMethod { get; set; }
}
}

View File

@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.Parser;
namespace Tapeti.Cmd.Verbs
{
[Verb("bindqueue", HelpText = "Add a binding to a queue.")]
[ExecutableVerb(typeof(BindQueueVerb))]
public class BindQueueOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
public class BindQueueVerb : IVerbExecuter
{
private readonly BindQueueOptions options;
public BindQueueVerb(BindQueueOptions options)
{
this.options = options;
}
public void Execute()
{
var bindings = BindingParser.Parse(options.Bindings);
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
foreach (var (exchange, routingKey) in bindings)
channel.QueueBind(options.QueueName, exchange, routingKey);
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}.");
}
}
}

View File

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.Parser;
namespace Tapeti.Cmd.Verbs
{
[Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")]
[ExecutableVerb(typeof(DeclareQueueVerb))]
public class DeclareQueueOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
public class DeclareQueueVerb : IVerbExecuter
{
private readonly DeclareQueueOptions options;
public DeclareQueueVerb(DeclareQueueOptions options)
{
this.options = options;
}
public void Execute()
{
// Parse early to fail early
var bindings = BindingParser.Parse(options.Bindings);
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(options.QueueName, true, false, false);
foreach (var (exchange, routingKey) in bindings)
channel.QueueBind(options.QueueName, exchange, routingKey);
Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}.");
}
}
}

View File

@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.Mock;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Verbs
{
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
[ExecutableVerb(typeof(ExampleVerb))]
public class ExampleOptions
{
}
public class ExampleVerb : IVerbExecuter
{
public ExampleVerb(ExampleOptions options)
{
// Prevent compiler warnings, the parameter is expected by the Activator
Debug.Assert(options != null);
}
public void Execute()
{
using var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false));
messageSerializer.Serialize(new Message
{
Exchange = "example",
Queue = "example.queue",
RoutingKey = "example.routing.key",
DeliveryTag = 42,
Properties = new MockBasicProperties
{
ContentType = "application/json",
DeliveryMode = 2,
Headers = new Dictionary<string, object>
{
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
},
ReplyTo = "reply.queue",
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
},
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
});
}
}
}

View File

@ -0,0 +1,30 @@
using System;
namespace Tapeti.Cmd.Verbs
{
/// <remarks>
/// Implementations are expected to have a constructor which accepts the options class
/// associated with the ExecutableVerb attribute.
/// </remarks>
public interface IVerbExecuter
{
void Execute();
}
[AttributeUsage(AttributeTargets.Class)]
public class ExecutableVerbAttribute : Attribute
{
public Type VerbExecuter { get; }
public ExecutableVerbAttribute(Type verbExecuter)
{
if (!typeof(IVerbExecuter).IsAssignableFrom(verbExecuter))
throw new InvalidCastException("Type must support IVerbExecuter");
VerbExecuter = verbExecuter;
}
}
}

View File

@ -0,0 +1,121 @@
using System;
using System.IO;
using System.Text;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.ASCII;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Verbs
{
[Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")]
[ExecutableVerb(typeof(ExportVerb))]
public class ExportOptions : BaseMessageSerializerOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
public string OutputPath { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
}
public class ExportVerb : IVerbExecuter
{
private readonly ExportOptions options;
public ExportVerb(ExportOptions options)
{
this.options = options;
}
public void Execute()
{
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var messageSerializer = GetMessageSerializer(options);
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var totalCount = (int)channel.MessageCount(options.QueueName);
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
totalCount = options.MaxCount.Value;
Console.WriteLine($"Exporting {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)");
var messageCount = 0;
var cancelled = false;
Console.CancelKeyPress += (_, args) =>
{
args.Cancel = true;
cancelled = true;
};
using (var progressBar = new ProgressBar(totalCount))
{
while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value))
{
var result = channel.BasicGet(options.QueueName, false);
if (result == null)
// No more messages on the queue
break;
messageCount++;
messageSerializer.Serialize(new Message
{
DeliveryTag = result.DeliveryTag,
Redelivered = result.Redelivered,
Exchange = result.Exchange,
RoutingKey = result.RoutingKey,
Queue = options.QueueName,
Properties = result.BasicProperties,
Body = result.Body.ToArray()
});
if (options.RemoveMessages)
channel.BasicAck(result.DeliveryTag, false);
progressBar.Report(messageCount);
}
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported.");
}
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.OutputPath))
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.OutputPath);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
}
}

View File

@ -0,0 +1,149 @@
using System;
using System.IO;
using System.Text;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.ASCII;
using Tapeti.Cmd.RateLimiter;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Verbs
{
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
[ExecutableVerb(typeof(ImportVerb))]
public class ImportOptions : BaseMessageSerializerOptions
{
[Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string InputFile { get; set; }
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; }
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
public int? MaxRate { get; set; }
}
public class ImportVerb : IVerbExecuter
{
private readonly ImportOptions options;
public ImportVerb(ImportOptions options)
{
this.options = options;
}
public void Execute()
{
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var messageSerializer = GetMessageSerializer(options);
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var rateLimiter = GetRateLimiter(options.MaxRate);
var totalCount = messageSerializer.GetMessageCount();
var messageCount = 0;
var cancelled = false;
Console.CancelKeyPress += (_, args) =>
{
args.Cancel = true;
cancelled = true;
};
ProgressBar progress = null;
if (totalCount > 0)
progress = new ProgressBar(totalCount);
try
{
foreach (var message in messageSerializer.Deserialize(channel))
{
if (cancelled)
break;
rateLimiter.Execute(() =>
{
var exchange = options.PublishToExchange ? message.Exchange : "";
var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue;
// ReSharper disable AccessToDisposedClosure
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++;
progress?.Report(messageCount);
// ReSharper restore AccessToDisposedClosure
});
}
}
finally
{
progress?.Dispose();
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
}
private static IMessageSerializer GetMessageSerializer(ImportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.InputFile))
throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.InputFile);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
{
if (options.InputPipe)
{
disposeStream = false;
return Console.OpenStandardInput();
}
if (!string.IsNullOrEmpty(options.InputMessage))
{
disposeStream = true;
return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));
}
disposeStream = true;
return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
}
private static IRateLimiter GetRateLimiter(int? maxRate)
{
if (!maxRate.HasValue || maxRate.Value <= 0)
return new NoRateLimiter();
return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
}
}
}

View File

@ -0,0 +1,58 @@
using System;
using CommandLine;
using RabbitMQ.Client;
namespace Tapeti.Cmd.Verbs
{
[Verb("purge", HelpText = "Removes all messages from a queue destructively.")]
[ExecutableVerb(typeof(PurgeVerb))]
public class PurgeOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to purge.")]
public string QueueName { get; set; }
[Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
}
public class PurgeVerb : IVerbExecuter
{
private readonly PurgeOptions options;
public PurgeVerb(PurgeOptions options)
{
this.options = options;
}
public void Execute()
{
if (!options.Confirm)
{
Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var messageCount = channel.QueuePurge(options.QueueName);
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");
}
}
}

View File

@ -0,0 +1,90 @@
using System;
using CommandLine;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
namespace Tapeti.Cmd.Verbs
{
[Verb("removequeue", HelpText = "Removes a durable queue.")]
[ExecutableVerb(typeof(RemoveQueueVerb))]
public class RemoveQueueOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")]
public string QueueName { get; set; }
[Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
[Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool ConfirmPurge { get; set; }
}
public class RemoveQueueVerb : IVerbExecuter
{
private readonly RemoveQueueOptions options;
public RemoveQueueVerb(RemoveQueueOptions options)
{
this.options = options;
}
public void Execute()
{
if (!options.Confirm)
{
Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
uint messageCount;
try
{
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
messageCount = channel.QueueDelete(options.QueueName, true, true);
}
catch (OperationInterruptedException e)
{
if (e.ShutdownReason.ReplyCode == 406)
{
if (!options.ConfirmPurge)
{
Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
messageCount = channel.QueueDelete(options.QueueName, true, false);
}
else
throw;
}
Console.WriteLine(messageCount == 0
? $"Empty or non-existent queue '{options.QueueName}' removed."
: $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'.");
}
}
}

View File

@ -0,0 +1,181 @@
using System;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.ASCII;
using Tapeti.Cmd.RateLimiter;
namespace Tapeti.Cmd.Verbs
{
[Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
[ExecutableVerb(typeof(ShovelVerb))]
public class ShovelOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")]
public string TargetQueueName { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
[Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")]
public string TargetHost { get; set; }
[Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")]
public int? TargetPort { get; set; }
[Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")]
public string TargetVirtualHost { get; set; }
[Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")]
public string TargetUsername { get; set; }
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; }
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
public int? MaxRate { get; set; }
}
public class ShovelVerb : IVerbExecuter
{
private readonly ShovelOptions options;
public ShovelVerb(ShovelOptions options)
{
this.options = options;
}
public void Execute()
{
var sourceFactory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var sourceConnection = sourceFactory.CreateConnection();
using var sourceChannel = sourceConnection.CreateModel();
if (RequiresSecondConnection(options))
{
var targetFactory = new ConnectionFactory
{
HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host,
Port = options.TargetPort ?? options.Port,
VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost,
UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username,
Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password
};
using var targetConnection = targetFactory.CreateConnection();
using var targetChannel = targetConnection.CreateModel();
Shovel(options, sourceChannel, targetChannel);
}
else
Shovel(options, sourceChannel, sourceChannel);
}
private static void Shovel(ShovelOptions options, IModel sourceChannel, IModel targetChannel)
{
var rateLimiter = GetRateLimiter(options.MaxRate);
var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName;
var totalCount = (int)sourceChannel.MessageCount(options.QueueName);
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
totalCount = options.MaxCount.Value;
Console.WriteLine($"Shoveling {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)");
var messageCount = 0;
var cancelled = false;
Console.CancelKeyPress += (_, args) =>
{
args.Cancel = true;
cancelled = true;
};
using (var progressBar = new ProgressBar(totalCount))
{
while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value))
{
var result = sourceChannel.BasicGet(options.QueueName, false);
if (result == null)
// No more messages on the queue
break;
// Since RabbitMQ client 6 we need to copy the body before calling another channel method
// like BasicPublish, or the published body will be corrupted if sourceChannel and targetChannel are the same
var bodyCopy = result.Body.ToArray();
rateLimiter.Execute(() =>
{
targetChannel.BasicPublish("", targetQueueName, result.BasicProperties, bodyCopy);
messageCount++;
if (options.RemoveMessages)
sourceChannel.BasicAck(result.DeliveryTag, false);
// ReSharper disable once AccessToDisposedClosure
progressBar.Report(messageCount);
});
}
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");
}
private static bool RequiresSecondConnection(ShovelOptions options)
{
if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host)
return true;
if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port)
return true;
if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost)
return true;
// All relevant target host parameters are either omitted or the same. This means the queue must be different
// to prevent an infinite loop.
if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName)
throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host");
if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username)
return true;
// ReSharper disable once ConvertIfStatementToReturnStatement
if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password)
return true;
// Everything's the same, we can use the same channel
return false;
}
private static IRateLimiter GetRateLimiter(int? maxRate)
{
if (!maxRate.HasValue || maxRate.Value <= 0)
return new NoRateLimiter();
return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1));
}
}
}

View File

@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.Parser;
namespace Tapeti.Cmd.Verbs
{
[Verb("unbindqueue", HelpText = "Remove a binding from a queue.")]
[ExecutableVerb(typeof(UnbindQueueVerb))]
public class UnbindQueueOptions : BaseConnectionOptions
{
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")]
public string QueueName { get; set; }
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: <exchange>:<routingKey>")]
public IEnumerable<string> Bindings { get; set; }
}
public class UnbindQueueVerb : IVerbExecuter
{
private readonly UnbindQueueOptions options;
public UnbindQueueVerb(UnbindQueueOptions options)
{
this.options = options;
}
public void Execute()
{
var bindings = BindingParser.Parse(options.Bindings);
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
foreach (var (exchange, routingKey) in bindings)
channel.QueueUnbind(options.QueueName, exchange, routingKey);
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}.");
}
}
}