diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 4c611cc..02fec70 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -1,7 +1,12 @@ using System; +using System.Collections.Generic; using System.Diagnostics; +using System.IO; +using System.Text; using CommandLine; +using CommandLine.Text; using RabbitMQ.Client; +using RabbitMQ.Client.Framing; using Tapeti.Cmd.Commands; using Tapeti.Cmd.Serialization; @@ -63,8 +68,14 @@ namespace Tapeti.Cmd [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] public class ImportOptions : MessageSerializerOptions { - [Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] - public string Input { get; set; } + [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] + public string InputFile { get; set; } + + [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public string InputMessage { get; set; } + + [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public bool InputPipe { get; set; } [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] public bool PublishToExchange { get; set; } @@ -103,15 +114,21 @@ namespace Tapeti.Cmd } + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] + public class ExampleOptions + { + } + public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) + return Parser.Default.ParseArguments(args) .MapResult( (ExportOptions o) => ExecuteVerb(o, RunExport), (ImportOptions o) => ExecuteVerb(o, RunImport), (ShovelOptions o) => ExecuteVerb(o, RunShovel), + (ExampleOptions o) => ExecuteVerb(o, RunExample), errs => { if (!Debugger.IsAttached) @@ -155,15 +172,18 @@ namespace Tapeti.Cmd } - private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path) + private static IMessageSerializer GetMessageSerializer(ImportOptions options) { switch (options.SerializationMethod) { case SerializationMethod.SingleFileJSON: - return new SingleFileJSONMessageSerializer(path); + return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); case SerializationMethod.EasyNetQHosepipe: - return new EasyNetQMessageSerializer(path); + if (string.IsNullOrEmpty(options.InputFile)) + throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.InputFile); default: throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); @@ -171,11 +191,56 @@ namespace Tapeti.Cmd } + private static Stream GetInputStream(ImportOptions options, out bool disposeStream) + { + if (options.InputPipe) + { + disposeStream = false; + return Console.OpenStandardInput(); + } + + if (!string.IsNullOrEmpty(options.InputMessage)) + { + disposeStream = true; + return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage)); + } + + disposeStream = true; + return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + } + + + private static IMessageSerializer GetMessageSerializer(ExportOptions options) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); + + case SerializationMethod.EasyNetQHosepipe: + if (string.IsNullOrEmpty(options.OutputPath)) + throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.OutputPath); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + + + private static Stream GetOutputStream(ExportOptions options, out bool disposeStream) + { + disposeStream = true; + return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read); + } + + private static void RunExport(ExportOptions options) { int messageCount; - using (var messageSerializer = GetMessageSerializer(options, options.OutputPath)) + using (var messageSerializer = GetMessageSerializer(options)) using (var connection = GetConnection(options)) using (var channel = connection.CreateModel()) { @@ -197,7 +262,7 @@ namespace Tapeti.Cmd { int messageCount; - using (var messageSerializer = GetMessageSerializer(options, options.Input)) + using (var messageSerializer = GetMessageSerializer(options)) using (var connection = GetConnection(options)) using (var channel = connection.CreateModel()) { @@ -289,5 +354,32 @@ namespace Tapeti.Cmd return factory.CreateConnection(); } + + + private static void RunExample(ExampleOptions options) + { + using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) + { + messageSerializer.Serialize(new Message + { + Exchange = "example", + Queue = "example.queue", + RoutingKey = "example.routing.key", + DeliveryTag = 42, + Properties = new BasicProperties + { + ContentType = "application/json", + DeliveryMode = 2, + Headers = new Dictionary + { + { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } + }, + ReplyTo = "reply.queue", + Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) + }, + Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") + }); + } + } } } diff --git a/Tapeti.Cmd/Properties/PublishProfiles/FolderProfile.pubxml b/Tapeti.Cmd/Properties/PublishProfiles/FolderProfile.pubxml new file mode 100644 index 0000000..ab9bb0c --- /dev/null +++ b/Tapeti.Cmd/Properties/PublishProfiles/FolderProfile.pubxml @@ -0,0 +1,16 @@ + + + + + FileSystem + Release + Any CPU + netcoreapp2.1 + bin\Release\netcoreapp2.1\publish\ + win-x64 + false + <_IsPortable>true + + \ No newline at end of file diff --git a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs index 7079c5f..f8210fc 100644 --- a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs @@ -11,7 +11,9 @@ namespace Tapeti.Cmd.Serialization { public class SingleFileJSONMessageSerializer : IMessageSerializer { - private readonly string path; + private readonly Stream stream; + private readonly bool disposeStream; + private readonly Encoding encoding; private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings @@ -22,10 +24,13 @@ namespace Tapeti.Cmd.Serialization private readonly Lazy exportFile; - public SingleFileJSONMessageSerializer(string path) + public SingleFileJSONMessageSerializer(Stream stream, bool disposeStream, Encoding encoding) { - this.path = path; - exportFile = new Lazy(() => new StreamWriter(path, false, Encoding.UTF8)); + this.stream = stream; + this.disposeStream = disposeStream; + this.encoding = encoding; + + exportFile = new Lazy(() => new StreamWriter(stream, encoding)); } @@ -39,11 +44,11 @@ namespace Tapeti.Cmd.Serialization public IEnumerable Deserialize() { - using (var file = new StreamReader(path)) + using (var reader = new StreamReader(stream, encoding)) { - while (!file.EndOfStream) + while (!reader.EndOfStream) { - var serialized = file.ReadLine(); + var serialized = reader.ReadLine(); if (string.IsNullOrEmpty(serialized)) continue; @@ -61,6 +66,9 @@ namespace Tapeti.Cmd.Serialization { if (exportFile.IsValueCreated) exportFile.Value.Dispose(); + + if (disposeStream) + stream.Dispose(); } diff --git a/Tapeti.Cmd/Tapeti.Cmd.csproj b/Tapeti.Cmd/Tapeti.Cmd.csproj index 01626c4..d224a94 100644 --- a/Tapeti.Cmd/Tapeti.Cmd.csproj +++ b/Tapeti.Cmd/Tapeti.Cmd.csproj @@ -10,8 +10,8 @@ - - + + diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat index c9ccd4f..9903ae7 100644 --- a/Tapeti.Cmd/build-release.bat +++ b/Tapeti.Cmd/build-release.bat @@ -1 +1,9 @@ -dotnet publish -c Release -r win-x64 --self-contained false \ No newline at end of file +mkdir publish + +REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll". +REM We don't need all the other DLL's so we'll build it twice and borrow the wrapper executable for a proper +REM framework-dependant build. +dotnet publish -c Release -r win-x64 --self-contained=true -o .\publish\selfcontained +dotnet publish -c Release -r win-x64 --self-contained=false -o .\publish + +copy .\publish\selfcontained\Tapeti.Cmd.exe .\publish\ \ No newline at end of file diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index 20e9996..61b62da 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -11,7 +11,7 @@ - + diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index c7cc559..6a81c14 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -7,7 +7,7 @@ The Tapeti command-line tool provides various operations for managing messages. Common parameters ----------------- -All operations support the following parameters. All are optional. +Most operations support the following parameters. All are optional. -h , --host Specifies the hostname of the RabbitMQ server. Default is localhost. @@ -65,8 +65,14 @@ Import Read messages from disk as previously exported and publish them to a queue. --i - *Required*. Path or filename (depending on the chosen serialization method) where the messages will be read from. +-i , --input + Path or filename (depending on the chosen serialization method) where the messages will be read from. + +-m , --message + Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input. Be sure to quote the entire message, and escape quotes within the message with another quote. + +-c, --pipe + Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input. -e, --exchange If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue. @@ -74,6 +80,7 @@ Read messages from disk as previously exported and publish them to a queue. -s , --serialization The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. +Either input, message or pipe is required. Example: :: @@ -174,4 +181,22 @@ EasyNetQHosepipe '''''''''''''''' Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt. -As this is only provided for emergency situations, see the source code if you want to know more about the format specification. \ No newline at end of file +As this is only provided for emergency situations, see the source code if you want to know more about the format specification. + + + +Generating an example +--------------------- + +The "example" operation is available to generate an example message in SingleFileJSON format. + +:: + + .\Tapeti.Cmd.exe example + + +To save the output to a file: + +:: + + .\Tapeti.Cmd.exe example > example.json \ No newline at end of file