diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 99c412d..81bf8d5 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -2,9 +2,11 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Text; using CommandLine; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; using Tapeti.Cmd.Commands; using Tapeti.Cmd.Mock; using Tapeti.Cmd.RateLimiter; @@ -85,6 +87,12 @@ namespace Tapeti.Cmd } + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] + public class ExampleOptions + { + } + + [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] public class ShovelOptions : CommonOptions { @@ -131,22 +139,43 @@ namespace Tapeti.Cmd } - [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] - public class ExampleOptions + [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] + public class DeclareQueueOptions : CommonOptions { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + [Verb("removequeue", HelpText = "Removes a durable queue.")] + public class RemoveQueueOptions : CommonOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + + [Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool ConfirmPurge { get; set; } } public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) + return Parser.Default.ParseArguments(args) .MapResult( (ExportOptions o) => ExecuteVerb(o, RunExport), (ImportOptions o) => ExecuteVerb(o, RunImport), + (ExampleOptions o) => ExecuteVerb(o, RunExample), (ShovelOptions o) => ExecuteVerb(o, RunShovel), (PurgeOptions o) => ExecuteVerb(o, RunPurge), - (ExampleOptions o) => ExecuteVerb(o, RunExample), + (DeclareQueueOptions o) => ExecuteVerb(o, RunDeclareQueue), + (RemoveQueueOptions o) => ExecuteVerb(o, RunRemoveQueue), errs => { if (!Debugger.IsAttached) @@ -305,6 +334,33 @@ namespace Tapeti.Cmd } + private static void RunExample(ExampleOptions options) + { + using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) + { + messageSerializer.Serialize(new Message + { + Exchange = "example", + Queue = "example.queue", + RoutingKey = "example.routing.key", + DeliveryTag = 42, + Properties = new MockBasicProperties + { + ContentType = "application/json", + DeliveryMode = 2, + Headers = new Dictionary + { + { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } + }, + ReplyTo = "reply.queue", + Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) + }, + Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") + }); + } + } + + private static void RunShovel(ShovelOptions options) { int messageCount; @@ -403,34 +459,83 @@ namespace Tapeti.Cmd } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); - } - private static void RunExample(ExampleOptions options) + private static void RunDeclareQueue(DeclareQueueOptions options) { - using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) - { - messageSerializer.Serialize(new Message + // Parse early to fail early + var bindings = options.Bindings + .Select(b => { - Exchange = "example", - Queue = "example.queue", - RoutingKey = "example.routing.key", - DeliveryTag = 42, - Properties = new MockBasicProperties - { - ContentType = "application/json", - DeliveryMode = 2, - Headers = new Dictionary - { - { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } - }, - ReplyTo = "reply.queue", - Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) - }, - Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") - }); + var parts = b.Split(':'); + if (parts.Length != 2) + throw new InvalidOperationException($"Invalid binding format: {b}"); + + return new Tuple(parts[0], parts[1]); + }) + .ToArray(); + + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) + { + channel.QueueDeclare(options.QueueName, true, false, false); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueBind(options.QueueName, exchange, routingKey); } + + Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); + } + + + private static void RunRemoveQueue(RemoveQueueOptions options) + { + if (!options.Confirm) + { + Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + uint messageCount; + + try + { + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) + { + messageCount = channel.QueueDelete(options.QueueName, true, true); + } + } + catch (OperationInterruptedException e) + { + if (e.ShutdownReason.ReplyCode == 406) + { + if (!options.ConfirmPurge) + { + Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) + { + messageCount = channel.QueueDelete(options.QueueName, true, false); + } + } + else + throw; + } + + Console.WriteLine(messageCount == 0 + ? $"Empty or non-existent queue '{options.QueueName}' removed." + : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); } } }