1
0
mirror of synced 2024-11-21 17:03:50 +00:00

Implemented stateless request-response support

This commit is contained in:
Mark van Renswoude 2020-02-12 11:34:51 +01:00
parent bbb5f6c218
commit 2745d18779
14 changed files with 1661 additions and 1392 deletions

View File

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,28 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _06_StatelessRequestResponse
{
[MessageController]
[DynamicQueue("tapeti.example.06")]
public class ExampleMessageController
{
private readonly IExampleState exampleState;
public ExampleMessageController(IExampleState exampleState)
{
this.exampleState = exampleState;
}
[ResponseHandler]
public void HandleQuoteResponse(QuoteResponseMessage message)
{
Console.WriteLine("Received response: " + message.Quote);
exampleState.Done();
}
}
}

View File

@ -0,0 +1,51 @@
using System;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.SimpleInjector;
namespace _06_StatelessRequestResponse
{
public class Program
{
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
.WithDataAnnotations()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config))
{
await connection.Subscribe();
var publisher = dependencyResolver.Resolve<IPublisher>();
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 1
},
c => c.HandleQuoteResponse);
await waitForDone();
}
}
}
}

View File

@ -0,0 +1,38 @@
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _06_StatelessRequestResponse
{
[MessageController]
[DynamicQueue("tapeti.example.06.receiver")]
public class ReceivingMessageController
{
// No publisher required, responses can simply be returned
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
{
string quote;
switch (message.Amount)
{
case 1:
// Well, they asked for it... :-)
quote = "'";
break;
case 2:
quote = "\"";
break;
default:
// We have to return a response.
quote = null;
break;
}
return new QuoteResponseMessage
{
Quote = quote
};
}
}
}

View File

@ -0,0 +1,14 @@
using System;
namespace Tapeti.Annotations
{
/// <inheritdoc />
/// <summary>
/// Indicates that the method only handles response messages which are sent directly
/// to the queue. No binding will be created.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class ResponseHandlerAttribute : Attribute
{
}
}

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<TargetFramework>netcoreapp2.1</TargetFramework>
<Version>2.0.0</Version>
<Authors>Mark van Renswoude</Authors>
<Company>Mark van Renswoude</Company>

View File

@ -55,7 +55,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Ninject", "Tapeti.Ni
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327-46B0-4B72-B95A-594CE7F8C80D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -151,6 +153,10 @@ Global
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C8728BFC-7F97-41BC-956B-690A57B634EC}.Release|Any CPU.Build.0 = Release|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -177,6 +183,7 @@ Global
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D}
{152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Reflection;
namespace Tapeti.Config
{
@ -81,6 +82,13 @@ namespace Tapeti.Config
/// <param name="method"></param>
/// <returns>The binding if found, null otherwise</returns>
IControllerMethodBinding ForMethod(Delegate method);
/// <summary>
/// Searches for a binding linked to the specified method.
/// </summary>
/// <param name="method"></param>
/// <returns>The binding if found, null otherwise</returns>
IControllerMethodBinding ForMethod(MethodInfo method);
}

View File

@ -1,4 +1,5 @@
using System;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Annotations;
@ -18,7 +19,6 @@ namespace Tapeti.Connection
private readonly IMessageSerializer messageSerializer;
/// <inheritdoc />
public TapetiPublisher(ITapetiConfig config, Func<ITapetiClient> clientFactory)
{
this.config = config;
@ -37,6 +37,65 @@ namespace Tapeti.Connection
}
/// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class
{
await PublishRequest(message, responseMethodSelector.Body);
}
/// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class
{
await PublishRequest(message, responseMethodSelector.Body);
}
private async Task PublishRequest(object message, Expression responseMethodBody)
{
var callExpression = (responseMethodBody as UnaryExpression)?.Operand as MethodCallExpression;
var targetMethodExpression = callExpression?.Object as ConstantExpression;
var responseHandler = targetMethodExpression?.Value as MethodInfo;
if (responseHandler == null)
throw new ArgumentException("Unable to determine the response method", nameof(responseMethodBody));
var requestAttribute = message.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
throw new ArgumentException($"Request message {message.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(message));
var binding = config.Bindings.ForMethod(responseHandler);
if (binding == null)
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
if (!binding.Accept(requestAttribute.Response))
throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
var responseHandleAttribute = binding.Method.GetCustomAttribute<ResponseHandlerAttribute>();
if (responseHandleAttribute == null)
throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseHandler));
if (binding.QueueName == null)
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a request", nameof(responseHandler));
var properties = new MessageProperties
{
ReplyTo = binding.QueueName
};
await Publish(message, properties, IsMandatory(message));
}
/// <inheritdoc />
public async Task SendToQueue(string queueName, object message)
{
await PublishDirect(message, queueName, null, IsMandatory(message));
}
/// <inheritdoc />
public async Task Publish(object message, IMessageProperties properties, bool mandatory)
{

View File

@ -44,7 +44,6 @@ namespace Tapeti.Default
public IBindingResult Result => result;
/// <inheritdoc />
public ControllerBindingContext(IEnumerable<ParameterInfo> parameters, ParameterInfo result)
{
this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();

View File

@ -1,4 +1,6 @@
using System.Threading.Tasks;
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Tapeti.Config;
// ReSharper disable once UnusedMember.Global
@ -15,6 +17,40 @@ namespace Tapeti
/// </summary>
/// <param name="message">The message to send</param>
Task Publish(object message);
/// <summary>
/// Publish the specified request message and handle the response with the controller method as specified
/// by the responseMethodSelector expression. The response method or controller must have a valid queue attribute.
/// </summary>
/// <remarks>
/// The response method is called on a new instance of the controller, as is the case with a regular message.
/// To preserve state, use the Tapeti.Flow extension instead.
/// </remarks>
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
/// <param name="message">The message to send</param>
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class;
/// <summary>
/// Publish the specified request message and handle the response with the controller method as specified
/// by the responseMethodSelector expression. The response method or controller must have a valid queue attribute.
/// </summary>
/// <remarks>
/// The response method is called on a new instance of the controller, as is the case with a regular message.
/// To preserve state, use the Tapeti.Flow extension instead.
/// </remarks>
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
/// <param name="message">The message to send</param>
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class;
/// <summary>
/// Sends a message directly to the specified queue. Not recommended for general use.
/// </summary>
/// <param name="queueName">The name of the queue to publish the message to</param>
/// <param name="message">The message to send</param>
Task SendToQueue(string queueName, object message);
}

View File

@ -299,6 +299,12 @@ namespace Tapeti
}
public IControllerMethodBinding ForMethod(MethodInfo method)
{
return methodLookup.TryGetValue(method, out var binding) ? binding : null;
}
public void Lock()
{
methodLookup = this

View File

@ -63,6 +63,10 @@ namespace Tapeti
};
if (method.GetCustomAttribute<ResponseHandlerAttribute>() != null)
context.SetBindingTargetMode(BindingTargetMode.Direct);
var allowBinding = false;
builderAccess.ApplyBindingMiddleware(context, () => { allowBinding = true; });
@ -97,7 +101,6 @@ namespace Tapeti
MessageMiddleware = context.Middleware.Where(m => m is IControllerMessageMiddleware).Cast<IControllerMessageMiddleware>().ToList(),
CleanupMiddleware = context.Middleware.Where(m => m is IControllerCleanupMiddleware).Cast<IControllerCleanupMiddleware>().ToList()
}));
}
return builder;

View File

@ -46,7 +46,7 @@ The start method can have any name, but must be annotated with the ``[Start]`` a
public DateTime RequestStart { get; set; }
[Start]
IYieldPoint StartFlow()
public IYieldPoint StartFlow()
{
RequestStart = DateTime.UtcNow();
}
@ -67,7 +67,7 @@ Often you'll want to pass some initial information to the flow. The Start method
public DateTime RequestStart { get; set; }
[Start]
IYieldPoint StartFlow(string colorFilter)
public IYieldPoint StartFlow(string colorFilter)
{
RequestStart = DateTime.UtcNow();
}
@ -103,7 +103,7 @@ If the response handler is not asynchronous, use ``YieldWithRequestSync`` instea
}
[Start]
IYieldPoint StartFlow(string colorFilter)
public IYieldPoint StartFlow(string colorFilter)
{
RequestStart = DateTime.UtcNow();