New features for Tapeti.Cmd

- Send single message from command line
- Pipe messages to standard input
- Generate example JSON
This commit is contained in:
Mark van Renswoude 2020-03-29 11:32:51 +02:00
parent 145850c3c2
commit 65bbb00751
7 changed files with 172 additions and 23 deletions

View File

@ -1,7 +1,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using CommandLine;
using CommandLine.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using Tapeti.Cmd.Commands;
using Tapeti.Cmd.Serialization;
@ -63,8 +68,14 @@ namespace Tapeti.Cmd
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
public class ImportOptions : MessageSerializerOptions
{
[Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string Input { get; set; }
[Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string InputFile { get; set; }
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; }
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; }
@ -103,15 +114,21 @@ namespace Tapeti.Cmd
}
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
public class ExampleOptions
{
}
public static int Main(string[] args)
{
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions>(args)
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, ExampleOptions>(args)
.MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport),
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
(ExampleOptions o) => ExecuteVerb(o, RunExample),
errs =>
{
if (!Debugger.IsAttached)
@ -155,15 +172,18 @@ namespace Tapeti.Cmd
}
private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path)
private static IMessageSerializer GetMessageSerializer(ImportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(path);
return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
return new EasyNetQMessageSerializer(path);
if (string.IsNullOrEmpty(options.InputFile))
throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.InputFile);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
@ -171,11 +191,56 @@ namespace Tapeti.Cmd
}
private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
{
if (options.InputPipe)
{
disposeStream = false;
return Console.OpenStandardInput();
}
if (!string.IsNullOrEmpty(options.InputMessage))
{
disposeStream = true;
return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));
}
disposeStream = true;
return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
}
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.OutputPath))
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.OutputPath);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static Stream GetOutputStream(ExportOptions options, out bool disposeStream)
{
disposeStream = true;
return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read);
}
private static void RunExport(ExportOptions options)
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.OutputPath))
using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
@ -197,7 +262,7 @@ namespace Tapeti.Cmd
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.Input))
using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
@ -289,5 +354,32 @@ namespace Tapeti.Cmd
return factory.CreateConnection();
}
private static void RunExample(ExampleOptions options)
{
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
{
messageSerializer.Serialize(new Message
{
Exchange = "example",
Queue = "example.queue",
RoutingKey = "example.routing.key",
DeliveryTag = 42,
Properties = new BasicProperties
{
ContentType = "application/json",
DeliveryMode = 2,
Headers = new Dictionary<string, object>
{
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
},
ReplyTo = "reply.queue",
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
},
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
});
}
}
}
}

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<PublishProtocol>FileSystem</PublishProtocol>
<Configuration>Release</Configuration>
<Platform>Any CPU</Platform>
<TargetFramework>netcoreapp2.1</TargetFramework>
<PublishDir>bin\Release\netcoreapp2.1\publish\</PublishDir>
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
<SelfContained>false</SelfContained>
<_IsPortable>true</_IsPortable>
</PropertyGroup>
</Project>

View File

@ -11,7 +11,9 @@ namespace Tapeti.Cmd.Serialization
{
public class SingleFileJSONMessageSerializer : IMessageSerializer
{
private readonly string path;
private readonly Stream stream;
private readonly bool disposeStream;
private readonly Encoding encoding;
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
@ -22,10 +24,13 @@ namespace Tapeti.Cmd.Serialization
private readonly Lazy<StreamWriter> exportFile;
public SingleFileJSONMessageSerializer(string path)
public SingleFileJSONMessageSerializer(Stream stream, bool disposeStream, Encoding encoding)
{
this.path = path;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(path, false, Encoding.UTF8));
this.stream = stream;
this.disposeStream = disposeStream;
this.encoding = encoding;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(stream, encoding));
}
@ -39,11 +44,11 @@ namespace Tapeti.Cmd.Serialization
public IEnumerable<Message> Deserialize()
{
using (var file = new StreamReader(path))
using (var reader = new StreamReader(stream, encoding))
{
while (!file.EndOfStream)
while (!reader.EndOfStream)
{
var serialized = file.ReadLine();
var serialized = reader.ReadLine();
if (string.IsNullOrEmpty(serialized))
continue;
@ -61,6 +66,9 @@ namespace Tapeti.Cmd.Serialization
{
if (exportFile.IsValueCreated)
exportFile.Value.Dispose();
if (disposeStream)
stream.Dispose();
}

View File

@ -10,8 +10,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.6.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="CommandLineParser" Version="2.7.82" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
</ItemGroup>

View File

@ -1 +1,9 @@
dotnet publish -c Release -r win-x64 --self-contained false
mkdir publish
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".
REM We don't need all the other DLL's so we'll build it twice and borrow the wrapper executable for a proper
REM framework-dependant build.
dotnet publish -c Release -r win-x64 --self-contained=true -o .\publish\selfcontained
dotnet publish -c Release -r win-x64 --self-contained=false -o .\publish
copy .\publish\selfcontained\Tapeti.Cmd.exe .\publish\

View File

@ -11,7 +11,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="2.8.0" />
<PackageReference Include="Serilog" Version="2.9.0" />
</ItemGroup>
<ItemGroup>

View File

@ -7,7 +7,7 @@ The Tapeti command-line tool provides various operations for managing messages.
Common parameters
-----------------
All operations support the following parameters. All are optional.
Most operations support the following parameters. All are optional.
-h <hostname>, --host <hostname>
Specifies the hostname of the RabbitMQ server. Default is localhost.
@ -65,8 +65,14 @@ Import
Read messages from disk as previously exported and publish them to a queue.
-i <source>
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be read from.
-i <source>, --input <source>
Path or filename (depending on the chosen serialization method) where the messages will be read from.
-m <message>, --message <message>
Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input. Be sure to quote the entire message, and escape quotes within the message with another quote.
-c, --pipe
Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.
-e, --exchange
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
@ -74,6 +80,7 @@ Read messages from disk as previously exported and publish them to a queue.
-s <method>, --serialization <method>
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
Either input, message or pipe is required.
Example:
::
@ -174,4 +181,22 @@ EasyNetQHosepipe
''''''''''''''''
Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt.
As this is only provided for emergency situations, see the source code if you want to know more about the format specification.
As this is only provided for emergency situations, see the source code if you want to know more about the format specification.
Generating an example
---------------------
The "example" operation is available to generate an example message in SingleFileJSON format.
::
.\Tapeti.Cmd.exe example
To save the output to a file:
::
.\Tapeti.Cmd.exe example > example.json