diff --git a/Tapeti.Cmd/Commands/ExportCommand.cs b/Tapeti.Cmd/Commands/ExportCommand.cs index a13ee74..2f69aa4 100644 --- a/Tapeti.Cmd/Commands/ExportCommand.cs +++ b/Tapeti.Cmd/Commands/ExportCommand.cs @@ -5,7 +5,6 @@ namespace Tapeti.Cmd.Commands { public class ExportCommand { - public ConnectionFactory ConnectionFactory { get; set; } public IMessageSerializer MessageSerializer { get; set; } public string QueueName { get; set; } @@ -13,19 +12,7 @@ namespace Tapeti.Cmd.Commands public int? MaxCount { get; set; } - public int Execute() - { - using (var connection = ConnectionFactory.CreateConnection()) - { - using (var channel = connection.CreateModel()) - { - return GetMessages(channel); - } - } - } - - - private int GetMessages(IModel channel) + public int Execute(IModel channel) { var messageCount = 0; diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs index e53de39..ccdf308 100644 --- a/Tapeti.Cmd/Commands/ImportCommand.cs +++ b/Tapeti.Cmd/Commands/ImportCommand.cs @@ -5,25 +5,12 @@ namespace Tapeti.Cmd.Commands { public class ImportCommand { - public ConnectionFactory ConnectionFactory { get; set; } public IMessageSerializer MessageSerializer { get; set; } public bool DirectToQueue { get; set; } - public int Execute() - { - using (var connection = ConnectionFactory.CreateConnection()) - { - using (var channel = connection.CreateModel()) - { - return PublishMessages(channel); - } - } - } - - - private int PublishMessages(IModel channel) + public int Execute(IModel channel) { var messageCount = 0; diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs new file mode 100644 index 0000000..9b42a3a --- /dev/null +++ b/Tapeti.Cmd/Commands/ShovelCommand.cs @@ -0,0 +1,37 @@ +using RabbitMQ.Client; + +namespace Tapeti.Cmd.Commands +{ + public class ShovelCommand + { + public string QueueName { get; set; } + public string TargetQueueName { get; set; } + public bool RemoveMessages { get; set; } + public int? MaxCount { get; set; } + + + public int Execute(IModel sourceChannel, IModel targetChannel) + { + var messageCount = 0; + + while (!MaxCount.HasValue || messageCount < MaxCount.Value) + { + var result = sourceChannel.BasicGet(QueueName, false); + if (result == null) + // No more messages on the queue + break; + + + targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body); + + + messageCount++; + + if (RemoveMessages) + sourceChannel.BasicAck(result.DeliveryTag, false); + } + + return messageCount; + } + } +} diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 4c2f507..a3abb8e 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -71,13 +71,47 @@ namespace Tapeti.Cmd } + [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] + public class ShovelOptions : CommonOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")] + public string TargetQueueName { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + + [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")] + public string TargetHost { get; set; } + + [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")] + public int? TargetPort { get; set; } + + [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")] + public string TargetVirtualHost { get; set; } + + [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")] + public string TargetUsername { get; set; } + + [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] + public string TargetPassword { get; set; } + } + + + public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) + return Parser.Default.ParseArguments(args) .MapResult( (ExportOptions o) => ExecuteVerb(o, RunExport), (ImportOptions o) => ExecuteVerb(o, RunImport), + (ShovelOptions o) => ExecuteVerb(o, RunShovel), errs => { if (!Debugger.IsAttached) @@ -106,9 +140,9 @@ namespace Tapeti.Cmd } - private static ConnectionFactory GetConnectionFactory(CommonOptions options) + private static IConnection GetConnection(CommonOptions options) { - return new ConnectionFactory + var factory = new ConnectionFactory { HostName = options.Host, Port = options.Port, @@ -116,6 +150,8 @@ namespace Tapeti.Cmd UserName = options.Username, Password = options.Password }; + + return factory.CreateConnection(); } @@ -140,16 +176,17 @@ namespace Tapeti.Cmd int messageCount; using (var messageSerializer = GetMessageSerializer(options, options.OutputPath)) + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) { messageCount = new ExportCommand { - ConnectionFactory = GetConnectionFactory(options), MessageSerializer = messageSerializer, QueueName = options.QueueName, RemoveMessages = options.RemoveMessages, MaxCount = options.MaxCount - }.Execute(); + }.Execute(channel); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); @@ -161,17 +198,96 @@ namespace Tapeti.Cmd int messageCount; using (var messageSerializer = GetMessageSerializer(options, options.Input)) + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) { messageCount = new ImportCommand { - ConnectionFactory = GetConnectionFactory(options), MessageSerializer = messageSerializer, DirectToQueue = !options.PublishToExchange - }.Execute(); + }.Execute(channel); } Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); } + + + private static void RunShovel(ShovelOptions options) + { + int messageCount; + + using (var sourceConnection = GetConnection(options)) + using (var sourceChannel = sourceConnection.CreateModel()) + { + var shovelCommand = new ShovelCommand + { + QueueName = options.QueueName, + TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName, + RemoveMessages = options.RemoveMessages, + MaxCount = options.MaxCount + }; + + + if (RequiresSecondConnection(options)) + { + using (var targetConnection = GetTargetConnection(options)) + using (var targetChannel = targetConnection.CreateModel()) + { + messageCount = shovelCommand.Execute(sourceChannel, targetChannel); + } + } + else + messageCount = shovelCommand.Execute(sourceChannel, sourceChannel); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); + } + + + private static bool RequiresSecondConnection(ShovelOptions options) + { + if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host) + return true; + + if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port) + return true; + + if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost) + return true; + + + // All relevant target host parameters are either omitted or the same. This means the queue must be different + // to prevent an infinite loop. + if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName) + throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host"); + + + if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username) + return true; + + // ReSharper disable once ConvertIfStatementToReturnStatement + if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password) + return true; + + + // Everything's the same, we can use the same channel + return false; + } + + + private static IConnection GetTargetConnection(ShovelOptions options) + { + var factory = new ConnectionFactory + { + HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host, + Port = options.TargetPort ?? options.Port, + VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost, + UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username, + Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password, + }; + + return factory.CreateConnection(); + } } }