Merge branch 'release/2.2'
This commit is contained in:
commit
c6700b2697
@ -1,7 +1,12 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Text;
|
||||
using CommandLine;
|
||||
using CommandLine.Text;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Framing;
|
||||
using Tapeti.Cmd.Commands;
|
||||
using Tapeti.Cmd.Serialization;
|
||||
|
||||
@ -63,8 +68,14 @@ namespace Tapeti.Cmd
|
||||
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
|
||||
public class ImportOptions : MessageSerializerOptions
|
||||
{
|
||||
[Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
|
||||
public string Input { get; set; }
|
||||
[Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
|
||||
public string InputFile { get; set; }
|
||||
|
||||
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
|
||||
public string InputMessage { get; set; }
|
||||
|
||||
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
|
||||
public bool InputPipe { get; set; }
|
||||
|
||||
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
|
||||
public bool PublishToExchange { get; set; }
|
||||
@ -103,15 +114,21 @@ namespace Tapeti.Cmd
|
||||
}
|
||||
|
||||
|
||||
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
|
||||
public class ExampleOptions
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static int Main(string[] args)
|
||||
{
|
||||
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions>(args)
|
||||
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, ExampleOptions>(args)
|
||||
.MapResult(
|
||||
(ExportOptions o) => ExecuteVerb(o, RunExport),
|
||||
(ImportOptions o) => ExecuteVerb(o, RunImport),
|
||||
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
|
||||
(ExampleOptions o) => ExecuteVerb(o, RunExample),
|
||||
errs =>
|
||||
{
|
||||
if (!Debugger.IsAttached)
|
||||
@ -155,15 +172,18 @@ namespace Tapeti.Cmd
|
||||
}
|
||||
|
||||
|
||||
private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path)
|
||||
private static IMessageSerializer GetMessageSerializer(ImportOptions options)
|
||||
{
|
||||
switch (options.SerializationMethod)
|
||||
{
|
||||
case SerializationMethod.SingleFileJSON:
|
||||
return new SingleFileJSONMessageSerializer(path);
|
||||
return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
|
||||
|
||||
case SerializationMethod.EasyNetQHosepipe:
|
||||
return new EasyNetQMessageSerializer(path);
|
||||
if (string.IsNullOrEmpty(options.InputFile))
|
||||
throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");
|
||||
|
||||
return new EasyNetQMessageSerializer(options.InputFile);
|
||||
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
|
||||
@ -171,11 +191,56 @@ namespace Tapeti.Cmd
|
||||
}
|
||||
|
||||
|
||||
private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
|
||||
{
|
||||
if (options.InputPipe)
|
||||
{
|
||||
disposeStream = false;
|
||||
return Console.OpenStandardInput();
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(options.InputMessage))
|
||||
{
|
||||
disposeStream = true;
|
||||
return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));
|
||||
}
|
||||
|
||||
disposeStream = true;
|
||||
return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
|
||||
}
|
||||
|
||||
|
||||
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
|
||||
{
|
||||
switch (options.SerializationMethod)
|
||||
{
|
||||
case SerializationMethod.SingleFileJSON:
|
||||
return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
|
||||
|
||||
case SerializationMethod.EasyNetQHosepipe:
|
||||
if (string.IsNullOrEmpty(options.OutputPath))
|
||||
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
|
||||
|
||||
return new EasyNetQMessageSerializer(options.OutputPath);
|
||||
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static Stream GetOutputStream(ExportOptions options, out bool disposeStream)
|
||||
{
|
||||
disposeStream = true;
|
||||
return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read);
|
||||
}
|
||||
|
||||
|
||||
private static void RunExport(ExportOptions options)
|
||||
{
|
||||
int messageCount;
|
||||
|
||||
using (var messageSerializer = GetMessageSerializer(options, options.OutputPath))
|
||||
using (var messageSerializer = GetMessageSerializer(options))
|
||||
using (var connection = GetConnection(options))
|
||||
using (var channel = connection.CreateModel())
|
||||
{
|
||||
@ -197,7 +262,7 @@ namespace Tapeti.Cmd
|
||||
{
|
||||
int messageCount;
|
||||
|
||||
using (var messageSerializer = GetMessageSerializer(options, options.Input))
|
||||
using (var messageSerializer = GetMessageSerializer(options))
|
||||
using (var connection = GetConnection(options))
|
||||
using (var channel = connection.CreateModel())
|
||||
{
|
||||
@ -289,5 +354,32 @@ namespace Tapeti.Cmd
|
||||
|
||||
return factory.CreateConnection();
|
||||
}
|
||||
|
||||
|
||||
private static void RunExample(ExampleOptions options)
|
||||
{
|
||||
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
|
||||
{
|
||||
messageSerializer.Serialize(new Message
|
||||
{
|
||||
Exchange = "example",
|
||||
Queue = "example.queue",
|
||||
RoutingKey = "example.routing.key",
|
||||
DeliveryTag = 42,
|
||||
Properties = new BasicProperties
|
||||
{
|
||||
ContentType = "application/json",
|
||||
DeliveryMode = 2,
|
||||
Headers = new Dictionary<string, object>
|
||||
{
|
||||
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
|
||||
},
|
||||
ReplyTo = "reply.queue",
|
||||
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
|
||||
},
|
||||
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
16
Tapeti.Cmd/Properties/PublishProfiles/FolderProfile.pubxml
Normal file
16
Tapeti.Cmd/Properties/PublishProfiles/FolderProfile.pubxml
Normal file
@ -0,0 +1,16 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<!--
|
||||
https://go.microsoft.com/fwlink/?LinkID=208121.
|
||||
-->
|
||||
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
<PropertyGroup>
|
||||
<PublishProtocol>FileSystem</PublishProtocol>
|
||||
<Configuration>Release</Configuration>
|
||||
<Platform>Any CPU</Platform>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
<PublishDir>bin\Release\netcoreapp2.1\publish\</PublishDir>
|
||||
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
|
||||
<SelfContained>false</SelfContained>
|
||||
<_IsPortable>true</_IsPortable>
|
||||
</PropertyGroup>
|
||||
</Project>
|
@ -11,7 +11,9 @@ namespace Tapeti.Cmd.Serialization
|
||||
{
|
||||
public class SingleFileJSONMessageSerializer : IMessageSerializer
|
||||
{
|
||||
private readonly string path;
|
||||
private readonly Stream stream;
|
||||
private readonly bool disposeStream;
|
||||
private readonly Encoding encoding;
|
||||
|
||||
|
||||
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
|
||||
@ -22,10 +24,13 @@ namespace Tapeti.Cmd.Serialization
|
||||
private readonly Lazy<StreamWriter> exportFile;
|
||||
|
||||
|
||||
public SingleFileJSONMessageSerializer(string path)
|
||||
public SingleFileJSONMessageSerializer(Stream stream, bool disposeStream, Encoding encoding)
|
||||
{
|
||||
this.path = path;
|
||||
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(path, false, Encoding.UTF8));
|
||||
this.stream = stream;
|
||||
this.disposeStream = disposeStream;
|
||||
this.encoding = encoding;
|
||||
|
||||
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(stream, encoding));
|
||||
}
|
||||
|
||||
|
||||
@ -39,11 +44,11 @@ namespace Tapeti.Cmd.Serialization
|
||||
|
||||
public IEnumerable<Message> Deserialize()
|
||||
{
|
||||
using (var file = new StreamReader(path))
|
||||
using (var reader = new StreamReader(stream, encoding))
|
||||
{
|
||||
while (!file.EndOfStream)
|
||||
while (!reader.EndOfStream)
|
||||
{
|
||||
var serialized = file.ReadLine();
|
||||
var serialized = reader.ReadLine();
|
||||
if (string.IsNullOrEmpty(serialized))
|
||||
continue;
|
||||
|
||||
@ -61,6 +66,9 @@ namespace Tapeti.Cmd.Serialization
|
||||
{
|
||||
if (exportFile.IsValueCreated)
|
||||
exportFile.Value.Dispose();
|
||||
|
||||
if (disposeStream)
|
||||
stream.Dispose();
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,8 +10,8 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="CommandLineParser" Version="2.6.0" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
|
||||
<PackageReference Include="CommandLineParser" Version="2.7.82" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
|
||||
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
|
||||
</ItemGroup>
|
||||
|
||||
|
@ -1 +1,9 @@
|
||||
dotnet publish -c Release -r win-x64 --self-contained false
|
||||
mkdir publish
|
||||
|
||||
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".
|
||||
REM We don't need all the other DLL's so we'll build it twice and borrow the wrapper executable for a proper
|
||||
REM framework-dependant build.
|
||||
dotnet publish -c Release -r win-x64 --self-contained=true -o .\publish\selfcontained
|
||||
dotnet publish -c Release -r win-x64 --self-contained=false -o .\publish
|
||||
|
||||
copy .\publish\selfcontained\Tapeti.Cmd.exe .\publish\
|
@ -11,7 +11,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Serilog" Version="2.8.0" />
|
||||
<PackageReference Include="Serilog" Version="2.9.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
@ -50,6 +50,15 @@ namespace Tapeti.Config
|
||||
/// already be present when the connection is made.
|
||||
/// </summary>
|
||||
bool DeclareDurableQueues { get; }
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
|
||||
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
|
||||
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
|
||||
/// while verifying.
|
||||
/// </summary>
|
||||
bool VerifyDurableQueues { get; }
|
||||
}
|
||||
|
||||
|
||||
|
@ -92,6 +92,24 @@ namespace Tapeti.Config
|
||||
/// feature to work, since AMQP does not provide a way to query existing bindings.
|
||||
/// </remarks>
|
||||
ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled);
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Disables verifying durable queues at startup. Enabled by default.
|
||||
/// Disable if you have queues with additional properties like a deadletter
|
||||
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
|
||||
/// while verifying.
|
||||
/// </summary>
|
||||
ITapetiConfigBuilder DisableVerifyDurableQueues();
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
|
||||
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
|
||||
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
|
||||
/// while verifying.
|
||||
/// </summary>
|
||||
ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled);
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,7 +24,7 @@ namespace Tapeti.Connection
|
||||
internal class TapetiClient : ITapetiClient
|
||||
{
|
||||
private const int ReconnectDelay = 5000;
|
||||
private const int MandatoryReturnTimeout = 30000;
|
||||
private const int MandatoryReturnTimeout = 300000;
|
||||
private const int MinimumConnectedReconnectDelay = 1000;
|
||||
|
||||
private readonly TapetiConnectionParams connectionParams;
|
||||
|
@ -97,9 +97,14 @@ namespace Tapeti.Connection
|
||||
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
|
||||
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
|
||||
|
||||
var bindingTarget = config.Features.DeclareDurableQueues
|
||||
? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
||||
: new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
|
||||
CustomBindingTarget bindingTarget;
|
||||
|
||||
if (config.Features.DeclareDurableQueues)
|
||||
bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
|
||||
else if (config.Features.VerifyDurableQueues)
|
||||
bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
|
||||
else
|
||||
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
|
||||
|
||||
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
|
||||
await bindingTarget.Apply();
|
||||
@ -358,5 +363,29 @@ namespace Tapeti.Connection
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class NoVerifyBindingTarget : CustomBindingTarget
|
||||
{
|
||||
public NoVerifyBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
public override Task BindDurable(Type messageClass, string queueName)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public override Task BindDurableDirect(string queueName)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public override Task BindDurableObsolete(string queueName)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -167,6 +167,22 @@ namespace Tapeti
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public ITapetiConfigBuilder DisableVerifyDurableQueues()
|
||||
{
|
||||
GetConfig().SetVerifyDurableQueues(false);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled)
|
||||
{
|
||||
GetConfig().SetVerifyDurableQueues(enabled);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Registers the default implementation of various Tapeti interfaces into the IoC container.
|
||||
/// </summary>
|
||||
@ -256,13 +272,19 @@ namespace Tapeti
|
||||
{
|
||||
features.DeclareDurableQueues = enabled;
|
||||
}
|
||||
|
||||
public void SetVerifyDurableQueues(bool enabled)
|
||||
{
|
||||
features.VerifyDurableQueues = enabled;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
internal class ConfigFeatures : ITapetiConfigFeatues
|
||||
{
|
||||
public bool PublisherConfirms { get; internal set; } = true;
|
||||
public bool DeclareDurableQueues { get; internal set; } = true;
|
||||
public bool DeclareDurableQueues { get; internal set; }
|
||||
public bool VerifyDurableQueues { get; internal set; } = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -7,7 +7,7 @@ The Tapeti command-line tool provides various operations for managing messages.
|
||||
Common parameters
|
||||
-----------------
|
||||
|
||||
All operations support the following parameters. All are optional.
|
||||
Most operations support the following parameters. All are optional.
|
||||
|
||||
-h <hostname>, --host <hostname>
|
||||
Specifies the hostname of the RabbitMQ server. Default is localhost.
|
||||
@ -65,8 +65,14 @@ Import
|
||||
|
||||
Read messages from disk as previously exported and publish them to a queue.
|
||||
|
||||
-i <source>
|
||||
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be read from.
|
||||
-i <source>, --input <source>
|
||||
Path or filename (depending on the chosen serialization method) where the messages will be read from.
|
||||
|
||||
-m <message>, --message <message>
|
||||
Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input. Be sure to quote the entire message, and escape quotes within the message with another quote.
|
||||
|
||||
-c, --pipe
|
||||
Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.
|
||||
|
||||
-e, --exchange
|
||||
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
|
||||
@ -74,6 +80,7 @@ Read messages from disk as previously exported and publish them to a queue.
|
||||
-s <method>, --serialization <method>
|
||||
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
|
||||
|
||||
Either input, message or pipe is required.
|
||||
|
||||
Example:
|
||||
::
|
||||
@ -175,3 +182,21 @@ EasyNetQHosepipe
|
||||
Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt.
|
||||
|
||||
As this is only provided for emergency situations, see the source code if you want to know more about the format specification.
|
||||
|
||||
|
||||
|
||||
Generating an example
|
||||
---------------------
|
||||
|
||||
The "example" operation is available to generate an example message in SingleFileJSON format.
|
||||
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe example
|
||||
|
||||
|
||||
To save the output to a file:
|
||||
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe example > example.json
|
Loading…
Reference in New Issue
Block a user