From 0bed6a8f92aaa130fead627522030e0aca9bdfca Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sat, 4 Sep 2021 11:33:59 +0200 Subject: [PATCH] Added progress indicators to Tapeti.Cmd Refactored Tapeti.Cmd internals --- Tapeti.Cmd/ASCII/ProgressBar.cs | 103 +++ Tapeti.Cmd/Commands/ExportCommand.cs | 46 -- Tapeti.Cmd/Commands/ImportCommand.cs | 33 - Tapeti.Cmd/Commands/ShovelCommand.cs | 43 -- Tapeti.Cmd/Parser/BindingParser.cs | 23 + Tapeti.Cmd/Program.cs | 629 ++---------------- .../EasyNetQMessageSerializer.cs | 19 +- .../Serialization/IMessageSerializer.cs | 2 + .../SingleFileJSONMessageSerializer.cs | 54 +- Tapeti.Cmd/Tapeti.Cmd.csproj | 1 + Tapeti.Cmd/Verbs/BaseConnectionOptions.cs | 22 + .../Verbs/BaseMessageSerializerOptions.cs | 17 + Tapeti.Cmd/Verbs/BindQueueVerb.cs | 54 ++ Tapeti.Cmd/Verbs/DeclareQueueVerb.cs | 57 ++ Tapeti.Cmd/Verbs/ExampleVerb.cs | 53 ++ Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs | 30 + Tapeti.Cmd/Verbs/ExportVerb.cs | 121 ++++ Tapeti.Cmd/Verbs/ImportVerb.cs | 149 +++++ Tapeti.Cmd/Verbs/PurgeVerb.cs | 58 ++ Tapeti.Cmd/Verbs/RemoveQueueVerb.cs | 90 +++ Tapeti.Cmd/Verbs/ShovelVerb.cs | 181 +++++ Tapeti.Cmd/Verbs/UnbindQueueVerb.cs | 54 ++ 22 files changed, 1111 insertions(+), 728 deletions(-) create mode 100644 Tapeti.Cmd/ASCII/ProgressBar.cs delete mode 100644 Tapeti.Cmd/Commands/ExportCommand.cs delete mode 100644 Tapeti.Cmd/Commands/ImportCommand.cs delete mode 100644 Tapeti.Cmd/Commands/ShovelCommand.cs create mode 100644 Tapeti.Cmd/Parser/BindingParser.cs create mode 100644 Tapeti.Cmd/Verbs/BaseConnectionOptions.cs create mode 100644 Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs create mode 100644 Tapeti.Cmd/Verbs/BindQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/DeclareQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ExampleVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs create mode 100644 Tapeti.Cmd/Verbs/ExportVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ImportVerb.cs create mode 100644 Tapeti.Cmd/Verbs/PurgeVerb.cs create mode 100644 Tapeti.Cmd/Verbs/RemoveQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ShovelVerb.cs create mode 100644 Tapeti.Cmd/Verbs/UnbindQueueVerb.cs diff --git a/Tapeti.Cmd/ASCII/ProgressBar.cs b/Tapeti.Cmd/ASCII/ProgressBar.cs new file mode 100644 index 0000000..f357457 --- /dev/null +++ b/Tapeti.Cmd/ASCII/ProgressBar.cs @@ -0,0 +1,103 @@ +using System; +using System.Text; + +namespace Tapeti.Cmd.ASCII +{ + public class ProgressBar : IDisposable, IProgress + { + private static readonly TimeSpan UpdateInterval = TimeSpan.FromMilliseconds(20); + + private readonly int max; + private readonly int width; + private readonly bool showPosition; + private int position; + + private readonly bool enabled; + private DateTime lastUpdate = DateTime.MinValue; + private int lastOutputLength; + + + public ProgressBar(int max, int width = 10, bool showPosition = true) + { + if (width <= 0) + throw new ArgumentOutOfRangeException(nameof(width), "Width must be greater than zero"); + + if (max <= 0) + throw new ArgumentOutOfRangeException(nameof(max), "Max must be greater than zero"); + + this.max = max; + this.width = width; + this.showPosition = showPosition; + + enabled = !Console.IsOutputRedirected; + if (!enabled) + return; + + Console.CursorVisible = false; + Redraw(); + } + + + public void Dispose() + { + if (!enabled || lastOutputLength <= 0) + return; + + Console.CursorLeft = 0; + Console.Write(new string(' ', lastOutputLength)); + Console.CursorLeft = 0; + Console.CursorVisible = true; + } + + + public void Report(int value) + { + if (!enabled) + return; + + value = Math.Max(0, Math.Min(max, value)); + position = value; + + var now = DateTime.Now; + if (now - lastUpdate < UpdateInterval) + return; + + lastUpdate = now; + Redraw(); + } + + + private void Redraw() + { + var output = new StringBuilder("["); + + var blockCount = (int)Math.Truncate((decimal)position / max * width); + if (blockCount > 0) + output.Append(new string('#', blockCount)); + + if (blockCount < width) + output.Append(new string('.', width - blockCount)); + + output.Append("] "); + + if (showPosition) + { + output + .Append(position.ToString("N0")).Append(" / ").Append(max.ToString("N0")) + .Append(" (").Append((int) Math.Truncate((decimal) position / max * 100)).Append("%)"); + } + else + output.Append(" ").Append((int)Math.Truncate((decimal)position / max * 100)).Append("%"); + + + var newLength = output.Length; + if (newLength < lastOutputLength) + output.Append(new string(' ', lastOutputLength - output.Length)); + + Console.CursorLeft = 0; + Console.Write(output); + + lastOutputLength = newLength; + } + } +} diff --git a/Tapeti.Cmd/Commands/ExportCommand.cs b/Tapeti.Cmd/Commands/ExportCommand.cs deleted file mode 100644 index 013a9bb..0000000 --- a/Tapeti.Cmd/Commands/ExportCommand.cs +++ /dev/null @@ -1,46 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.Serialization; - -namespace Tapeti.Cmd.Commands -{ - public class ExportCommand - { - public IMessageSerializer MessageSerializer { get; set; } - - public string QueueName { get; set; } - public bool RemoveMessages { get; set; } - public int? MaxCount { get; set; } - - - public int Execute(IModel channel) - { - var messageCount = 0; - - while (!MaxCount.HasValue || messageCount < MaxCount.Value) - { - var result = channel.BasicGet(QueueName, false); - if (result == null) - // No more messages on the queue - break; - - messageCount++; - - MessageSerializer.Serialize(new Message - { - DeliveryTag = result.DeliveryTag, - Redelivered = result.Redelivered, - Exchange = result.Exchange, - RoutingKey = result.RoutingKey, - Queue = QueueName, - Properties = result.BasicProperties, - Body = result.Body.ToArray() - }); - - if (RemoveMessages) - channel.BasicAck(result.DeliveryTag, false); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs deleted file mode 100644 index ecb3eb1..0000000 --- a/Tapeti.Cmd/Commands/ImportCommand.cs +++ /dev/null @@ -1,33 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.RateLimiter; -using Tapeti.Cmd.Serialization; - -namespace Tapeti.Cmd.Commands -{ - public class ImportCommand - { - public IMessageSerializer MessageSerializer { get; set; } - - public bool DirectToQueue { get; set; } - - - public int Execute(IModel channel, IRateLimiter rateLimiter) - { - var messageCount = 0; - - foreach (var message in MessageSerializer.Deserialize(channel)) - { - rateLimiter.Execute(() => - { - var exchange = DirectToQueue ? "" : message.Exchange; - var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; - - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; - }); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs deleted file mode 100644 index 3de6aca..0000000 --- a/Tapeti.Cmd/Commands/ShovelCommand.cs +++ /dev/null @@ -1,43 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.RateLimiter; - -namespace Tapeti.Cmd.Commands -{ - public class ShovelCommand - { - public string QueueName { get; set; } - public string TargetQueueName { get; set; } - public bool RemoveMessages { get; set; } - public int? MaxCount { get; set; } - - - public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter) - { - var messageCount = 0; - - while (!MaxCount.HasValue || messageCount < MaxCount.Value) - { - var result = sourceChannel.BasicGet(QueueName, false); - if (result == null) - // No more messages on the queue - break; - - // Since RabbitMQ client 6 we need to copy the body before calling another channel method - // like BasicPublish, or the published body will be corrupted - var bodyCopy = result.Body.ToArray(); - - - rateLimiter.Execute(() => - { - targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, bodyCopy); - messageCount++; - - if (RemoveMessages) - sourceChannel.BasicAck(result.DeliveryTag, false); - }); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Parser/BindingParser.cs b/Tapeti.Cmd/Parser/BindingParser.cs new file mode 100644 index 0000000..b839c73 --- /dev/null +++ b/Tapeti.Cmd/Parser/BindingParser.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Tapeti.Cmd.Parser +{ + public static class BindingParser + { + public static Tuple[] Parse(IEnumerable bindings) + { + return bindings + .Select(b => + { + var parts = b.Split(':'); + if (parts.Length != 2) + throw new InvalidOperationException($"Invalid binding format: {b}"); + + return new Tuple(parts[0], parts[1]); + }) + .ToArray(); + } + } +} diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 1171015..b1f563a 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -1,602 +1,57 @@ using System; -using System.Collections.Generic; using System.Diagnostics; -using System.IO; using System.Linq; -using System.Text; +using System.Reflection; using CommandLine; -using RabbitMQ.Client; -using RabbitMQ.Client.Exceptions; -using Tapeti.Cmd.Commands; -using Tapeti.Cmd.Mock; -using Tapeti.Cmd.RateLimiter; -using Tapeti.Cmd.Serialization; +using Tapeti.Cmd.Verbs; namespace Tapeti.Cmd { public class Program { - public class CommonOptions - { - [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] - public string Host { get; set; } - - [Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] - public int Port { get; set; } - - [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] - public string VirtualHost { get; set; } - - [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")] - public string Username { get; set; } - - [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")] - public string Password { get; set; } - } - - - public enum SerializationMethod - { - SingleFileJSON, - EasyNetQHosepipe - } - - - public class MessageSerializerOptions : CommonOptions - { - [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)] - public SerializationMethod SerializationMethod { get; set; } - } - - - - [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")] - public class ExportOptions : MessageSerializerOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] - public string QueueName { get; set; } - - [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] - public string OutputPath { get; set; } - - [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] - public bool RemoveMessages { get; set; } - - [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] - public int? MaxCount { get; set; } - } - - - [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] - public class ImportOptions : MessageSerializerOptions - { - [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] - public string InputFile { get; set; } - - [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] - public string InputMessage { get; set; } - - [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] - public bool InputPipe { get; set; } - - [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] - public bool PublishToExchange { get; set; } - - [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] - public int? MaxRate { get; set; } - } - - - [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] - public class ExampleOptions - { - } - - - [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] - public class ShovelOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] - public string QueueName { get; set; } - - [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")] - public string TargetQueueName { get; set; } - - [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")] - public bool RemoveMessages { get; set; } - - [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] - public int? MaxCount { get; set; } - - [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")] - public string TargetHost { get; set; } - - [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")] - public int? TargetPort { get; set; } - - [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")] - public string TargetVirtualHost { get; set; } - - [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")] - public string TargetUsername { get; set; } - - [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] - public string TargetPassword { get; set; } - - [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] - public int? MaxRate { get; set; } - } - - - [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] - public class PurgeOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] - public string QueueName { get; set; } - - [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool Confirm { get; set; } - } - - - [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] - public class DeclareQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - - [Verb("removequeue", HelpText = "Removes a durable queue.")] - public class RemoveQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")] - public string QueueName { get; set; } - - [Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool Confirm { get; set; } - - [Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool ConfirmPurge { get; set; } - } - - - [Verb("bindqueue", HelpText = "Add a binding to a queue.")] - public class BindQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - - [Verb("unbindqueue", HelpText = "Remove a binding from a queue.")] - public class UnbindQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) - .MapResult( - (ExportOptions o) => ExecuteVerb(o, RunExport), - (ImportOptions o) => ExecuteVerb(o, RunImport), - (ExampleOptions o) => ExecuteVerb(o, RunExample), - (ShovelOptions o) => ExecuteVerb(o, RunShovel), - (PurgeOptions o) => ExecuteVerb(o, RunPurge), - (DeclareQueueOptions o) => ExecuteVerb(o, RunDeclareQueue), - (RemoveQueueOptions o) => ExecuteVerb(o, RunRemoveQueue), - (BindQueueOptions o) => ExecuteVerb(o, RunBindQueue), - (UnbindQueueOptions o) => ExecuteVerb(o, RunUnbindQueue), - errs => - { - if (!Debugger.IsAttached) - return 1; - - Console.WriteLine("Press any Enter key to continue..."); - Console.ReadLine(); - return 1; - } - ); - } - - - private static int ExecuteVerb(T options, Action execute) where T : class - { - try - { - execute(options); - return 0; - } - catch (Exception e) - { - Console.WriteLine(e.Message); - return 1; - } - } - - - private static IConnection GetConnection(CommonOptions options) - { - var factory = new ConnectionFactory - { - HostName = options.Host, - Port = options.Port, - VirtualHost = options.VirtualHost, - UserName = options.Username, - Password = options.Password - }; - - return factory.CreateConnection(); - } - - - private static IMessageSerializer GetMessageSerializer(ImportOptions options) - { - switch (options.SerializationMethod) - { - case SerializationMethod.SingleFileJSON: - return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); - - case SerializationMethod.EasyNetQHosepipe: - if (string.IsNullOrEmpty(options.InputFile)) - throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization"); - - return new EasyNetQMessageSerializer(options.InputFile); - - default: - throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); - } - } - - - private static Stream GetInputStream(ImportOptions options, out bool disposeStream) - { - if (options.InputPipe) - { - disposeStream = false; - return Console.OpenStandardInput(); - } - - if (!string.IsNullOrEmpty(options.InputMessage)) - { - disposeStream = true; - return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage)); - } - - disposeStream = true; - return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); - } - - - private static IMessageSerializer GetMessageSerializer(ExportOptions options) - { - switch (options.SerializationMethod) - { - case SerializationMethod.SingleFileJSON: - return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); - - case SerializationMethod.EasyNetQHosepipe: - if (string.IsNullOrEmpty(options.OutputPath)) - throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); - - return new EasyNetQMessageSerializer(options.OutputPath); - - default: - throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); - } - } - - - private static Stream GetOutputStream(ExportOptions options, out bool disposeStream) - { - disposeStream = true; - return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read); - } - - - private static IRateLimiter GetRateLimiter(int? maxRate) - { - if (!maxRate.HasValue || maxRate.Value <= 0) - return new NoRateLimiter(); - - return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); - } - - - private static void RunExport(ExportOptions options) - { - int messageCount; - - using (var messageSerializer = GetMessageSerializer(options)) - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = new ExportCommand - { - MessageSerializer = messageSerializer, - - QueueName = options.QueueName, - RemoveMessages = options.RemoveMessages, - MaxCount = options.MaxCount - }.Execute(channel); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); - } - - - private static void RunImport(ImportOptions options) - { - int messageCount; - - using (var messageSerializer = GetMessageSerializer(options)) - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = new ImportCommand - { - MessageSerializer = messageSerializer, - - DirectToQueue = !options.PublishToExchange - }.Execute(channel, GetRateLimiter(options.MaxRate)); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); - } - - - private static void RunExample(ExampleOptions options) - { - using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) - { - messageSerializer.Serialize(new Message - { - Exchange = "example", - Queue = "example.queue", - RoutingKey = "example.routing.key", - DeliveryTag = 42, - Properties = new MockBasicProperties - { - ContentType = "application/json", - DeliveryMode = 2, - Headers = new Dictionary - { - { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } - }, - ReplyTo = "reply.queue", - Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) - }, - Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") - }); - } - } - - - private static void RunShovel(ShovelOptions options) - { - int messageCount; - - using (var sourceConnection = GetConnection(options)) - using (var sourceChannel = sourceConnection.CreateModel()) - { - var shovelCommand = new ShovelCommand - { - QueueName = options.QueueName, - TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName, - RemoveMessages = options.RemoveMessages, - MaxCount = options.MaxCount - }; - - - if (RequiresSecondConnection(options)) - { - using (var targetConnection = GetTargetConnection(options)) - using (var targetChannel = targetConnection.CreateModel()) - { - messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate)); - } - } - else - messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate)); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); - } - - - private static bool RequiresSecondConnection(ShovelOptions options) - { - if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host) - return true; - - if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port) - return true; - - if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost) - return true; - - - // All relevant target host parameters are either omitted or the same. This means the queue must be different - // to prevent an infinite loop. - if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName) - throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host"); - - - if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username) - return true; - - // ReSharper disable once ConvertIfStatementToReturnStatement - if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password) - return true; - - - // Everything's the same, we can use the same channel - return false; - } - - - private static IConnection GetTargetConnection(ShovelOptions options) - { - var factory = new ConnectionFactory - { - HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host, - Port = options.TargetPort ?? options.Port, - VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost, - UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username, - Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password - }; - - return factory.CreateConnection(); - } - - - private static void RunPurge(PurgeOptions options) - { - if (!options.Confirm) - { - Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - uint messageCount; - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueuePurge(options.QueueName); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); - } - - - private static void RunDeclareQueue(DeclareQueueOptions options) - { - // Parse early to fail early - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - channel.QueueDeclare(options.QueueName, true, false, false); - - foreach (var (exchange, routingKey) in bindings) - channel.QueueBind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); - } - - - private static void RunRemoveQueue(RemoveQueueOptions options) - { - if (!options.Confirm) - { - Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - uint messageCount; - - try - { - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueueDelete(options.QueueName, true, true); - } - } - catch (OperationInterruptedException e) - { - if (e.ShutdownReason.ReplyCode == 406) - { - if (!options.ConfirmPurge) - { - Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueueDelete(options.QueueName, true, false); - } - } - else - throw; - } - - Console.WriteLine(messageCount == 0 - ? $"Empty or non-existent queue '{options.QueueName}' removed." - : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); - } - - - private static void RunBindQueue(BindQueueOptions options) - { - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - foreach (var (exchange, routingKey) in bindings) - channel.QueueBind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); - } - - - private static void RunUnbindQueue(UnbindQueueOptions options) - { - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - foreach (var (exchange, routingKey) in bindings) - channel.QueueUnbind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); - } - - - - private static Tuple[] ParseBindings(IEnumerable bindings) - { - return bindings - .Select(b => - { - var parts = b.Split(':'); - if (parts.Length != 2) - throw new InvalidOperationException($"Invalid binding format: {b}"); - - return new Tuple(parts[0], parts[1]); - }) + var exitCode = 1; + var verbTypes = Assembly.GetExecutingAssembly().GetTypes() + .Where(t => t.GetCustomAttribute() != null) .ToArray(); + + CommandLine.Parser.Default.ParseArguments(args, verbTypes.ToArray()) + .WithParsed(o => + { + try + { + var executableVerbAttribute = o.GetType().GetCustomAttribute(); + var executer = Activator.CreateInstance(executableVerbAttribute.VerbExecuter, o) as IVerbExecuter; + + // Should have been validated by the ExecutableVerbAttribute + Debug.Assert(executer != null, nameof(executer) + " != null"); + + executer.Execute(); + exitCode = 0; + } + catch (Exception e) + { + Console.WriteLine(e.Message); + DebugConfirmClose(); + } + }) + .WithNotParsed(_ => + { + DebugConfirmClose(); + }); + + return exitCode; + } + + + private static void DebugConfirmClose() + { + if (!Debugger.IsAttached) + return; + + Console.WriteLine("Press any Enter key to continue..."); + Console.ReadLine(); } } } diff --git a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs index d0b99ee..570fb02 100644 --- a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs @@ -11,22 +11,23 @@ namespace Tapeti.Cmd.Serialization { public class EasyNetQMessageSerializer : IMessageSerializer { - private static readonly Regex InvalidCharRegex = new Regex(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled); + private static readonly Regex InvalidCharRegex = new(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled); - private readonly string path; private readonly Lazy writablePath; private int messageCount; + private readonly Lazy files; + public EasyNetQMessageSerializer(string path) { - this.path = path; - writablePath = new Lazy(() => { Directory.CreateDirectory(path); return path; }); + + files = new Lazy(() => Directory.GetFiles(path, "*.*.message.txt")); } @@ -60,9 +61,15 @@ namespace Tapeti.Cmd.Serialization } + public int GetMessageCount() + { + return files.Value.Length; + } + + public IEnumerable Deserialize(IModel channel) { - foreach (var file in Directory.GetFiles(path, "*.*.message.txt")) + foreach (var file in files.Value) { const string messageTag = ".message."; @@ -303,7 +310,7 @@ namespace Tapeti.Cmd.Serialization public Message ToMessage() { - return new Message + return new() { //ConsumerTag = DeliveryTag = DeliverTag, diff --git a/Tapeti.Cmd/Serialization/IMessageSerializer.cs b/Tapeti.Cmd/Serialization/IMessageSerializer.cs index 3aae717..831b4df 100644 --- a/Tapeti.Cmd/Serialization/IMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/IMessageSerializer.cs @@ -19,6 +19,8 @@ namespace Tapeti.Cmd.Serialization public interface IMessageSerializer : IDisposable { void Serialize(Message message); + + int GetMessageCount(); IEnumerable Deserialize(IModel channel); } } diff --git a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs index 05f2e88..2452157 100644 --- a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs @@ -14,8 +14,11 @@ namespace Tapeti.Cmd.Serialization private readonly bool disposeStream; private readonly Encoding encoding; + // StreamReader.DefaultBufferSize is private :-/ + private const int DefaultBufferSize = 1024; - private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings + + private static readonly JsonSerializerSettings SerializerSettings = new() { NullValueHandling = NullValueHandling.Ignore }; @@ -40,23 +43,48 @@ namespace Tapeti.Cmd.Serialization exportFile.Value.WriteLine(serialized); } + + public int GetMessageCount() + { + if (!stream.CanSeek) + return 0; + + var position = stream.Position; + try + { + var lineCount = 0; + using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true); + + while (!reader.EndOfStream) + { + if (!string.IsNullOrEmpty(reader.ReadLine())) + lineCount++; + } + + return lineCount; + } + finally + { + stream.Position = position; + } + } + public IEnumerable Deserialize(IModel channel) { - using (var reader = new StreamReader(stream, encoding)) + using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true); + + while (!reader.EndOfStream) { - while (!reader.EndOfStream) - { - var serialized = reader.ReadLine(); - if (string.IsNullOrEmpty(serialized)) - continue; + var serialized = reader.ReadLine(); + if (string.IsNullOrEmpty(serialized)) + continue; - var serializableMessage = JsonConvert.DeserializeObject(serialized); - if (serializableMessage == null) - continue; + var serializableMessage = JsonConvert.DeserializeObject(serialized); + if (serializableMessage == null) + continue; - yield return serializableMessage.ToMessage(channel); - } + yield return serializableMessage.ToMessage(channel); } } @@ -135,7 +163,7 @@ namespace Tapeti.Cmd.Serialization public Message ToMessage(IModel channel) { - return new Message + return new() { DeliveryTag = DeliveryTag, Redelivered = Redelivered, diff --git a/Tapeti.Cmd/Tapeti.Cmd.csproj b/Tapeti.Cmd/Tapeti.Cmd.csproj index 388750a..d5eda89 100644 --- a/Tapeti.Cmd/Tapeti.Cmd.csproj +++ b/Tapeti.Cmd/Tapeti.Cmd.csproj @@ -12,6 +12,7 @@ https://github.com/MvRens/Tapeti 2.0.0 Tapeti Command-line Utility + latest diff --git a/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs b/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs new file mode 100644 index 0000000..2f09302 --- /dev/null +++ b/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs @@ -0,0 +1,22 @@ +using CommandLine; + +namespace Tapeti.Cmd.Verbs +{ + public class BaseConnectionOptions + { + [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] + public string Host { get; set; } + + [Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] + public int Port { get; set; } + + [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] + public string VirtualHost { get; set; } + + [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")] + public string Username { get; set; } + + [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")] + public string Password { get; set; } + } +} diff --git a/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs b/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs new file mode 100644 index 0000000..89727ed --- /dev/null +++ b/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs @@ -0,0 +1,17 @@ +using CommandLine; + +namespace Tapeti.Cmd.Verbs +{ + public enum SerializationMethod + { + SingleFileJSON, + EasyNetQHosepipe + } + + + public class BaseMessageSerializerOptions : BaseConnectionOptions + { + [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)] + public SerializationMethod SerializationMethod { get; set; } + } +} diff --git a/Tapeti.Cmd/Verbs/BindQueueVerb.cs b/Tapeti.Cmd/Verbs/BindQueueVerb.cs new file mode 100644 index 0000000..368b7df --- /dev/null +++ b/Tapeti.Cmd/Verbs/BindQueueVerb.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("bindqueue", HelpText = "Add a binding to a queue.")] + [ExecutableVerb(typeof(BindQueueVerb))] + public class BindQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class BindQueueVerb : IVerbExecuter + { + private readonly BindQueueOptions options; + + + public BindQueueVerb(BindQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueBind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs new file mode 100644 index 0000000..5ebd72f --- /dev/null +++ b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] + [ExecutableVerb(typeof(DeclareQueueVerb))] + public class DeclareQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class DeclareQueueVerb : IVerbExecuter + { + private readonly DeclareQueueOptions options; + + + public DeclareQueueVerb(DeclareQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + // Parse early to fail early + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + channel.QueueDeclare(options.QueueName, true, false, false); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueBind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExampleVerb.cs b/Tapeti.Cmd/Verbs/ExampleVerb.cs new file mode 100644 index 0000000..6998e70 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExampleVerb.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Mock; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] + [ExecutableVerb(typeof(ExampleVerb))] + public class ExampleOptions + { + } + + + public class ExampleVerb : IVerbExecuter + { + public ExampleVerb(ExampleOptions options) + { + // Prevent compiler warnings, the parameter is expected by the Activator + Debug.Assert(options != null); + } + + + public void Execute() + { + using var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)); + + messageSerializer.Serialize(new Message + { + Exchange = "example", + Queue = "example.queue", + RoutingKey = "example.routing.key", + DeliveryTag = 42, + Properties = new MockBasicProperties + { + ContentType = "application/json", + DeliveryMode = 2, + Headers = new Dictionary + { + { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } + }, + ReplyTo = "reply.queue", + Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) + }, + Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") + }); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs new file mode 100644 index 0000000..aaeec9e --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs @@ -0,0 +1,30 @@ +using System; + +namespace Tapeti.Cmd.Verbs +{ + /// + /// Implementations are expected to have a constructor which accepts the options class + /// associated with the ExecutableVerb attribute. + /// + public interface IVerbExecuter + { + void Execute(); + } + + + + [AttributeUsage(AttributeTargets.Class)] + public class ExecutableVerbAttribute : Attribute + { + public Type VerbExecuter { get; } + + + public ExecutableVerbAttribute(Type verbExecuter) + { + if (!typeof(IVerbExecuter).IsAssignableFrom(verbExecuter)) + throw new InvalidCastException("Type must support IVerbExecuter"); + + VerbExecuter = verbExecuter; + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs new file mode 100644 index 0000000..33af936 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -0,0 +1,121 @@ +using System; +using System.IO; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")] + [ExecutableVerb(typeof(ExportVerb))] + public class ExportOptions : BaseMessageSerializerOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] + public string OutputPath { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + } + + + public class ExportVerb : IVerbExecuter + { + private readonly ExportOptions options; + + + public ExportVerb(ExportOptions options) + { + this.options = options; + } + + + public void Execute() + { + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var messageSerializer = GetMessageSerializer(options); + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + var totalCount = (int)channel.MessageCount(options.QueueName); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) + totalCount = options.MaxCount.Value; + + Console.WriteLine($"Exporting {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + using (var progressBar = new ProgressBar(totalCount)) + { + while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + { + var result = channel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + break; + + messageCount++; + + messageSerializer.Serialize(new Message + { + DeliveryTag = result.DeliveryTag, + Redelivered = result.Redelivered, + Exchange = result.Exchange, + RoutingKey = result.RoutingKey, + Queue = options.QueueName, + Properties = result.BasicProperties, + Body = result.Body.ToArray() + }); + + if (options.RemoveMessages) + channel.BasicAck(result.DeliveryTag, false); + + + progressBar.Report(messageCount); + } + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); + } + + + private static IMessageSerializer GetMessageSerializer(ExportOptions options) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8); + + case SerializationMethod.EasyNetQHosepipe: + if (string.IsNullOrEmpty(options.OutputPath)) + throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.OutputPath); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + } +} diff --git a/Tapeti.Cmd/Verbs/ImportVerb.cs b/Tapeti.Cmd/Verbs/ImportVerb.cs new file mode 100644 index 0000000..1040220 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ImportVerb.cs @@ -0,0 +1,149 @@ +using System; +using System.IO; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.RateLimiter; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] + [ExecutableVerb(typeof(ImportVerb))] + public class ImportOptions : BaseMessageSerializerOptions + { + [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] + public string InputFile { get; set; } + + [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public string InputMessage { get; set; } + + [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public bool InputPipe { get; set; } + + [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] + public bool PublishToExchange { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] + public int? MaxRate { get; set; } + } + + + public class ImportVerb : IVerbExecuter + { + private readonly ImportOptions options; + + + public ImportVerb(ImportOptions options) + { + this.options = options; + } + + + public void Execute() + { + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var messageSerializer = GetMessageSerializer(options); + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + var rateLimiter = GetRateLimiter(options.MaxRate); + + var totalCount = messageSerializer.GetMessageCount(); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + ProgressBar progress = null; + if (totalCount > 0) + progress = new ProgressBar(totalCount); + try + { + foreach (var message in messageSerializer.Deserialize(channel)) + { + if (cancelled) + break; + + rateLimiter.Execute(() => + { + var exchange = options.PublishToExchange ? message.Exchange : ""; + var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; + + // ReSharper disable AccessToDisposedClosure + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + + progress?.Report(messageCount); + // ReSharper restore AccessToDisposedClosure + }); + } + } + finally + { + progress?.Dispose(); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); + } + + + private static IMessageSerializer GetMessageSerializer(ImportOptions options) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); + + case SerializationMethod.EasyNetQHosepipe: + if (string.IsNullOrEmpty(options.InputFile)) + throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.InputFile); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + + + private static Stream GetInputStream(ImportOptions options, out bool disposeStream) + { + if (options.InputPipe) + { + disposeStream = false; + return Console.OpenStandardInput(); + } + + if (!string.IsNullOrEmpty(options.InputMessage)) + { + disposeStream = true; + return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage)); + } + + disposeStream = true; + return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + } + + + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (!maxRate.HasValue || maxRate.Value <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + } +} diff --git a/Tapeti.Cmd/Verbs/PurgeVerb.cs b/Tapeti.Cmd/Verbs/PurgeVerb.cs new file mode 100644 index 0000000..30c6d00 --- /dev/null +++ b/Tapeti.Cmd/Verbs/PurgeVerb.cs @@ -0,0 +1,58 @@ +using System; +using CommandLine; +using RabbitMQ.Client; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] + [ExecutableVerb(typeof(PurgeVerb))] + public class PurgeOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + } + + + public class PurgeVerb : IVerbExecuter + { + private readonly PurgeOptions options; + + + public PurgeVerb(PurgeOptions options) + { + this.options = options; + } + + + public void Execute() + { + if (!options.Confirm) + { + Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + var messageCount = channel.QueuePurge(options.QueueName); + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs new file mode 100644 index 0000000..f83b4d1 --- /dev/null +++ b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs @@ -0,0 +1,90 @@ +using System; +using CommandLine; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("removequeue", HelpText = "Removes a durable queue.")] + [ExecutableVerb(typeof(RemoveQueueVerb))] + public class RemoveQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + + [Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool ConfirmPurge { get; set; } + } + + + public class RemoveQueueVerb : IVerbExecuter + { + private readonly RemoveQueueOptions options; + + + public RemoveQueueVerb(RemoveQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + if (!options.Confirm) + { + Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + uint messageCount; + + try + { + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + messageCount = channel.QueueDelete(options.QueueName, true, true); + } + catch (OperationInterruptedException e) + { + if (e.ShutdownReason.ReplyCode == 406) + { + if (!options.ConfirmPurge) + { + Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + messageCount = channel.QueueDelete(options.QueueName, true, false); + } + else + throw; + } + + Console.WriteLine(messageCount == 0 + ? $"Empty or non-existent queue '{options.QueueName}' removed." + : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ShovelVerb.cs b/Tapeti.Cmd/Verbs/ShovelVerb.cs new file mode 100644 index 0000000..c601b5a --- /dev/null +++ b/Tapeti.Cmd/Verbs/ShovelVerb.cs @@ -0,0 +1,181 @@ +using System; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.RateLimiter; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] + [ExecutableVerb(typeof(ShovelVerb))] + public class ShovelOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")] + public string TargetQueueName { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + + [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")] + public string TargetHost { get; set; } + + [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")] + public int? TargetPort { get; set; } + + [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")] + public string TargetVirtualHost { get; set; } + + [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")] + public string TargetUsername { get; set; } + + [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] + public string TargetPassword { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] + public int? MaxRate { get; set; } + } + + + public class ShovelVerb : IVerbExecuter + { + private readonly ShovelOptions options; + + + public ShovelVerb(ShovelOptions options) + { + this.options = options; + } + + + public void Execute() + { + var sourceFactory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var sourceConnection = sourceFactory.CreateConnection(); + using var sourceChannel = sourceConnection.CreateModel(); + + if (RequiresSecondConnection(options)) + { + var targetFactory = new ConnectionFactory + { + HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host, + Port = options.TargetPort ?? options.Port, + VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost, + UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username, + Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password + }; + + using var targetConnection = targetFactory.CreateConnection(); + using var targetChannel = targetConnection.CreateModel(); + + Shovel(options, sourceChannel, targetChannel); + } + else + Shovel(options, sourceChannel, sourceChannel); + } + + + private static void Shovel(ShovelOptions options, IModel sourceChannel, IModel targetChannel) + { + var rateLimiter = GetRateLimiter(options.MaxRate); + var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; + + var totalCount = (int)sourceChannel.MessageCount(options.QueueName); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) + totalCount = options.MaxCount.Value; + + Console.WriteLine($"Shoveling {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + using (var progressBar = new ProgressBar(totalCount)) + { + while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + { + var result = sourceChannel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + break; + + // Since RabbitMQ client 6 we need to copy the body before calling another channel method + // like BasicPublish, or the published body will be corrupted if sourceChannel and targetChannel are the same + var bodyCopy = result.Body.ToArray(); + + + rateLimiter.Execute(() => + { + targetChannel.BasicPublish("", targetQueueName, result.BasicProperties, bodyCopy); + messageCount++; + + if (options.RemoveMessages) + sourceChannel.BasicAck(result.DeliveryTag, false); + + // ReSharper disable once AccessToDisposedClosure + progressBar.Report(messageCount); + }); + } + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); + } + + + private static bool RequiresSecondConnection(ShovelOptions options) + { + if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host) + return true; + + if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port) + return true; + + if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost) + return true; + + + // All relevant target host parameters are either omitted or the same. This means the queue must be different + // to prevent an infinite loop. + if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName) + throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host"); + + + if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username) + return true; + + // ReSharper disable once ConvertIfStatementToReturnStatement + if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password) + return true; + + + // Everything's the same, we can use the same channel + return false; + } + + + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (!maxRate.HasValue || maxRate.Value <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + } +} diff --git a/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs new file mode 100644 index 0000000..12c91f2 --- /dev/null +++ b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("unbindqueue", HelpText = "Remove a binding from a queue.")] + [ExecutableVerb(typeof(UnbindQueueVerb))] + public class UnbindQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class UnbindQueueVerb : IVerbExecuter + { + private readonly UnbindQueueOptions options; + + + public UnbindQueueVerb(UnbindQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueUnbind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); + } + } +}