From 34da354fc2f2e135cd129c522f487d671d3e011d Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 16 Oct 2019 13:54:43 +0200 Subject: [PATCH] Start of Tapeti.Cmd [ci skip] Support for basic queue to/from disk operations --- Tapeti.Cmd/Commands/ExportCommand.cs | 59 +++++ Tapeti.Cmd/Commands/ImportCommand.cs | 42 ++++ Tapeti.Cmd/Program.cs | 177 +++++++++++++ .../Serialization/IMessageSerializer.cs | 24 ++ .../SingleFileJSONMessageSerializer.cs | 234 ++++++++++++++++++ Tapeti.Cmd/Tapeti.Cmd.csproj | 18 ++ Tapeti.Cmd/build-release.bat | 1 + Tapeti.sln | 17 +- Tapeti.sln.DotSettings | 1 + 9 files changed, 569 insertions(+), 4 deletions(-) create mode 100644 Tapeti.Cmd/Commands/ExportCommand.cs create mode 100644 Tapeti.Cmd/Commands/ImportCommand.cs create mode 100644 Tapeti.Cmd/Program.cs create mode 100644 Tapeti.Cmd/Serialization/IMessageSerializer.cs create mode 100644 Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs create mode 100644 Tapeti.Cmd/Tapeti.Cmd.csproj create mode 100644 Tapeti.Cmd/build-release.bat diff --git a/Tapeti.Cmd/Commands/ExportCommand.cs b/Tapeti.Cmd/Commands/ExportCommand.cs new file mode 100644 index 0000000..a13ee74 --- /dev/null +++ b/Tapeti.Cmd/Commands/ExportCommand.cs @@ -0,0 +1,59 @@ +using RabbitMQ.Client; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Commands +{ + public class ExportCommand + { + public ConnectionFactory ConnectionFactory { get; set; } + public IMessageSerializer MessageSerializer { get; set; } + + public string QueueName { get; set; } + public bool RemoveMessages { get; set; } + public int? MaxCount { get; set; } + + + public int Execute() + { + using (var connection = ConnectionFactory.CreateConnection()) + { + using (var channel = connection.CreateModel()) + { + return GetMessages(channel); + } + } + } + + + private int GetMessages(IModel channel) + { + var messageCount = 0; + + while (!MaxCount.HasValue || messageCount < MaxCount.Value) + { + var result = channel.BasicGet(QueueName, false); + if (result == null) + // No more messages on the queue + break; + + messageCount++; + + MessageSerializer.Serialize(new Message + { + DeliveryTag = result.DeliveryTag, + Redelivered = result.Redelivered, + Exchange = result.Exchange, + RoutingKey = result.RoutingKey, + Queue = QueueName, + Properties = result.BasicProperties, + Body = result.Body + }); + + if (RemoveMessages) + channel.BasicAck(result.DeliveryTag, false); + } + + return messageCount; + } + } +} diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs new file mode 100644 index 0000000..e53de39 --- /dev/null +++ b/Tapeti.Cmd/Commands/ImportCommand.cs @@ -0,0 +1,42 @@ +using RabbitMQ.Client; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Commands +{ + public class ImportCommand + { + public ConnectionFactory ConnectionFactory { get; set; } + public IMessageSerializer MessageSerializer { get; set; } + + public bool DirectToQueue { get; set; } + + + public int Execute() + { + using (var connection = ConnectionFactory.CreateConnection()) + { + using (var channel = connection.CreateModel()) + { + return PublishMessages(channel); + } + } + } + + + private int PublishMessages(IModel channel) + { + var messageCount = 0; + + foreach (var message in MessageSerializer.Deserialize()) + { + var exchange = DirectToQueue ? "" : message.Exchange; + var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; + + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + } + + return messageCount; + } + } +} diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs new file mode 100644 index 0000000..4c2f507 --- /dev/null +++ b/Tapeti.Cmd/Program.cs @@ -0,0 +1,177 @@ +using System; +using System.Diagnostics; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Commands; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd +{ + public class Program + { + public class CommonOptions + { + [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] + public string Host { get; set; } + + [Option('p', "port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] + public int Port { get; set; } + + [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] + public string VirtualHost { get; set; } + + [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")] + public string Username { get; set; } + + [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")] + public string Password { get; set; } + } + + + public enum SerializationMethod + { + SingleFileJSON, + EasyNetQHosepipe + } + + + public class MessageSerializerOptions : CommonOptions + { + [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)] + public SerializationMethod SerializationMethod { get; set; } + } + + + + [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")] + public class ExportOptions : MessageSerializerOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] + public string OutputPath { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + } + + + [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] + public class ImportOptions : MessageSerializerOptions + { + [Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] + public string Input { get; set; } + + [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] + public bool PublishToExchange { get; set; } + } + + + + public static int Main(string[] args) + { + return Parser.Default.ParseArguments(args) + .MapResult( + (ExportOptions o) => ExecuteVerb(o, RunExport), + (ImportOptions o) => ExecuteVerb(o, RunImport), + errs => + { + if (!Debugger.IsAttached) + return 1; + + Console.WriteLine("Press any Enter key to continue..."); + Console.ReadLine(); + return 1; + } + ); + } + + + private static int ExecuteVerb(T options, Action execute) where T : class + { + try + { + execute(options); + return 0; + } + catch (Exception e) + { + Console.WriteLine(e.Message); + return 1; + } + } + + + private static ConnectionFactory GetConnectionFactory(CommonOptions options) + { + return new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + } + + + private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(path); + + case SerializationMethod.EasyNetQHosepipe: + throw new NotImplementedException(); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + + + private static void RunExport(ExportOptions options) + { + int messageCount; + + using (var messageSerializer = GetMessageSerializer(options, options.OutputPath)) + { + messageCount = new ExportCommand + { + ConnectionFactory = GetConnectionFactory(options), + MessageSerializer = messageSerializer, + + QueueName = options.QueueName, + RemoveMessages = options.RemoveMessages, + MaxCount = options.MaxCount + }.Execute(); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); + } + + + private static void RunImport(ImportOptions options) + { + int messageCount; + + using (var messageSerializer = GetMessageSerializer(options, options.Input)) + { + messageCount = new ImportCommand + { + ConnectionFactory = GetConnectionFactory(options), + MessageSerializer = messageSerializer, + + DirectToQueue = !options.PublishToExchange + }.Execute(); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); + } + } +} diff --git a/Tapeti.Cmd/Serialization/IMessageSerializer.cs b/Tapeti.Cmd/Serialization/IMessageSerializer.cs new file mode 100644 index 0000000..e8ce5a3 --- /dev/null +++ b/Tapeti.Cmd/Serialization/IMessageSerializer.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using RabbitMQ.Client; + +namespace Tapeti.Cmd.Serialization +{ + public class Message + { + public ulong DeliveryTag; + public bool Redelivered; + public string Exchange; + public string RoutingKey; + public string Queue; + public IBasicProperties Properties; + public byte[] Body; + } + + + public interface IMessageSerializer : IDisposable + { + void Serialize(Message message); + IEnumerable Deserialize(); + } +} diff --git a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs new file mode 100644 index 0000000..7079c5f --- /dev/null +++ b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs @@ -0,0 +1,234 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using RabbitMQ.Client; +using RabbitMQ.Client.Framing; + +namespace Tapeti.Cmd.Serialization +{ + public class SingleFileJSONMessageSerializer : IMessageSerializer + { + private readonly string path; + + + private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore + }; + + private readonly Lazy exportFile; + + + public SingleFileJSONMessageSerializer(string path) + { + this.path = path; + exportFile = new Lazy(() => new StreamWriter(path, false, Encoding.UTF8)); + } + + + public void Serialize(Message message) + { + var serializableMessage = new SerializableMessage(message); + var serialized = JsonConvert.SerializeObject(serializableMessage, SerializerSettings); + exportFile.Value.WriteLine(serialized); + } + + + public IEnumerable Deserialize() + { + using (var file = new StreamReader(path)) + { + while (!file.EndOfStream) + { + var serialized = file.ReadLine(); + if (string.IsNullOrEmpty(serialized)) + continue; + + var serializableMessage = JsonConvert.DeserializeObject(serialized); + if (serializableMessage == null) + continue; + + yield return serializableMessage.ToMessage(); + } + } + } + + + public void Dispose() + { + if (exportFile.IsValueCreated) + exportFile.Value.Dispose(); + } + + + + // ReSharper disable MemberCanBePrivate.Local - used for JSON serialization + // ReSharper disable NotAccessedField.Local + // ReSharper disable FieldCanBeMadeReadOnly.Local + private class SerializableMessage + { + public ulong DeliveryTag; + public bool Redelivered; + public string Exchange; + public string RoutingKey; + public string Queue; + + // ReSharper disable once FieldCanBeMadeReadOnly.Local - must be settable by JSON deserialization + public SerializableMessageProperties Properties; + + public JObject Body; + public byte[] RawBody; + + + // ReSharper disable once UnusedMember.Global - used by JSON deserialization + // ReSharper disable once UnusedMember.Local + public SerializableMessage() + { + Properties = new SerializableMessageProperties(); + } + + + public SerializableMessage(Message fromMessage) + { + DeliveryTag = fromMessage.DeliveryTag; + Redelivered = fromMessage.Redelivered; + Exchange = fromMessage.Exchange; + RoutingKey = fromMessage.RoutingKey; + Queue = fromMessage.Queue; + Properties = new SerializableMessageProperties(fromMessage.Properties); + + // If this is detected as a JSON message, include the object directly in the JSON line so that it is easier + // to read and process in the output file. Otherwise simply include the raw data and let Newtonsoft encode it. + // This does mean the message will be rewritten. If this is an issue, feel free to add a "raw" option to this tool + // that forces the RawBody to be used. It is open-source after all :-). + if (Properties.ContentType == "application/json") + { + try + { + Body = JObject.Parse(Encoding.UTF8.GetString(fromMessage.Body)); + RawBody = null; + } + catch + { + // Fall back to using the raw body + Body = null; + RawBody = fromMessage.Body; + } + } + else + { + Body = null; + RawBody = fromMessage.Body; + } + } + + + public Message ToMessage() + { + return new Message + { + DeliveryTag = DeliveryTag, + Redelivered = Redelivered, + Exchange = Exchange, + RoutingKey = RoutingKey, + Queue = Queue, + Properties = Properties.ToBasicProperties(), + Body = Body != null + ? Encoding.UTF8.GetBytes(Body.ToString(Formatting.None)) + : RawBody + }; + } + } + + + // IBasicProperties is finicky when it comes to writing it's properties, + // so we need this normalized class to read and write it from and to JSON + private class SerializableMessageProperties + { + public string AppId; + public string ClusterId; + public string ContentEncoding; + public string ContentType; + public string CorrelationId; + public byte? DeliveryMode; + public string Expiration; + public IDictionary Headers; + public string MessageId; + public byte? Priority; + public string ReplyTo; + public long? Timestamp; + public string Type; + public string UserId; + + + public SerializableMessageProperties() + { + } + + + public SerializableMessageProperties(IBasicProperties fromProperties) + { + AppId = fromProperties.AppId; + ClusterId = fromProperties.ClusterId; + ContentEncoding = fromProperties.ContentEncoding; + ContentType = fromProperties.ContentType; + CorrelationId = fromProperties.CorrelationId; + DeliveryMode = fromProperties.IsDeliveryModePresent() ? (byte?)fromProperties.DeliveryMode : null; + Expiration = fromProperties.Expiration; + MessageId = fromProperties.MessageId; + Priority = fromProperties.IsPriorityPresent() ? (byte?) fromProperties.Priority : null; + ReplyTo = fromProperties.ReplyTo; + Timestamp = fromProperties.IsTimestampPresent() ? (long?)fromProperties.Timestamp.UnixTime : null; + Type = fromProperties.Type; + UserId = fromProperties.UserId; + + if (fromProperties.IsHeadersPresent()) + { + Headers = new Dictionary(); + + // This assumes header values are UTF-8 encoded strings. This is true for Tapeti. + foreach (var pair in fromProperties.Headers) + Headers.Add(pair.Key, Encoding.UTF8.GetString((byte[])pair.Value)); + } + else + Headers = null; + } + + + public IBasicProperties ToBasicProperties() + { + var properties = new BasicProperties(); + + if (!string.IsNullOrEmpty(AppId)) properties.AppId = AppId; + if (!string.IsNullOrEmpty(ClusterId)) properties.ClusterId = ClusterId; + if (!string.IsNullOrEmpty(ContentEncoding)) properties.ContentEncoding = ContentEncoding; + if (!string.IsNullOrEmpty(ContentType)) properties.ContentType = ContentType; + if (DeliveryMode.HasValue) properties.DeliveryMode = DeliveryMode.Value; + if (!string.IsNullOrEmpty(Expiration)) properties.Expiration = Expiration; + if (!string.IsNullOrEmpty(MessageId)) properties.MessageId = MessageId; + if (Priority.HasValue) properties.Priority = Priority.Value; + if (!string.IsNullOrEmpty(ReplyTo)) properties.ReplyTo = ReplyTo; + if (Timestamp.HasValue) properties.Timestamp = new AmqpTimestamp(Timestamp.Value); + if (!string.IsNullOrEmpty(Type)) properties.Type = Type; + if (!string.IsNullOrEmpty(UserId)) properties.UserId = UserId; + + // ReSharper disable once InvertIf + if (Headers != null) + { + properties.Headers = new Dictionary(); + + foreach (var pair in Headers) + properties.Headers.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value)); + } + + return properties; + } + } + // ReSharper restore FieldCanBeMadeReadOnly.Local + // ReSharper restore NotAccessedField.Local + // ReSharper restore MemberCanBePrivate.Local + } +} diff --git a/Tapeti.Cmd/Tapeti.Cmd.csproj b/Tapeti.Cmd/Tapeti.Cmd.csproj new file mode 100644 index 0000000..6a80ffb --- /dev/null +++ b/Tapeti.Cmd/Tapeti.Cmd.csproj @@ -0,0 +1,18 @@ + + + + Exe + netcoreapp2.2 + 2.0.0 + Mark van Renswoude + Mark van Renswoude + Tapeti Command-line Utility + + + + + + + + + diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat new file mode 100644 index 0000000..01d9716 --- /dev/null +++ b/Tapeti.Cmd/build-release.bat @@ -0,0 +1 @@ +dotnet publish -c Release -r win-x64 \ No newline at end of file diff --git a/Tapeti.sln b/Tapeti.sln index 04187e1..c3e5cf3 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -45,13 +45,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{8E757FF7-F EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{57996ADC-18C5-4991-9F95-58D58D442461}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.CastleWindsor", "Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj", "{374AAE64-598B-4F67-8870-4A05168FF987}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.CastleWindsor", "Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj", "{374AAE64-598B-4F67-8870-4A05168FF987}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Autofac", "Tapeti.Autofac\Tapeti.Autofac.csproj", "{B3802005-C941-41B6-A9A5-20573A7C24AE}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Autofac", "Tapeti.Autofac\Tapeti.Autofac.csproj", "{B3802005-C941-41B6-A9A5-20573A7C24AE}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.UnityContainer", "Tapeti.UnityContainer\Tapeti.UnityContainer.csproj", "{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.UnityContainer", "Tapeti.UnityContainer\Tapeti.UnityContainer.csproj", "{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Ninject", "Tapeti.Ninject\Tapeti.Ninject.csproj", "{29478B10-FC53-4E93-ADEF-A775D9408131}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Ninject", "Tapeti.Ninject\Tapeti.Ninject.csproj", "{29478B10-FC53-4E93-ADEF-A775D9408131}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327-46B0-4B72-B95A-594CE7F8C80D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -143,6 +147,10 @@ Global {29478B10-FC53-4E93-ADEF-A775D9408131}.Debug|Any CPU.Build.0 = Debug|Any CPU {29478B10-FC53-4E93-ADEF-A775D9408131}.Release|Any CPU.ActiveCfg = Release|Any CPU {29478B10-FC53-4E93-ADEF-A775D9408131}.Release|Any CPU.Build.0 = Release|Any CPU + {C8728BFC-7F97-41BC-956B-690A57B634EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C8728BFC-7F97-41BC-956B-690A57B634EC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -168,6 +176,7 @@ Global {B3802005-C941-41B6-A9A5-20573A7C24AE} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F} {BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F} {29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F} + {C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA} diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index 406d866..4a2b131 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -2,6 +2,7 @@ False API ID + JSON KV SQL <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />