1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Merge branch 'release/2.2' into develop

This commit is contained in:
Frederik 2020-06-09 09:18:27 +02:00
commit 97c61eb276
12 changed files with 255 additions and 28 deletions

View File

@ -1,7 +1,12 @@
using System; using System;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Text;
using CommandLine; using CommandLine;
using CommandLine.Text;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using Tapeti.Cmd.Commands; using Tapeti.Cmd.Commands;
using Tapeti.Cmd.Serialization; using Tapeti.Cmd.Serialization;
@ -63,8 +68,14 @@ namespace Tapeti.Cmd
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
public class ImportOptions : MessageSerializerOptions public class ImportOptions : MessageSerializerOptions
{ {
[Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string Input { get; set; } public string InputFile { get; set; }
[Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public string InputMessage { get; set; }
[Option('c', "pipe", Group = "Input", HelpText = "Messages are read from STDIN, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
public bool InputPipe { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; } public bool PublishToExchange { get; set; }
@ -103,15 +114,21 @@ namespace Tapeti.Cmd
} }
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
public class ExampleOptions
{
}
public static int Main(string[] args) public static int Main(string[] args)
{ {
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions>(args) return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, ExampleOptions>(args)
.MapResult( .MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport), (ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport), (ImportOptions o) => ExecuteVerb(o, RunImport),
(ShovelOptions o) => ExecuteVerb(o, RunShovel), (ShovelOptions o) => ExecuteVerb(o, RunShovel),
(ExampleOptions o) => ExecuteVerb(o, RunExample),
errs => errs =>
{ {
if (!Debugger.IsAttached) if (!Debugger.IsAttached)
@ -155,15 +172,18 @@ namespace Tapeti.Cmd
} }
private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path) private static IMessageSerializer GetMessageSerializer(ImportOptions options)
{ {
switch (options.SerializationMethod) switch (options.SerializationMethod)
{ {
case SerializationMethod.SingleFileJSON: case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(path); return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe: case SerializationMethod.EasyNetQHosepipe:
return new EasyNetQMessageSerializer(path); if (string.IsNullOrEmpty(options.InputFile))
throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.InputFile);
default: default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
@ -171,11 +191,56 @@ namespace Tapeti.Cmd
} }
private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
{
if (options.InputPipe)
{
disposeStream = false;
return Console.OpenStandardInput();
}
if (!string.IsNullOrEmpty(options.InputMessage))
{
disposeStream = true;
return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));
}
disposeStream = true;
return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
}
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);
case SerializationMethod.EasyNetQHosepipe:
if (string.IsNullOrEmpty(options.OutputPath))
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
return new EasyNetQMessageSerializer(options.OutputPath);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static Stream GetOutputStream(ExportOptions options, out bool disposeStream)
{
disposeStream = true;
return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read);
}
private static void RunExport(ExportOptions options) private static void RunExport(ExportOptions options)
{ {
int messageCount; int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.OutputPath)) using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options)) using (var connection = GetConnection(options))
using (var channel = connection.CreateModel()) using (var channel = connection.CreateModel())
{ {
@ -197,7 +262,7 @@ namespace Tapeti.Cmd
{ {
int messageCount; int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.Input)) using (var messageSerializer = GetMessageSerializer(options))
using (var connection = GetConnection(options)) using (var connection = GetConnection(options))
using (var channel = connection.CreateModel()) using (var channel = connection.CreateModel())
{ {
@ -289,5 +354,32 @@ namespace Tapeti.Cmd
return factory.CreateConnection(); return factory.CreateConnection();
} }
private static void RunExample(ExampleOptions options)
{
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
{
messageSerializer.Serialize(new Message
{
Exchange = "example",
Queue = "example.queue",
RoutingKey = "example.routing.key",
DeliveryTag = 42,
Properties = new BasicProperties
{
ContentType = "application/json",
DeliveryMode = 2,
Headers = new Dictionary<string, object>
{
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
},
ReplyTo = "reply.queue",
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
},
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
});
}
}
} }
} }

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
https://go.microsoft.com/fwlink/?LinkID=208121.
-->
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<PublishProtocol>FileSystem</PublishProtocol>
<Configuration>Release</Configuration>
<Platform>Any CPU</Platform>
<TargetFramework>netcoreapp2.1</TargetFramework>
<PublishDir>bin\Release\netcoreapp2.1\publish\</PublishDir>
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
<SelfContained>false</SelfContained>
<_IsPortable>true</_IsPortable>
</PropertyGroup>
</Project>

View File

@ -11,7 +11,9 @@ namespace Tapeti.Cmd.Serialization
{ {
public class SingleFileJSONMessageSerializer : IMessageSerializer public class SingleFileJSONMessageSerializer : IMessageSerializer
{ {
private readonly string path; private readonly Stream stream;
private readonly bool disposeStream;
private readonly Encoding encoding;
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
@ -22,10 +24,13 @@ namespace Tapeti.Cmd.Serialization
private readonly Lazy<StreamWriter> exportFile; private readonly Lazy<StreamWriter> exportFile;
public SingleFileJSONMessageSerializer(string path) public SingleFileJSONMessageSerializer(Stream stream, bool disposeStream, Encoding encoding)
{ {
this.path = path; this.stream = stream;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(path, false, Encoding.UTF8)); this.disposeStream = disposeStream;
this.encoding = encoding;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(stream, encoding));
} }
@ -39,11 +44,11 @@ namespace Tapeti.Cmd.Serialization
public IEnumerable<Message> Deserialize() public IEnumerable<Message> Deserialize()
{ {
using (var file = new StreamReader(path)) using (var reader = new StreamReader(stream, encoding))
{ {
while (!file.EndOfStream) while (!reader.EndOfStream)
{ {
var serialized = file.ReadLine(); var serialized = reader.ReadLine();
if (string.IsNullOrEmpty(serialized)) if (string.IsNullOrEmpty(serialized))
continue; continue;
@ -61,6 +66,9 @@ namespace Tapeti.Cmd.Serialization
{ {
if (exportFile.IsValueCreated) if (exportFile.IsValueCreated)
exportFile.Value.Dispose(); exportFile.Value.Dispose();
if (disposeStream)
stream.Dispose();
} }

View File

@ -10,8 +10,8 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.6.0" /> <PackageReference Include="CommandLineParser" Version="2.7.82" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" /> <PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
</ItemGroup> </ItemGroup>

View File

@ -1 +1,9 @@
dotnet publish -c Release -r win-x64 --self-contained false mkdir publish
REM Executable is generated using self-contained=true, which is just a wrapper for "dotnet Tapeti.Cmd.dll".
REM We don't need all the other DLL's so we'll build it twice and borrow the wrapper executable for a proper
REM framework-dependant build.
dotnet publish -c Release -r win-x64 --self-contained=true -o .\publish\selfcontained
dotnet publish -c Release -r win-x64 --self-contained=false -o .\publish
copy .\publish\selfcontained\Tapeti.Cmd.exe .\publish\

View File

@ -11,7 +11,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Serilog" Version="2.8.0" /> <PackageReference Include="Serilog" Version="2.9.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -50,6 +50,15 @@ namespace Tapeti.Config
/// already be present when the connection is made. /// already be present when the connection is made.
/// </summary> /// </summary>
bool DeclareDurableQueues { get; } bool DeclareDurableQueues { get; }
/// <summary>
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
bool VerifyDurableQueues { get; }
} }

View File

@ -92,6 +92,24 @@ namespace Tapeti.Config
/// feature to work, since AMQP does not provide a way to query existing bindings. /// feature to work, since AMQP does not provide a way to query existing bindings.
/// </remarks> /// </remarks>
ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled);
/// <summary>
/// Disables verifying durable queues at startup. Enabled by default.
/// Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
ITapetiConfigBuilder DisableVerifyDurableQueues();
/// <summary>
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled);
} }

View File

@ -24,7 +24,7 @@ namespace Tapeti.Connection
internal class TapetiClient : ITapetiClient internal class TapetiClient : ITapetiClient
{ {
private const int ReconnectDelay = 5000; private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 30000; private const int MandatoryReturnTimeout = 300000;
private const int MinimumConnectedReconnectDelay = 1000; private const int MinimumConnectedReconnectDelay = 1000;
private readonly TapetiConnectionParams connectionParams; private readonly TapetiConnectionParams connectionParams;

View File

@ -97,9 +97,14 @@ namespace Tapeti.Connection
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>(); var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>(); var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
var bindingTarget = config.Features.DeclareDurableQueues CustomBindingTarget bindingTarget;
? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
: new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); if (config.Features.DeclareDurableQueues)
bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else if (config.Features.VerifyDurableQueues)
bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget))); await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
await bindingTarget.Apply(); await bindingTarget.Apply();
@ -358,5 +363,29 @@ namespace Tapeti.Connection
} }
} }
} }
private class NoVerifyBindingTarget : CustomBindingTarget
{
public NoVerifyBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{
}
public override Task BindDurable(Type messageClass, string queueName)
{
return Task.CompletedTask;
}
public override Task BindDurableDirect(string queueName)
{
return Task.CompletedTask;
}
public override Task BindDurableObsolete(string queueName)
{
return Task.CompletedTask;
}
}
} }
} }

View File

@ -167,6 +167,22 @@ namespace Tapeti
} }
/// <inheritdoc />
public ITapetiConfigBuilder DisableVerifyDurableQueues()
{
GetConfig().SetVerifyDurableQueues(false);
return this;
}
/// <inheritdoc />
public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled)
{
GetConfig().SetVerifyDurableQueues(enabled);
return this;
}
/// <summary> /// <summary>
/// Registers the default implementation of various Tapeti interfaces into the IoC container. /// Registers the default implementation of various Tapeti interfaces into the IoC container.
/// </summary> /// </summary>
@ -256,13 +272,19 @@ namespace Tapeti
{ {
features.DeclareDurableQueues = enabled; features.DeclareDurableQueues = enabled;
} }
public void SetVerifyDurableQueues(bool enabled)
{
features.VerifyDurableQueues = enabled;
}
} }
internal class ConfigFeatures : ITapetiConfigFeatues internal class ConfigFeatures : ITapetiConfigFeatues
{ {
public bool PublisherConfirms { get; internal set; } = true; public bool PublisherConfirms { get; internal set; } = true;
public bool DeclareDurableQueues { get; internal set; } = true; public bool DeclareDurableQueues { get; internal set; }
public bool VerifyDurableQueues { get; internal set; } = true;
} }

View File

@ -7,7 +7,7 @@ The Tapeti command-line tool provides various operations for managing messages.
Common parameters Common parameters
----------------- -----------------
All operations support the following parameters. All are optional. Most operations support the following parameters. All are optional.
-h <hostname>, --host <hostname> -h <hostname>, --host <hostname>
Specifies the hostname of the RabbitMQ server. Default is localhost. Specifies the hostname of the RabbitMQ server. Default is localhost.
@ -65,8 +65,14 @@ Import
Read messages from disk as previously exported and publish them to a queue. Read messages from disk as previously exported and publish them to a queue.
-i <source> -i <source>, --input <source>
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be read from. Path or filename (depending on the chosen serialization method) where the messages will be read from.
-m <message>, --message <message>
Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input. Be sure to quote the entire message, and escape quotes within the message with another quote.
-c, --pipe
Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.
-e, --exchange -e, --exchange
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue. If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
@ -74,6 +80,7 @@ Read messages from disk as previously exported and publish them to a queue.
-s <method>, --serialization <method> -s <method>, --serialization <method>
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
Either input, message or pipe is required.
Example: Example:
:: ::
@ -175,3 +182,21 @@ EasyNetQHosepipe
Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt. Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt.
As this is only provided for emergency situations, see the source code if you want to know more about the format specification. As this is only provided for emergency situations, see the source code if you want to know more about the format specification.
Generating an example
---------------------
The "example" operation is available to generate an example message in SingleFileJSON format.
::
.\Tapeti.Cmd.exe example
To save the output to a file:
::
.\Tapeti.Cmd.exe example > example.json