Merge branch 'feature/stateless-requestresponse' into develop
This commit is contained in:
commit
7b8f396607
@ -0,0 +1,20 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="SimpleInjector" Version="4.9.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
|
||||
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
|
||||
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
|
||||
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
28
06-StatelessRequestResponse/ExampleMessageController.cs
Normal file
28
06-StatelessRequestResponse/ExampleMessageController.cs
Normal file
@ -0,0 +1,28 @@
|
||||
using System;
|
||||
using ExampleLib;
|
||||
using Messaging.TapetiExample;
|
||||
using Tapeti.Annotations;
|
||||
|
||||
namespace _06_StatelessRequestResponse
|
||||
{
|
||||
[MessageController]
|
||||
[DynamicQueue("tapeti.example.06")]
|
||||
public class ExampleMessageController
|
||||
{
|
||||
private readonly IExampleState exampleState;
|
||||
|
||||
|
||||
public ExampleMessageController(IExampleState exampleState)
|
||||
{
|
||||
this.exampleState = exampleState;
|
||||
}
|
||||
|
||||
|
||||
[ResponseHandler]
|
||||
public void HandleQuoteResponse(QuoteResponseMessage message)
|
||||
{
|
||||
Console.WriteLine("Received response: " + message.Quote);
|
||||
exampleState.Done();
|
||||
}
|
||||
}
|
||||
}
|
51
06-StatelessRequestResponse/Program.cs
Normal file
51
06-StatelessRequestResponse/Program.cs
Normal file
@ -0,0 +1,51 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using ExampleLib;
|
||||
using Messaging.TapetiExample;
|
||||
using SimpleInjector;
|
||||
using Tapeti;
|
||||
using Tapeti.DataAnnotations;
|
||||
using Tapeti.Default;
|
||||
using Tapeti.SimpleInjector;
|
||||
|
||||
namespace _06_StatelessRequestResponse
|
||||
{
|
||||
public class Program
|
||||
{
|
||||
public static void Main(string[] args)
|
||||
{
|
||||
var container = new Container();
|
||||
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
|
||||
|
||||
container.Register<ILogger, ConsoleLogger>();
|
||||
|
||||
var helper = new ExampleConsoleApp(dependencyResolver);
|
||||
helper.Run(MainAsync);
|
||||
}
|
||||
|
||||
|
||||
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
|
||||
{
|
||||
var config = new TapetiConfig(dependencyResolver)
|
||||
.WithDataAnnotations()
|
||||
.RegisterAllControllers()
|
||||
.Build();
|
||||
|
||||
|
||||
using (var connection = new TapetiConnection(config))
|
||||
{
|
||||
await connection.Subscribe();
|
||||
|
||||
var publisher = dependencyResolver.Resolve<IPublisher>();
|
||||
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
|
||||
new QuoteRequestMessage
|
||||
{
|
||||
Amount = 1
|
||||
},
|
||||
c => c.HandleQuoteResponse);
|
||||
|
||||
await waitForDone();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
38
06-StatelessRequestResponse/ReceivingMessageController.cs
Normal file
38
06-StatelessRequestResponse/ReceivingMessageController.cs
Normal file
@ -0,0 +1,38 @@
|
||||
using Messaging.TapetiExample;
|
||||
using Tapeti.Annotations;
|
||||
|
||||
namespace _06_StatelessRequestResponse
|
||||
{
|
||||
[MessageController]
|
||||
[DynamicQueue("tapeti.example.06.receiver")]
|
||||
public class ReceivingMessageController
|
||||
{
|
||||
// No publisher required, responses can simply be returned
|
||||
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
|
||||
{
|
||||
string quote;
|
||||
|
||||
switch (message.Amount)
|
||||
{
|
||||
case 1:
|
||||
// Well, they asked for it... :-)
|
||||
quote = "'";
|
||||
break;
|
||||
|
||||
case 2:
|
||||
quote = "\"";
|
||||
break;
|
||||
|
||||
default:
|
||||
// We have to return a response.
|
||||
quote = null;
|
||||
break;
|
||||
}
|
||||
|
||||
return new QuoteResponseMessage
|
||||
{
|
||||
Quote = quote
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
14
Tapeti.Annotations/ResponseHandlerAttribute.cs
Normal file
14
Tapeti.Annotations/ResponseHandlerAttribute.cs
Normal file
@ -0,0 +1,14 @@
|
||||
using System;
|
||||
|
||||
namespace Tapeti.Annotations
|
||||
{
|
||||
/// <inheritdoc />
|
||||
/// <summary>
|
||||
/// Indicates that the method only handles response messages which are sent directly
|
||||
/// to the queue. No binding will be created.
|
||||
/// </summary>
|
||||
[AttributeUsage(AttributeTargets.Method)]
|
||||
public class ResponseHandlerAttribute : Attribute
|
||||
{
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp2.2</TargetFramework>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
<Version>2.0.0</Version>
|
||||
<Authors>Mark van Renswoude</Authors>
|
||||
<Company>Mark van Renswoude</Company>
|
||||
|
@ -6,16 +6,18 @@ using ISerilogLogger = Serilog.ILogger;
|
||||
|
||||
namespace Tapeti.Serilog
|
||||
{
|
||||
/// <inheritdoc />
|
||||
/// <summary>
|
||||
/// Implements the Tapeti ILogger interface for Serilog output.
|
||||
/// </summary>
|
||||
public class TapetiSeriLogger: ILogger
|
||||
public class TapetiSeriLogger: IBindingLogger
|
||||
{
|
||||
private readonly ISerilogLogger seriLogger;
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
/// <summary>
|
||||
/// Create a Tapeti ILogger implementation to output to the specified Serilog.ILogger interface
|
||||
/// </summary>
|
||||
/// <param name="seriLogger">The Serilog.ILogger implementation to output Tapeti log message to</param>
|
||||
public TapetiSeriLogger(ISerilogLogger seriLogger)
|
||||
{
|
||||
this.seriLogger = seriLogger;
|
||||
@ -82,6 +84,39 @@ namespace Tapeti.Serilog
|
||||
contextLogger.Error(exception, "Tapeti: exception in message handler");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueDeclare(string queueName, bool durable, bool passive)
|
||||
{
|
||||
if (passive)
|
||||
seriLogger.Information("Tapeti: verifying durable queue {queueName}", queueName);
|
||||
else
|
||||
seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
||||
{
|
||||
seriLogger.Information("Tapeti: binding {queueName} to exchange {exchange} with routing key {routingKey}",
|
||||
queueName,
|
||||
exchange,
|
||||
routingKey);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueUnbind(string queueName, string exchange, string routingKey)
|
||||
{
|
||||
seriLogger.Information("Tapeti: removing binding for {queueName} to exchange {exchange} with routing key {routingKey}",
|
||||
queueName,
|
||||
exchange,
|
||||
routingKey);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void ExchangeDeclare(string exchange)
|
||||
{
|
||||
seriLogger.Information("Tapeti: declaring exchange {exchange}", exchange);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
|
||||
{
|
||||
|
@ -55,7 +55,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Ninject", "Tapeti.Ni
|
||||
EndProject
|
||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327-46B0-4B72-B95A-594CE7F8C80D}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
@ -151,6 +153,10 @@ Global
|
||||
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
@ -177,6 +183,7 @@ Global
|
||||
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
|
||||
{29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
|
||||
{C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D}
|
||||
{152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}
|
||||
|
@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Reflection;
|
||||
|
||||
namespace Tapeti.Config
|
||||
{
|
||||
@ -81,6 +82,13 @@ namespace Tapeti.Config
|
||||
/// <param name="method"></param>
|
||||
/// <returns>The binding if found, null otherwise</returns>
|
||||
IControllerMethodBinding ForMethod(Delegate method);
|
||||
|
||||
/// <summary>
|
||||
/// Searches for a binding linked to the specified method.
|
||||
/// </summary>
|
||||
/// <param name="method"></param>
|
||||
/// <returns>The binding if found, null otherwise</returns>
|
||||
IControllerMethodBinding ForMethod(MethodInfo method);
|
||||
}
|
||||
|
||||
|
||||
|
@ -237,22 +237,29 @@ namespace Tapeti.Connection
|
||||
{
|
||||
var existingBindings = (await GetQueueBindings(queueName)).ToList();
|
||||
var currentBindings = bindings.ToList();
|
||||
var bindingLogger = logger as IBindingLogger;
|
||||
|
||||
await Queue(channel =>
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
bindingLogger?.QueueDeclare(queueName, true, false);
|
||||
channel.QueueDeclare(queueName, true, false, false);
|
||||
|
||||
|
||||
foreach (var binding in currentBindings.Except(existingBindings))
|
||||
{
|
||||
DeclareExchange(channel, binding.Exchange);
|
||||
bindingLogger?.QueueBind(queueName, true, binding.Exchange, binding.RoutingKey);
|
||||
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
|
||||
}
|
||||
|
||||
foreach (var deletedBinding in existingBindings.Except(currentBindings))
|
||||
{
|
||||
bindingLogger?.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
|
||||
channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -264,6 +271,7 @@ namespace Tapeti.Connection
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
(logger as IBindingLogger)?.QueueDeclare(queueName, true, true);
|
||||
channel.QueueDeclarePassive(queueName);
|
||||
});
|
||||
}
|
||||
@ -285,7 +293,7 @@ namespace Tapeti.Connection
|
||||
});
|
||||
|
||||
deletedQueues.Add(queueName);
|
||||
logger.QueueObsolete(queueName, true, deletedMessages);
|
||||
(logger as IBindingLogger)?.QueueObsolete(queueName, true, deletedMessages);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -321,7 +329,7 @@ namespace Tapeti.Connection
|
||||
channel.QueueDelete(queueName, false, true);
|
||||
|
||||
deletedQueues.Add(queueName);
|
||||
logger.QueueObsolete(queueName, true, 0);
|
||||
(logger as IBindingLogger)?.QueueObsolete(queueName, true, 0);
|
||||
}
|
||||
catch (OperationInterruptedException e)
|
||||
{
|
||||
@ -344,7 +352,7 @@ namespace Tapeti.Connection
|
||||
channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
|
||||
}
|
||||
|
||||
logger.QueueObsolete(queueName, false, queueInfo.Messages);
|
||||
(logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages);
|
||||
}
|
||||
} while (retry);
|
||||
});
|
||||
@ -355,6 +363,7 @@ namespace Tapeti.Connection
|
||||
public async Task<string> DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null)
|
||||
{
|
||||
string queueName = null;
|
||||
var bindingLogger = logger as IBindingLogger;
|
||||
|
||||
await Queue(channel =>
|
||||
{
|
||||
@ -364,10 +373,14 @@ namespace Tapeti.Connection
|
||||
if (!string.IsNullOrEmpty(queuePrefix))
|
||||
{
|
||||
queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
|
||||
bindingLogger?.QueueDeclare(queueName, false, false);
|
||||
channel.QueueDeclare(queueName);
|
||||
}
|
||||
else
|
||||
{
|
||||
queueName = channel.QueueDeclare().QueueName;
|
||||
bindingLogger?.QueueDeclare(queueName, false, false);
|
||||
}
|
||||
});
|
||||
|
||||
return queueName;
|
||||
@ -382,6 +395,7 @@ namespace Tapeti.Connection
|
||||
return;
|
||||
|
||||
DeclareExchange(channel, binding.Exchange);
|
||||
(logger as IBindingLogger)?.QueueBind(queueName, false, binding.Exchange, binding.RoutingKey);
|
||||
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
|
||||
});
|
||||
}
|
||||
@ -554,6 +568,7 @@ namespace Tapeti.Connection
|
||||
if (declaredExchanges.Contains(exchange))
|
||||
return;
|
||||
|
||||
(logger as IBindingLogger)?.ExchangeDeclare(exchange);
|
||||
channel.ExchangeDeclare(exchange, "topic", true);
|
||||
declaredExchanges.Add(exchange);
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
using System;
|
||||
using System.Linq.Expressions;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
using Tapeti.Annotations;
|
||||
@ -18,7 +19,6 @@ namespace Tapeti.Connection
|
||||
private readonly IMessageSerializer messageSerializer;
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public TapetiPublisher(ITapetiConfig config, Func<ITapetiClient> clientFactory)
|
||||
{
|
||||
this.config = config;
|
||||
@ -37,6 +37,65 @@ namespace Tapeti.Connection
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class
|
||||
{
|
||||
await PublishRequest(message, responseMethodSelector.Body);
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class
|
||||
{
|
||||
await PublishRequest(message, responseMethodSelector.Body);
|
||||
}
|
||||
|
||||
|
||||
private async Task PublishRequest(object message, Expression responseMethodBody)
|
||||
{
|
||||
var callExpression = (responseMethodBody as UnaryExpression)?.Operand as MethodCallExpression;
|
||||
var targetMethodExpression = callExpression?.Object as ConstantExpression;
|
||||
|
||||
var responseHandler = targetMethodExpression?.Value as MethodInfo;
|
||||
if (responseHandler == null)
|
||||
throw new ArgumentException("Unable to determine the response method", nameof(responseMethodBody));
|
||||
|
||||
|
||||
var requestAttribute = message.GetType().GetCustomAttribute<RequestAttribute>();
|
||||
if (requestAttribute?.Response == null)
|
||||
throw new ArgumentException($"Request message {message.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(message));
|
||||
|
||||
var binding = config.Bindings.ForMethod(responseHandler);
|
||||
if (binding == null)
|
||||
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
|
||||
|
||||
if (!binding.Accept(requestAttribute.Response))
|
||||
throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
|
||||
|
||||
var responseHandleAttribute = binding.Method.GetCustomAttribute<ResponseHandlerAttribute>();
|
||||
if (responseHandleAttribute == null)
|
||||
throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseHandler));
|
||||
|
||||
if (binding.QueueName == null)
|
||||
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a request", nameof(responseHandler));
|
||||
|
||||
|
||||
var properties = new MessageProperties
|
||||
{
|
||||
ReplyTo = binding.QueueName
|
||||
};
|
||||
|
||||
await Publish(message, properties, IsMandatory(message));
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task SendToQueue(string queueName, object message)
|
||||
{
|
||||
await PublishDirect(message, queueName, null, IsMandatory(message));
|
||||
}
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task Publish(object message, IMessageProperties properties, bool mandatory)
|
||||
{
|
||||
|
@ -7,7 +7,7 @@ namespace Tapeti.Default
|
||||
/// <summary>
|
||||
/// Default ILogger implementation for console applications.
|
||||
/// </summary>
|
||||
public class ConsoleLogger : ILogger
|
||||
public class ConsoleLogger : IBindingLogger
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public void Connect(IConnectContext connectContext)
|
||||
@ -52,6 +52,32 @@ namespace Tapeti.Default
|
||||
Console.WriteLine(exception);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueDeclare(string queueName, bool durable, bool passive)
|
||||
{
|
||||
Console.WriteLine(passive
|
||||
? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"
|
||||
: $"[Tapeti] Verifying durable queue {queueName}");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
||||
{
|
||||
Console.WriteLine($"[Tapeti] Binding {queueName} to exchange {exchange} with routing key {routingKey}");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueUnbind(string queueName, string exchange, string routingKey)
|
||||
{
|
||||
Console.WriteLine($"[Tapeti] Removing binding for {queueName} to exchange {exchange} with routing key {routingKey}");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void ExchangeDeclare(string exchange)
|
||||
{
|
||||
Console.WriteLine($"[Tapeti] Declaring exchange {exchange}");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
|
||||
{
|
||||
|
@ -44,7 +44,6 @@ namespace Tapeti.Default
|
||||
public IBindingResult Result => result;
|
||||
|
||||
|
||||
/// <inheritdoc />
|
||||
public ControllerBindingContext(IEnumerable<ParameterInfo> parameters, ParameterInfo result)
|
||||
{
|
||||
this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();
|
||||
|
@ -33,10 +33,5 @@ namespace Tapeti.Default
|
||||
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
|
||||
{
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -110,6 +110,48 @@ namespace Tapeti
|
||||
/// <param name="messageContext"></param>
|
||||
/// <param name="consumeResult">Indicates the action taken by the exception handler</param>
|
||||
void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult);
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Optional interface which can be implemented by an ILogger implementation to log all operations
|
||||
/// related to declaring queues and bindings.
|
||||
/// </summary>
|
||||
public interface IBindingLogger : ILogger
|
||||
{
|
||||
/// <summary>
|
||||
/// Called before a queue is declared for durable queues and dynamic queues with a prefix. Called after
|
||||
/// a queue is declared for dynamic queues without a name with the queue name as determined by the RabbitMQ server.
|
||||
/// Will always be called even if the queue already existed, as that information is not returned by the RabbitMQ server/client.
|
||||
/// </summary>
|
||||
/// <param name="queueName">The name of the queue that is declared</param>
|
||||
/// <param name="durable">Indicates if the queue is durable or dynamic</param>
|
||||
/// <param name="passive">Indicates whether the queue was declared as passive (to verify durable queues)</param>
|
||||
void QueueDeclare(string queueName, bool durable, bool passive);
|
||||
|
||||
/// <summary>
|
||||
/// Called before a binding is added to a queue.
|
||||
/// </summary>
|
||||
/// <param name="queueName">The name of the queue the binding is created for</param>
|
||||
/// <param name="durable">Indicates if the queue is durable or dynamic</param>
|
||||
/// <param name="exchange">The exchange for the binding</param>
|
||||
/// <param name="routingKey">The routing key for the binding</param>
|
||||
void QueueBind(string queueName, bool durable, string exchange, string routingKey);
|
||||
|
||||
/// <summary>
|
||||
/// Called before a binding is removed from a durable queue.
|
||||
/// </summary>
|
||||
/// <param name="queueName">The name of the queue the binding is removed from</param>
|
||||
/// <param name="exchange">The exchange of the binding</param>
|
||||
/// <param name="routingKey">The routing key of the binding</param>
|
||||
void QueueUnbind(string queueName, string exchange, string routingKey);
|
||||
|
||||
/// <summary>
|
||||
/// Called before an exchange is declared. Will always be called once for each exchange involved in a dynamic queue,
|
||||
/// durable queue with auto-declare bindings enabled or published messages, even if the exchange already existed.
|
||||
/// </summary>
|
||||
/// <param name="exchange">The name of the exchange that is declared</param>
|
||||
void ExchangeDeclare(string exchange);
|
||||
|
||||
/// <summary>
|
||||
/// Called when a queue is determined to be obsolete.
|
||||
|
@ -1,4 +1,6 @@
|
||||
using System.Threading.Tasks;
|
||||
using System;
|
||||
using System.Linq.Expressions;
|
||||
using System.Threading.Tasks;
|
||||
using Tapeti.Config;
|
||||
|
||||
// ReSharper disable once UnusedMember.Global
|
||||
@ -15,6 +17,40 @@ namespace Tapeti
|
||||
/// </summary>
|
||||
/// <param name="message">The message to send</param>
|
||||
Task Publish(object message);
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Publish the specified request message and handle the response with the controller method as specified
|
||||
/// by the responseMethodSelector expression. The response method or controller must have a valid queue attribute.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The response method is called on a new instance of the controller, as is the case with a regular message.
|
||||
/// To preserve state, use the Tapeti.Flow extension instead.
|
||||
/// </remarks>
|
||||
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
|
||||
/// <param name="message">The message to send</param>
|
||||
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Publish the specified request message and handle the response with the controller method as specified
|
||||
/// by the responseMethodSelector expression. The response method or controller must have a valid queue attribute.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The response method is called on a new instance of the controller, as is the case with a regular message.
|
||||
/// To preserve state, use the Tapeti.Flow extension instead.
|
||||
/// </remarks>
|
||||
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
|
||||
/// <param name="message">The message to send</param>
|
||||
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Sends a message directly to the specified queue. Not recommended for general use.
|
||||
/// </summary>
|
||||
/// <param name="queueName">The name of the queue to publish the message to</param>
|
||||
/// <param name="message">The message to send</param>
|
||||
Task SendToQueue(string queueName, object message);
|
||||
}
|
||||
|
||||
|
||||
|
@ -299,6 +299,12 @@ namespace Tapeti
|
||||
}
|
||||
|
||||
|
||||
public IControllerMethodBinding ForMethod(MethodInfo method)
|
||||
{
|
||||
return methodLookup.TryGetValue(method, out var binding) ? binding : null;
|
||||
}
|
||||
|
||||
|
||||
public void Lock()
|
||||
{
|
||||
methodLookup = this
|
||||
|
@ -63,6 +63,10 @@ namespace Tapeti
|
||||
};
|
||||
|
||||
|
||||
if (method.GetCustomAttribute<ResponseHandlerAttribute>() != null)
|
||||
context.SetBindingTargetMode(BindingTargetMode.Direct);
|
||||
|
||||
|
||||
var allowBinding = false;
|
||||
builderAccess.ApplyBindingMiddleware(context, () => { allowBinding = true; });
|
||||
|
||||
@ -97,7 +101,6 @@ namespace Tapeti
|
||||
MessageMiddleware = context.Middleware.Where(m => m is IControllerMessageMiddleware).Cast<IControllerMessageMiddleware>().ToList(),
|
||||
CleanupMiddleware = context.Middleware.Where(m => m is IControllerCleanupMiddleware).Cast<IControllerCleanupMiddleware>().ToList()
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
return builder;
|
||||
|
14
docs/README.md
Normal file
14
docs/README.md
Normal file
@ -0,0 +1,14 @@
|
||||
The documentation can be built locally using Sphinx. Install Python 3 (choco install python on Windows),
|
||||
then install sphinx and the ReadTheDocs theme:
|
||||
|
||||
```pip install sphinx sphinx_rtd_theme```
|
||||
|
||||
To build the HTML output, run:
|
||||
|
||||
```.\make.bat html```
|
||||
|
||||
|
||||
|
||||
To use the auto reloading server (rundev.bat), install the sphinx-autobuild package:
|
||||
|
||||
```pip install sphinx-autobuild```
|
@ -46,7 +46,7 @@ The start method can have any name, but must be annotated with the ``[Start]`` a
|
||||
public DateTime RequestStart { get; set; }
|
||||
|
||||
[Start]
|
||||
IYieldPoint StartFlow()
|
||||
public IYieldPoint StartFlow()
|
||||
{
|
||||
RequestStart = DateTime.UtcNow();
|
||||
}
|
||||
@ -67,7 +67,7 @@ Often you'll want to pass some initial information to the flow. The Start method
|
||||
public DateTime RequestStart { get; set; }
|
||||
|
||||
[Start]
|
||||
IYieldPoint StartFlow(string colorFilter)
|
||||
public IYieldPoint StartFlow(string colorFilter)
|
||||
{
|
||||
RequestStart = DateTime.UtcNow();
|
||||
}
|
||||
@ -103,7 +103,7 @@ If the response handler is not asynchronous, use ``YieldWithRequestSync`` instea
|
||||
}
|
||||
|
||||
[Start]
|
||||
IYieldPoint StartFlow(string colorFilter)
|
||||
public IYieldPoint StartFlow(string colorFilter)
|
||||
{
|
||||
RequestStart = DateTime.UtcNow();
|
||||
|
||||
|
@ -11,3 +11,4 @@ Tapeti documentation
|
||||
dataannotations
|
||||
flow
|
||||
transient
|
||||
tapeticmd
|
177
docs/tapeticmd.rst
Normal file
177
docs/tapeticmd.rst
Normal file
@ -0,0 +1,177 @@
|
||||
Tapeti.Cmd
|
||||
==========
|
||||
|
||||
The Tapeti command-line tool provides various operations for managing messages. It tries to be compatible with all type of messages, but has been tested only against JSON messages, specifically those sent by Tapeti.
|
||||
|
||||
|
||||
Common parameters
|
||||
-----------------
|
||||
|
||||
All operations support the following parameters. All are optional.
|
||||
|
||||
-h <hostname>, --host <hostname>
|
||||
Specifies the hostname of the RabbitMQ server. Default is localhost.
|
||||
|
||||
--port <port>
|
||||
Specifies the AMQP port of the RabbitMQ server. Default is 5672.
|
||||
|
||||
-v <virtualhost>, --virtualhost <virtualhost>
|
||||
Specifies the virtual host to use. Default is /.
|
||||
|
||||
-u <username>, --username <username>
|
||||
Specifies the username to authenticate the connection. Default is guest.
|
||||
|
||||
-p <password>, --password <username>
|
||||
Specifies the password to authenticate the connection. Default is guest.
|
||||
|
||||
|
||||
Example:
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe <operation> -h rabbitmq-server -u tapeti -p topsecret
|
||||
|
||||
|
||||
|
||||
Export
|
||||
------
|
||||
|
||||
Fetches messages from a queue and writes them to disk.
|
||||
|
||||
-q <queue>, --queue <queue>
|
||||
*Required*. The queue to read the messages from.
|
||||
|
||||
-o <target>, --output <target>
|
||||
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be output to.
|
||||
|
||||
-r, --remove
|
||||
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
||||
|
||||
-n <count>, --maxcount <count>
|
||||
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
||||
|
||||
-s <method>, --serialization <method>
|
||||
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
|
||||
|
||||
|
||||
Example:
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe export -q tapeti.example.01 -o dump.json
|
||||
|
||||
|
||||
|
||||
Import
|
||||
------
|
||||
|
||||
Read messages from disk as previously exported and publish them to a queue.
|
||||
|
||||
-i <source>
|
||||
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be read from.
|
||||
|
||||
-e, --exchange
|
||||
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
|
||||
|
||||
-s <method>, --serialization <method>
|
||||
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
|
||||
|
||||
|
||||
Example:
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe import -i dump.json
|
||||
|
||||
|
||||
|
||||
Shovel
|
||||
------
|
||||
|
||||
Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.
|
||||
|
||||
-q <queue>, --queue <queue>
|
||||
*Required*. The queue to read the messages from.
|
||||
|
||||
-t <queue>, --targetqueue <queue>
|
||||
The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.
|
||||
|
||||
-r, --remove
|
||||
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
||||
|
||||
-n <count>, --maxcount <count>
|
||||
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
||||
|
||||
--targethost <host>
|
||||
Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.
|
||||
|
||||
--targetport <port>
|
||||
AMQP port of the target RabbitMQ server. Defaults to the source port.
|
||||
|
||||
--targetvirtualhost <virtualhost>
|
||||
Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.
|
||||
|
||||
--targetusername <username>
|
||||
Username used to connect to the target RabbitMQ server. Defaults to the source username.
|
||||
|
||||
--targetpassword <password>
|
||||
Password used to connect to the target RabbitMQ server. Defaults to the source password.
|
||||
|
||||
|
||||
|
||||
Example:
|
||||
::
|
||||
|
||||
.\Tapeti.Cmd.exe shovel -q tapeti.example.01 -t tapeti.example.06
|
||||
|
||||
|
||||
Serialization methods
|
||||
---------------------
|
||||
|
||||
For importing and exporting messages, Tapeti.Cmd supports two serialization methods.
|
||||
|
||||
SingleFileJSON
|
||||
''''''''''''''
|
||||
The default serialization method. All messages are contained in a single file, where each line is a JSON document describing the message properties and it's content.
|
||||
|
||||
An example message (formatted as multi-line to be more readable, but keep in mind that it must be a single line in the export file to be imported properly):
|
||||
|
||||
::
|
||||
|
||||
{
|
||||
"DeliveryTag": 1,
|
||||
"Redelivered": true,
|
||||
"Exchange": "tapeti",
|
||||
"RoutingKey": "quote.request",
|
||||
"Queue": "tapeti.example.01",
|
||||
"Properties": {
|
||||
"AppId": null,
|
||||
"ClusterId": null,
|
||||
"ContentEncoding": null,
|
||||
"ContentType": "application/json",
|
||||
"CorrelationId": null,
|
||||
"DeliveryMode": 2,
|
||||
"Expiration": null,
|
||||
"Headers": {
|
||||
"classType": "Messaging.TapetiExample.QuoteRequestMessage:Messaging.TapetiExample"
|
||||
},
|
||||
"MessageId": null,
|
||||
"Priority": null,
|
||||
"ReplyTo": null,
|
||||
"Timestamp": 1581600132,
|
||||
"Type": null,
|
||||
"UserId": null
|
||||
},
|
||||
"Body": {
|
||||
"Amount": 2
|
||||
},
|
||||
"RawBody": "<JSON encoded byte array>"
|
||||
}
|
||||
|
||||
The properties correspond to the RabbitMQ client's IBasicProperties and can be omitted if empty.
|
||||
|
||||
Either Body or RawBody is present. Body is used if the ContentType is set to application/json, and will contain the original message as an inline JSON object for easy manipulation. For other content types, the RawBody contains the original encoded body.
|
||||
|
||||
|
||||
EasyNetQHosepipe
|
||||
''''''''''''''''
|
||||
Provides compatibility with the EasyNetQ Hosepipe's dump/insert format. The source or target parameter must be a path. Each message consists of 3 files, ending in .message.txt, .properties.txt and .info.txt.
|
||||
|
||||
As this is only provided for emergency situations, see the source code if you want to know more about the format specification.
|
Loading…
Reference in New Issue
Block a user