1
0
mirror of synced 2024-11-22 01:13:49 +00:00

[ci skip] Bit of refactoring and bugfixing, mostly documentation

This commit is contained in:
Mark van Renswoude 2019-08-14 20:48:40 +02:00
parent 6c32665c8a
commit 314a67db00
39 changed files with 871 additions and 268 deletions

View File

@ -10,7 +10,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" /> <PackageReference Include="System.Data.SqlClient" Version="4.6.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -2,6 +2,11 @@
namespace Tapeti.Flow.Annotations namespace Tapeti.Flow.Annotations
{ {
/// <inheritdoc />
/// <summary>
/// Marks a message handler as a response message handler which continues a Tapeti Flow.
/// The method only receives direct messages, and does not create a routing key based binding to the queue.
/// </summary>
[AttributeUsage(AttributeTargets.Method)] [AttributeUsage(AttributeTargets.Method)]
public class ContinuationAttribute : Attribute public class ContinuationAttribute : Attribute
{ {

View File

@ -3,6 +3,11 @@ using JetBrains.Annotations;
namespace Tapeti.Flow.Annotations namespace Tapeti.Flow.Annotations
{ {
/// <inheritdoc />
/// <summary>
/// Marks this method as the start of a Tapeti Flow. Use IFlowStarter.Start to begin a new flow and
/// call this method. Must return an IYieldPoint.
/// </summary>
[AttributeUsage(AttributeTargets.Method)] [AttributeUsage(AttributeTargets.Method)]
[MeansImplicitUse] [MeansImplicitUse]
public class StartAttribute : Attribute public class StartAttribute : Attribute

View File

@ -2,11 +2,20 @@
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
/// <summary>
/// ITapetiConfigBuilder extension for enabling Flow.
/// </summary>
public static class ConfigExtensions public static class ConfigExtensions
{ {
/// <summary>
/// Enables Tapeti Flow.
/// </summary>
/// <param name="config"></param>
/// <param name="flowRepository">An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts.</param>
/// <returns></returns>
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null) public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null)
{ {
config.Use(new FlowMiddleware(flowRepository)); config.Use(new FlowExtension(flowRepository));
return config; return config;
} }
} }

View File

@ -1,7 +1,13 @@
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
/// <summary>
/// Key names as used in the message context store. For internal use.
/// </summary>
public static class ContextItems public static class ContextItems
{ {
/// <summary>
/// Key given to the FlowContext object as stored in the message context.
/// </summary>
public const string FlowContext = "Tapeti.Flow.FlowContext"; public const string FlowContext = "Tapeti.Flow.FlowContext";
} }
} }

View File

@ -15,9 +15,6 @@ namespace Tapeti.Flow.Default
if (context.Method.GetCustomAttribute<StartAttribute>() != null) if (context.Method.GetCustomAttribute<StartAttribute>() != null)
return; return;
if (context.Method.GetCustomAttribute<ContinuationAttribute>() != null)
context.SetBindingTargetMode(BindingTargetMode.Direct);
RegisterYieldPointResult(context); RegisterYieldPointResult(context);
RegisterContinuationFilter(context); RegisterContinuationFilter(context);
@ -33,7 +30,8 @@ namespace Tapeti.Flow.Default
if (continuationAttribute == null) if (continuationAttribute == null)
return; return;
context.Use(new FlowMiddleware()); context.SetBindingTargetMode(BindingTargetMode.Direct);
context.Use(new FlowContinuationMiddleware());
if (context.Result.HasHandler) if (context.Result.HasHandler)
return; return;

View File

@ -6,11 +6,11 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
public class FlowMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware public class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{ {
public async Task Filter(IControllerMessageContext context, Func<Task> next) public async Task Filter(IControllerMessageContext context, Func<Task> next)
{ {
var flowContext = await CreateFlowContext(context); var flowContext = await EnrichWithFlowContext(context);
if (flowContext?.ContinuationMetadata == null) if (flowContext?.ContinuationMetadata == null)
return; return;
@ -44,7 +44,7 @@ namespace Tapeti.Flow.Default
} }
public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next) public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next)
{ {
await next(); await next();
@ -62,7 +62,7 @@ namespace Tapeti.Flow.Default
private static async Task<FlowContext> CreateFlowContext(IControllerMessageContext context) private static async Task<FlowContext> EnrichWithFlowContext(IControllerMessageContext context)
{ {
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return flowContext; return flowContext;

View File

@ -12,6 +12,10 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
/// <inheritdoc cref="IFlowProvider"/> />
/// <summary>
/// Default implementation for IFlowProvider.
/// </summary>
public class FlowProvider : IFlowProvider, IFlowHandler public class FlowProvider : IFlowProvider, IFlowHandler
{ {
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
@ -25,28 +29,33 @@ namespace Tapeti.Flow.Default
} }
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
{ {
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{ {
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
/// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
return new ParallelRequestBuilder(config, SendRequest); return new ParallelRequestBuilder(config, SendRequest);
} }
/// <inheritdoc />
public IYieldPoint EndWithResponse<TResponse>(TResponse message) public IYieldPoint EndWithResponse<TResponse>(TResponse message)
{ {
return new DelegateYieldPoint(context => SendResponse(context, message)); return new DelegateYieldPoint(context => SendResponse(context, message));
} }
/// <inheritdoc />
public IYieldPoint End() public IYieldPoint End()
{ {
return new DelegateYieldPoint(EndFlow); return new DelegateYieldPoint(EndFlow);
@ -179,6 +188,7 @@ namespace Tapeti.Flow.Default
}; };
} }
/// <inheritdoc />
public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint) public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint)
{ {
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))

View File

@ -47,17 +47,19 @@ namespace Tapeti.Flow.Default
var controller = config.DependencyResolver.Resolve<TController>(); var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
var context = new ControllerMessageContext /*
var context = new ControllerMessageContext()
{ {
Config = config, Config = config,
Controller = controller Controller = controller
}; };
*/
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
try try
{ {
await flowHandler.Execute(context, yieldPoint); //await flowHandler.Execute(context, yieldPoint);
//handlingResult.ConsumeResponse = ConsumeResponse.Ack; //handlingResult.ConsumeResponse = ConsumeResponse.Ack;
} }
finally finally

View File

@ -4,20 +4,34 @@ using System.Linq;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
/// <summary>
/// Represents the state stored for active flows.
/// </summary>
public class FlowState public class FlowState
{ {
private FlowMetadata metadata; private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations; private Dictionary<Guid, ContinuationMetadata> continuations;
/// <summary>
/// Contains metadata about the flow.
/// </summary>
public FlowMetadata Metadata public FlowMetadata Metadata
{ {
get => metadata ?? (metadata = new FlowMetadata()); get => metadata ?? (metadata = new FlowMetadata());
set => metadata = value; set => metadata = value;
} }
/// <summary>
/// Contains the serialized state which is restored when a flow continues.
/// </summary>
public string Data { get; set; } public string Data { get; set; }
/// <summary>
/// Contains metadata about continuations awaiting a response.
/// </summary>
public Dictionary<Guid, ContinuationMetadata> Continuations public Dictionary<Guid, ContinuationMetadata> Continuations
{ {
get => continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>()); get => continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>());
@ -25,6 +39,9 @@ namespace Tapeti.Flow.Default
} }
/// <summary>
/// Creates a deep clone of this FlowState.
/// </summary>
public FlowState Clone() public FlowState Clone()
{ {
return new FlowState { return new FlowState {
@ -36,11 +53,20 @@ namespace Tapeti.Flow.Default
} }
/// <summary>
/// Contains metadata about the flow.
/// </summary>
public class FlowMetadata public class FlowMetadata
{ {
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public ReplyMetadata Reply { get; set; } public ReplyMetadata Reply { get; set; }
/// <summary>
/// Creates a deep clone of this FlowMetadata.
/// </summary>
public FlowMetadata Clone() public FlowMetadata Clone()
{ {
return new FlowMetadata return new FlowMetadata
@ -51,15 +77,36 @@ namespace Tapeti.Flow.Default
} }
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public class ReplyMetadata public class ReplyMetadata
{ {
/// <summary>
/// The queue to which the response should be sent.
/// </summary>
public string ReplyTo { get; set; } public string ReplyTo { get; set; }
/// <summary>
/// The correlation ID included in the original request.
/// </summary>
public string CorrelationId { get; set; } public string CorrelationId { get; set; }
/// <summary>
/// The expected response message class.
/// </summary>
public string ResponseTypeName { get; set; } public string ResponseTypeName { get; set; }
/// <summary>
/// Indicates whether the response should be sent a mandatory.
/// False for requests originating from a dynamic queue.
/// </summary>
public bool Mandatory { get; set; } public bool Mandatory { get; set; }
/// <summary>
/// Creates a deep clone of this ReplyMetadata.
/// </summary>
public ReplyMetadata Clone() public ReplyMetadata Clone()
{ {
return new ReplyMetadata return new ReplyMetadata
@ -73,13 +120,30 @@ namespace Tapeti.Flow.Default
} }
/// <summary>
/// Contains metadata about a continuation awaiting a response.
/// </summary>
public class ContinuationMetadata public class ContinuationMetadata
{ {
/// <summary>
/// The name of the method which will handle the response.
/// </summary>
public string MethodName { get; set; } public string MethodName { get; set; }
/// <summary>
/// The name of the method which is called when all responses have been processed.
/// </summary>
public string ConvergeMethodName { get; set; } public string ConvergeMethodName { get; set; }
/// <summary>
/// Determines if the converge method is synchronous or asynchronous.
/// </summary>
public bool ConvergeMethodSync { get; set; } public bool ConvergeMethodSync { get; set; }
/// <summary>
/// Creates a deep clone of this ContinuationMetadata.
/// </summary>
public ContinuationMetadata Clone() public ContinuationMetadata Clone()
{ {
return new ContinuationMetadata return new ContinuationMetadata

View File

@ -1,13 +1,16 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
/// <inheritdoc />
/// <summary>
/// Default implementation of IFlowStore.
/// </summary>
public class FlowStore : IFlowStore public class FlowStore : IFlowStore
{ {
private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>(); private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>();

View File

@ -4,15 +4,21 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public class FlowMiddleware : ITapetiExtension /// <inheritdoc />
/// <summary>
/// Provides the Flow middleware.
/// </summary>
public class FlowExtension : ITapetiExtension
{ {
private readonly IFlowRepository flowRepository; private readonly IFlowRepository flowRepository;
public FlowMiddleware(IFlowRepository flowRepository) /// <inheritdoc />
public FlowExtension(IFlowRepository flowRepository)
{ {
this.flowRepository = flowRepository; this.flowRepository = flowRepository;
} }
/// <inheritdoc />
public void RegisterDefaults(IDependencyContainer container) public void RegisterDefaults(IDependencyContainer container)
{ {
container.RegisterDefault<IFlowProvider, FlowProvider>(); container.RegisterDefault<IFlowProvider, FlowProvider>();
@ -22,6 +28,7 @@ namespace Tapeti.Flow
container.RegisterDefaultSingleton<IFlowStore, FlowStore>(); container.RegisterDefaultSingleton<IFlowStore, FlowStore>();
} }
/// <inheritdoc />
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver) public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{ {
yield return new FlowBindingMiddleware(); yield return new FlowBindingMiddleware();

View File

@ -7,49 +7,162 @@ using Tapeti.Config;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
/// <summary>
/// Provides methods to build an IYieldPoint to indicate if and how Flow should continue.
/// </summary>
public interface IFlowProvider public interface IFlowProvider
{ {
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler); IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
// One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call".
// Apparantly this is because a return type of a method is not part of its signature, /// <summary>
// according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter /// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for synchronous response handlers.
/// </summary>
/// <remarks>
/// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
/// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
/// of a method is not part of its signature,according to:
/// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
/// </remarks>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler); IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler);
/// <summary>
/// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
/// to acquire an IYieldPoint.
/// </summary>
IFlowParallelRequestBuilder YieldWithParallelRequest(); IFlowParallelRequestBuilder YieldWithParallelRequest();
/// <summary>
/// End the flow by publishing the specified response message. Only allowed, and required, when the
/// current flow was started by a message handler for a Request message.
/// </summary>
/// <param name="message"></param>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint EndWithResponse<TResponse>(TResponse message); IYieldPoint EndWithResponse<TResponse>(TResponse message);
/// <summary>
/// End the flow and dispose any state.
/// </summary>
IYieldPoint End(); IYieldPoint End();
} }
/// <summary> /// <summary>
/// Allows starting a flow outside of a message handler. /// Allows starting a flow outside of a message handler.
/// </summary> /// </summary>
public interface IFlowStarter public interface IFlowStarter
{ {
/// <summary>
/// Starts a new flow.
/// </summary>
/// <param name="methodSelector"></param>
Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class; Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class;
/// <summary>
/// Starts a new flow.
/// </summary>
/// <param name="methodSelector"></param>
Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class; Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class;
/// <summary>
/// Starts a new flow and passes the parameter to the method.
/// </summary>
/// <param name="methodSelector"></param>
/// <param name="parameter"></param>
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class; Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class;
/// <summary>
/// Starts a new flow and passes the parameter to the method.
/// </summary>
/// <param name="methodSelector"></param>
/// <param name="parameter"></param>
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class; Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class;
} }
/// <summary> /// <summary>
/// Internal interface. Do not call directly. /// Internal interface. Do not call directly.
/// </summary> /// </summary>
public interface IFlowHandler public interface IFlowHandler
{ {
/// <summary>
/// Executes the YieldPoint for the given message context.
/// </summary>
/// <param name="context"></param>
/// <param name="yieldPoint"></param>
Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint); Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint);
} }
/// <summary>
/// Builder to publish one or more request messages and continuing the flow when the responses arrive.
/// </summary>
public interface IFlowParallelRequestBuilder public interface IFlowParallelRequestBuilder
{ {
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler); IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for synchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler); IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived.
/// Response handlers and the continuation method are guaranteed thread-safe access to the
/// controller and can store state.
/// Used for asynchronous continuation methods.
/// </summary>
/// <param name="continuation"></param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation); IYieldPoint Yield(Func<Task<IYieldPoint>> continuation);
/// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived.
/// Response handlers and the continuation method are guaranteed thread-safe access to the
/// controller and can store state.
/// Used for synchronous continuation methods.
/// </summary>
/// <param name="continuation"></param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation); IYieldPoint YieldSync(Func<IYieldPoint> continuation);
} }
/// <summary>
/// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods.
/// </summary>
public interface IYieldPoint public interface IYieldPoint
{ {
} }

View File

@ -6,19 +6,57 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
/// <summary>
/// Provides a way to store and load flow state.
/// </summary>
public interface IFlowStore public interface IFlowStore
{ {
/// <summary>
/// Must be called during application startup before subscribing or starting a flow.
/// If using an IFlowRepository that requires an update (such as creating tables) make
/// sure it is called before calling Load.
/// </summary>
Task Load(); Task Load();
/// <summary>
/// Looks up the FlowID corresponding to a ContinuationID. For internal use.
/// </summary>
/// <param name="continuationID"></param>
Task<Guid?> FindFlowID(Guid continuationID); Task<Guid?> FindFlowID(Guid continuationID);
/// <summary>
/// Acquires a lock on the flow with the specified FlowID.
/// </summary>
/// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID); Task<IFlowStateLock> LockFlowState(Guid flowID);
} }
/// <inheritdoc />
/// <summary>
/// Represents a lock on the flow state, to provide thread safety.
/// </summary>
public interface IFlowStateLock : IDisposable public interface IFlowStateLock : IDisposable
{ {
/// <summary>
/// The unique ID of the flow state.
/// </summary>
Guid FlowID { get; } Guid FlowID { get; }
/// <summary>
/// Acquires a copy of the flow state.
/// </summary>
Task<FlowState> GetFlowState(); Task<FlowState> GetFlowState();
/// <summary>
/// Stores the new flow state.
/// </summary>
/// <param name="flowState"></param>
Task StoreFlowState(FlowState flowState); Task StoreFlowState(FlowState flowState);
/// <summary>
/// Disposes of the flow state corresponding to this Flow ID.
/// </summary>
Task DeleteFlowState(); Task DeleteFlowState();
} }
} }

View File

@ -6,7 +6,7 @@
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702;1591</NoWarn> <NoWarn>1701;1702</NoWarn>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -10,7 +10,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Serilog" Version="2.7.1" /> <PackageReference Include="Serilog" Version="2.8.0" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -10,7 +10,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.3.0" /> <PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -9,9 +9,12 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="xunit" Version="2.3.1" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -29,7 +29,7 @@ namespace Tapeti.Transient
/// <inheritdoc /> /// <inheritdoc />
public async Task Apply(IBindingTarget target) public async Task Apply(IBindingTarget target)
{ {
QueueName = await target.BindDirectDynamic(dynamicQueuePrefix); QueueName = await target.BindDynamicDirect(dynamicQueuePrefix);
router.TransientResponseQueueName = QueueName; router.TransientResponseQueueName = QueueName;
} }
@ -47,5 +47,12 @@ namespace Tapeti.Transient
router.HandleMessage(context); router.HandleMessage(context);
return Task.CompletedTask; return Task.CompletedTask;
} }
/// <inheritdoc />
public Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
return Task.CompletedTask;
}
} }
} }

View File

@ -33,6 +33,15 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
Task Invoke(IMessageContext context); Task Invoke(IMessageContext context);
/// <summary>
/// Called after the handler is invoked and any exception handling has been done.
/// </summary>
/// <param name="context"></param>
/// <param name="consumeResult"></param>
/// <returns></returns>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult);
} }
@ -67,7 +76,7 @@ namespace Tapeti.Config
/// Used for direct-to-queue messages. /// Used for direct-to-queue messages.
/// </summary> /// </summary>
/// <param name="queueName">The name of the durable queue</param> /// <param name="queueName">The name of the durable queue</param>
Task BindDirectDurable(string queueName); Task BindDurableDirect(string queueName);
/// <summary> /// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key. /// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -76,7 +85,7 @@ namespace Tapeti.Config
/// <param name="messageClass">The message class which will be handled on the queue. It is not actually bound to the queue.</param> /// <param name="messageClass">The message class which will be handled on the queue. It is not actually bound to the queue.</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param> /// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <returns>The generated name of the dynamic queue</returns> /// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDirectDynamic(Type messageClass = null, string queuePrefix = null); Task<string> BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
/// <summary> /// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key. /// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -84,6 +93,6 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param> /// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <returns>The generated name of the dynamic queue</returns> /// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDirectDynamic(string queuePrefix = null); Task<string> BindDynamicDirect(string queuePrefix = null);
} }
} }

View File

@ -43,10 +43,16 @@ namespace Tapeti.Config
public interface IControllerBindingContext public interface IControllerBindingContext
{ {
/// <summary> /// <summary>
/// The message class for this method. /// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware.
/// If required, call next first to ensure it is available.
/// </summary> /// </summary>
Type MessageClass { get; } Type MessageClass { get; }
/// <summary>
/// Determines if SetMessageClass has already been called.
/// </summary>
bool HasMessageClass { get; }
/// <summary> /// <summary>
/// The controller class for this binding. /// The controller class for this binding.
/// </summary> /// </summary>
@ -69,7 +75,7 @@ namespace Tapeti.Config
/// <summary> /// <summary>
/// Sets the message class for this method. Can only be called once, which is always done first by the default MessageBinding. /// Sets the message class for this method. Can only be called once, which is normally done by the default MessageBinding.
/// </summary> /// </summary>
/// <param name="messageClass"></param> /// <param name="messageClass"></param>
void SetMessageClass(Type messageClass); void SetMessageClass(Type messageClass);

View File

@ -3,11 +3,10 @@ using System.Threading.Tasks;
namespace Tapeti.Config namespace Tapeti.Config
{ {
/// <inheritdoc />
/// <summary> /// <summary>
/// Denotes middleware that runs after controller methods. /// Denotes middleware that runs after controller methods.
/// </summary> /// </summary>
public interface IControllerCleanupMiddleware : IControllerMiddlewareBase public interface IControllerCleanupMiddleware
{ {
/// <summary> /// <summary>
/// Called after the message handler method, even if exceptions occured. /// Called after the message handler method, even if exceptions occured.
@ -15,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="consumeResult"></param> /// <param name="consumeResult"></param>
/// <param name="next">Always call to allow the next in the chain to clean up</param> /// <param name="next">Always call to allow the next in the chain to clean up</param>
Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next); Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next);
} }
} }

View File

@ -16,22 +16,5 @@
/// Provides access to the binding which is currently processing the message. /// Provides access to the binding which is currently processing the message.
/// </remarks> /// </remarks>
new IControllerMethodBinding Binding { get; } new IControllerMethodBinding Binding { get; }
/// <summary>
/// Stores a key-value pair in the context for passing information between the various
/// controller middleware stages (IControllerMiddlewareBase descendants).
/// </summary>
/// <param name="key">A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts</param>
/// <param name="value">Will be disposed if the value implements IDisposable</param>
void Store(string key, object value);
/// <summary>
/// Retrieves a previously stored value.
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns>True if the value was found, False otherwise</returns>
bool Get<T>(string key, out T value) where T : class;
} }
} }

View File

@ -42,5 +42,22 @@ namespace Tapeti.Config
/// Provides access to the binding which is currently processing the message. /// Provides access to the binding which is currently processing the message.
/// </remarks> /// </remarks>
IBinding Binding { get; } IBinding Binding { get; }
/// <summary>
/// Stores a key-value pair in the context for passing information between the various
/// middleware stages (mostly for IControllerMiddlewareBase descendants).
/// </summary>
/// <param name="key">A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts</param>
/// <param name="value">Will be disposed if the value implements IDisposable</param>
void Store(string key, object value);
/// <summary>
/// Retrieves a previously stored value.
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns>True if the value was found, False otherwise</returns>
bool Get<T>(string key, out T value) where T : class;
} }
} }

View File

@ -107,7 +107,7 @@ namespace Tapeti.Config
IDependencyResolver DependencyResolver { get; } IDependencyResolver DependencyResolver { get; }
/// <summary> /// <summary>
/// Applies the currently registered binding middleware to /// Applies the currently registered binding middleware to the specified context.
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="lastHandler"></param> /// <param name="lastHandler"></param>

View File

@ -97,7 +97,8 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory) public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory)
{ {
var publishProperties = new RabbitMQMessageProperties(new BasicProperties(), properties); if (string.IsNullOrEmpty(routingKey))
throw new ArgumentNullException(nameof(routingKey));
await taskQueue.Value.Add(async () => await taskQueue.Value.Add(async () =>
{ {
@ -131,7 +132,18 @@ namespace Tapeti.Connection
else else
mandatory = false; mandatory = false;
channel.BasicPublish(exchange, routingKey, mandatory, publishProperties.BasicProperties, body); try
{
var publishProperties = new RabbitMQMessageProperties(channel.CreateBasicProperties(), properties);
channel.BasicPublish(exchange ?? "", routingKey, mandatory, publishProperties.BasicProperties, body);
}
catch
{
messageInfo.CompletionSource.SetCanceled();
publishResultTask = null;
throw;
}
}); });

View File

@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Runtime.ExceptionServices;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Default; using Tapeti.Default;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -56,9 +55,6 @@ namespace Tapeti.Connection
} }
catch (Exception dispatchException) catch (Exception dispatchException)
{ {
// TODO check if this is still necessary:
// var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
using (var emptyContext = new MessageContext using (var emptyContext = new MessageContext
{ {
Config = config, Config = config,
@ -119,15 +115,18 @@ namespace Tapeti.Connection
try try
{ {
await MiddlewareHelper.GoAsync(config.Middleware.Message, await MiddlewareHelper.GoAsync(config.Middleware.Message,
(handler, next) => handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next),
async () => { await binding.Invoke(context); }); async () => { await binding.Invoke(context); });
await binding.Cleanup(context, ConsumeResult.Success);
return ConsumeResult.Success; return ConsumeResult.Success;
} }
catch (Exception invokeException) catch (Exception invokeException)
{ {
var exceptionContext = new ExceptionStrategyContext(context, invokeException); var exceptionContext = new ExceptionStrategyContext(context, invokeException);
HandleException(exceptionContext); HandleException(exceptionContext);
await binding.Cleanup(context, exceptionContext.ConsumeResult);
return exceptionContext.ConsumeResult; return exceptionContext.ConsumeResult;
} }
} }

View File

@ -96,7 +96,7 @@ namespace Tapeti.Connection
public abstract Task BindDurable(Type messageClass, string queueName); public abstract Task BindDurable(Type messageClass, string queueName);
public abstract Task BindDirectDurable(string queueName); public abstract Task BindDurableDirect(string queueName);
public async Task<string> BindDynamic(Type messageClass, string queuePrefix = null) public async Task<string> BindDynamic(Type messageClass, string queuePrefix = null)
@ -115,14 +115,14 @@ namespace Tapeti.Connection
} }
public async Task<string> BindDirectDynamic(Type messageClass, string queuePrefix = null) public async Task<string> BindDynamicDirect(Type messageClass, string queuePrefix = null)
{ {
var result = await DeclareDynamicQueue(messageClass, queuePrefix); var result = await DeclareDynamicQueue(messageClass, queuePrefix);
return result.QueueName; return result.QueueName;
} }
public async Task<string> BindDirectDynamic(string queuePrefix = null) public async Task<string> BindDynamicDirect(string queuePrefix = null)
{ {
// If we don't know the routing key, always create a new queue to ensure there is no overlap. // If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
@ -215,7 +215,7 @@ namespace Tapeti.Connection
} }
public override Task BindDirectDurable(string queueName) public override Task BindDurableDirect(string queueName)
{ {
if (!durableQueues.ContainsKey(queueName)) if (!durableQueues.ContainsKey(queueName))
durableQueues.Add(queueName, new List<Type>()); durableQueues.Add(queueName, new List<Type>());
@ -259,7 +259,7 @@ namespace Tapeti.Connection
await VerifyDurableQueue(queueName); await VerifyDurableQueue(queueName);
} }
public override async Task BindDirectDurable(string queueName) public override async Task BindDurableDirect(string queueName)
{ {
await VerifyDurableQueue(queueName); await VerifyDurableQueue(queueName);
} }

View File

@ -0,0 +1,179 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Tapeti.Config;
namespace Tapeti.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IControllerBindingContext
/// </summary>
public class ControllerBindingContext : IControllerBindingContext
{
private BindingTargetMode? bindingTargetMode;
private readonly List<IControllerMiddlewareBase> middleware = new List<IControllerMiddlewareBase>();
private readonly List<ControllerBindingParameter> parameters;
private readonly ControllerBindingResult result;
/// <summary>
/// Determines how the binding target is configured.
/// </summary>
public BindingTargetMode BindingTargetMode => bindingTargetMode ?? BindingTargetMode.Default;
/// <summary>
/// Provides access to the registered middleware for this method.
/// </summary>
public IReadOnlyList<IControllerMiddlewareBase> Middleware => middleware;
/// <inheritdoc />
public Type MessageClass { get; set; }
/// <inheritdoc />
public bool HasMessageClass => MessageClass != null;
/// <inheritdoc />
public Type Controller { get; set; }
/// <inheritdoc />
public MethodInfo Method { get; set; }
/// <inheritdoc />
public IReadOnlyList<IBindingParameter> Parameters => parameters;
/// <inheritdoc />
public IBindingResult Result => result;
/// <inheritdoc />
public ControllerBindingContext(IEnumerable<ParameterInfo> parameters, ParameterInfo result)
{
this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();
this.result = new ControllerBindingResult(result);
}
/// <inheritdoc />
public void SetMessageClass(Type messageClass)
{
if (HasMessageClass)
throw new InvalidOperationException("SetMessageClass can only be called once");
MessageClass = messageClass;
}
/// <inheritdoc />
public void SetBindingTargetMode(BindingTargetMode mode)
{
if (bindingTargetMode.HasValue)
throw new InvalidOperationException("SetBindingTargetMode can only be called once");
bindingTargetMode = mode;
}
/// <inheritdoc />
public void Use(IControllerMiddlewareBase handler)
{
middleware.Add(handler);
}
/// <summary>
/// Returns the configured bindings for the parameters.
/// </summary>
public IEnumerable<ValueFactory> GetParameterHandlers()
{
return parameters.Select(p => p.Binding);
}
/// <summary>
/// Returns the configured result handler.
/// </summary>
/// <returns></returns>
public ResultHandler GetResultHandler()
{
return result.Handler;
}
}
/// <inheritdoc />
/// <summary>
/// Default implementation for IBindingParameter
/// </summary>
public class ControllerBindingParameter : IBindingParameter
{
/// <summary>
/// Provides access to the configured binding.
/// </summary>
public ValueFactory Binding { get; set; }
/// <inheritdoc />
public ParameterInfo Info { get; }
/// <inheritdoc />
public bool HasBinding => Binding != null;
/// <inheritdoc />
public ControllerBindingParameter(ParameterInfo info)
{
Info = info;
}
/// <inheritdoc />
public void SetBinding(ValueFactory valueFactory)
{
if (Binding != null)
throw new InvalidOperationException("SetBinding can only be called once");
Binding = valueFactory;
}
}
/// <inheritdoc />
/// <summary>
/// Default implementation for IBindingResult
/// </summary>
public class ControllerBindingResult : IBindingResult
{
/// <summary>
/// Provides access to the configured handler.
/// </summary>
public ResultHandler Handler { get; set; }
/// <inheritdoc />
public ParameterInfo Info { get; }
/// <inheritdoc />
public bool HasHandler => Handler != null;
/// <inheritdoc />
public ControllerBindingResult(ParameterInfo info)
{
Info = info;
}
/// <inheritdoc />
public void SetHandler(ResultHandler resultHandler)
{
if (Handler != null)
throw new InvalidOperationException("SetHandler can only be called once");
Handler = resultHandler;
}
}
}

View File

@ -5,46 +5,60 @@ using Tapeti.Config;
namespace Tapeti.Default namespace Tapeti.Default
{ {
/// <inheritdoc cref="IControllerMessageContext" /> /// <inheritdoc cref="IControllerMessageContext" />
public class ControllerMessageContext : MessageContext, IControllerMessageContext public class ControllerMessageContext : IControllerMessageContext
{ {
private readonly Dictionary<string, object> items = new Dictionary<string, object>(); private readonly IMessageContext decoratedContext;
/// <inheritdoc /> /// <inheritdoc />
public object Controller { get; set; } public object Controller { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public new IControllerMethodBinding Binding { get; set; } public ITapetiConfig Config => decoratedContext.Config;
/// <inheritdoc />
public string Queue => decoratedContext.Queue;
/// <inheritdoc />
public string Exchange => decoratedContext.Exchange;
/// <inheritdoc />
public string RoutingKey => decoratedContext.RoutingKey;
/// <inheritdoc />
public object Message => decoratedContext.Message;
/// <inheritdoc />
public IMessageProperties Properties => decoratedContext.Properties;
IBinding IMessageContext.Binding => decoratedContext.Binding;
IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding;
/// <inheritdoc /> /// <inheritdoc />
public override void Dispose() public ControllerMessageContext(IMessageContext decoratedContext)
{ {
foreach (var item in items.Values) this.decoratedContext = decoratedContext;
(item as IDisposable)?.Dispose(); }
base.Dispose();
/// <inheritdoc />
public void Dispose()
{
} }
/// <inheritdoc /> /// <inheritdoc />
public void Store(string key, object value) public void Store(string key, object value)
{ {
items.Add(key, value); decoratedContext.Store(key, value);
} }
/// <inheritdoc /> /// <inheritdoc />
public bool Get<T>(string key, out T value) where T : class public bool Get<T>(string key, out T value) where T : class
{ {
if (!items.TryGetValue(key, out var objectValue)) return decoratedContext.Get(key, out value);
{
value = default(T);
return false;
}
value = (T)objectValue;
return true;
} }
} }
} }

View File

@ -1,7 +1,10 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Helpers;
namespace Tapeti.Default namespace Tapeti.Default
{ {
@ -13,9 +16,68 @@ namespace Tapeti.Default
/// </summary> /// </summary>
public class ControllerMethodBinding : IBinding public class ControllerMethodBinding : IBinding
{ {
private readonly Type controller; /// <summary>
private readonly MethodInfo method; /// Contains all the required information to bind a controller method to a queue.
private readonly QueueInfo queueInfo; /// </summary>
public struct BindingInfo
{
/// <summary>
/// The controller type associated with this binding.
/// </summary>
public Type ControllerType;
/// <summary>
/// The method called when this binding is invoked.
/// </summary>
public MethodInfo Method;
/// <summary>
/// The queue this binding consumes.
/// </summary>
public QueueInfo QueueInfo;
/// <summary>
/// The message class handled by this binding's method.
/// </summary>
public Type MessageClass;
/// <summary>
/// Indicates whether this method accepts messages to the exchange by routing key, or direct-to-queue only.
/// </summary>
public BindingTargetMode BindingTargetMode;
/// <summary>
/// Value factories for the method parameters.
/// </summary>
public IEnumerable<ValueFactory> ParameterFactories;
/// <summary>
/// The return value handler.
/// </summary>
public ResultHandler ResultHandler;
/// <summary>
/// Filter middleware as registered by the binding middleware.
/// </summary>
public IReadOnlyList<IControllerFilterMiddleware> FilterMiddleware;
/// <summary>
/// Message middleware as registered by the binding middleware.
/// </summary>
public IReadOnlyList<IControllerMessageMiddleware> MessageMiddleware;
/// <summary>
/// Cleanup middleware as registered by the binding middleware.
/// </summary>
public IReadOnlyList<IControllerCleanupMiddleware> CleanupMiddleware;
}
private readonly IDependencyResolver dependencyResolver;
private readonly BindingInfo bindingInfo;
private readonly MessageHandlerFunc messageHandler;
/// <inheritdoc /> /// <inheritdoc />
@ -23,37 +85,170 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public ControllerMethodBinding(Type controller, MethodInfo method, QueueInfo queueInfo) public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo)
{ {
this.controller = controller; this.dependencyResolver = dependencyResolver;
this.method = method; this.bindingInfo = bindingInfo;
this.queueInfo = queueInfo;
messageHandler = WrapMethod(bindingInfo.Method, bindingInfo.ParameterFactories, bindingInfo.ResultHandler);
} }
/// <inheritdoc /> /// <inheritdoc />
public Task Apply(IBindingTarget target) public async Task Apply(IBindingTarget target)
{ {
// TODO ControllerMethodBinding switch (bindingInfo.BindingTargetMode)
throw new NotImplementedException(); {
case BindingTargetMode.Default:
if (bindingInfo.QueueInfo.Dynamic)
QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
else
{
await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
QueueName = bindingInfo.QueueInfo.Name;
}
break;
case BindingTargetMode.Direct:
if (bindingInfo.QueueInfo.Dynamic)
QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
else
{
await target.BindDurableDirect(bindingInfo.QueueInfo.Name);
QueueName = bindingInfo.QueueInfo.Name;
}
break;
default:
throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode");
}
} }
/// <inheritdoc /> /// <inheritdoc />
public bool Accept(Type messageClass) public bool Accept(Type messageClass)
{ {
throw new NotImplementedException(); return messageClass == bindingInfo.MessageClass;
} }
/// <inheritdoc /> /// <inheritdoc />
public Task Invoke(IMessageContext context) public async Task Invoke(IMessageContext context)
{ {
throw new NotImplementedException(); var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
using (var controllerContext = new ControllerMessageContext(context)
{
Controller = controller
})
{
if (!await FilterAllowed(controllerContext))
return;
await MiddlewareHelper.GoAsync(
bindingInfo.MessageMiddleware,
async (handler, next) => await handler.Handle(controllerContext, next),
async () => await messageHandler(controllerContext));
} }
}
/// <inheritdoc />
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(context, ConsumeResult.Success, next),
() => Task.CompletedTask);
}
private async Task<bool> FilterAllowed(IControllerMessageContext context)
{
var allowed = false;
await MiddlewareHelper.GoAsync(
bindingInfo.FilterMiddleware,
async (handler, next) => await handler.Filter(context, next),
() =>
{
allowed = true;
return Task.CompletedTask;
});
return allowed;
}
private delegate Task MessageHandlerFunc(IControllerMessageContext context);
private static MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
{
if (resultHandler != null)
return WrapResultHandlerMethod(method, parameterFactories, resultHandler);
if (method.ReturnType == typeof(void))
return WrapNullMethod(method, parameterFactories);
if (method.ReturnType == typeof(Task))
return WrapTaskMethod(method, parameterFactories);
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
return WrapGenericTaskMethod(method, parameterFactories);
return WrapObjectMethod(method, parameterFactories);
}
private static MessageHandlerFunc WrapResultHandlerMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
{
return context =>
{
var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
return resultHandler(context, result);
};
}
private static MessageHandlerFunc WrapNullMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{
return context =>
{
method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
return Task.CompletedTask;
};
}
private static MessageHandlerFunc WrapTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{
return context => (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
private static MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{
return context =>
{
return (Task<object>)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
};
}
private static MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{
return context =>
{
return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
};
}
/// <summary> /// <summary>
/// /// Contains information about the queue linked to the controller method.
/// </summary> /// </summary>
public class QueueInfo public class QueueInfo
{ {

View File

@ -12,6 +12,8 @@ namespace Tapeti.Default
{ {
/// <inheritdoc /> /// <inheritdoc />
public void Handle(IControllerBindingContext context, Action next) public void Handle(IControllerBindingContext context, Action next)
{
if (!context.HasMessageClass)
{ {
if (context.Parameters.Count == 0) if (context.Parameters.Count == 0)
throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class");
@ -22,6 +24,7 @@ namespace Tapeti.Default
parameter.SetBinding(messageContext => messageContext.Message); parameter.SetBinding(messageContext => messageContext.Message);
context.SetMessageClass(parameter.Info.ParameterType); context.SetMessageClass(parameter.Info.ParameterType);
}
next(); next();
} }

View File

@ -1,10 +1,15 @@
using Tapeti.Config; using System;
using System.Collections.Generic;
using Tapeti.Config;
namespace Tapeti.Default namespace Tapeti.Default
{ {
/// <inheritdoc /> /// <inheritdoc />
public class MessageContext : IMessageContext public class MessageContext : IMessageContext
{ {
private readonly Dictionary<string, object> items = new Dictionary<string, object>();
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfig Config { get; set; } public ITapetiConfig Config { get; set; }
@ -26,9 +31,33 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public IBinding Binding { get; set; } public IBinding Binding { get; set; }
/// <inheritdoc /> /// <inheritdoc />
public virtual void Dispose() public void Dispose()
{ {
foreach (var item in items.Values)
(item as IDisposable)?.Dispose();
}
/// <inheritdoc />
public void Store(string key, object value)
{
items.Add(key, value);
}
/// <inheritdoc />
public bool Get<T>(string key, out T value) where T : class
{
if (!items.TryGetValue(key, out var objectValue))
{
value = default(T);
return false;
}
value = (T)objectValue;
return true;
} }
} }
} }

View File

@ -61,16 +61,16 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public RabbitMQMessageProperties(IBasicProperties BasicProperties) public RabbitMQMessageProperties(IBasicProperties basicProperties)
{ {
this.BasicProperties = BasicProperties; BasicProperties = basicProperties;
} }
/// <inheritdoc /> /// <inheritdoc />
public RabbitMQMessageProperties(IBasicProperties BasicProperties, IMessageProperties source) public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties source)
{ {
this.BasicProperties = BasicProperties; BasicProperties = basicProperties;
if (source == null) if (source == null)
return; return;

View File

@ -10,8 +10,8 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" /> <PackageReference Include="RabbitMQ.Client" Version="5.1.0" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="4.5.0" /> <PackageReference Include="System.Configuration.ConfigurationManager" Version="4.5.0" />
</ItemGroup> </ItemGroup>

View File

@ -32,6 +32,12 @@ namespace Tapeti
public TapetiConfig(IDependencyResolver dependencyResolver) public TapetiConfig(IDependencyResolver dependencyResolver)
{ {
config = new Config(dependencyResolver); config = new Config(dependencyResolver);
Use(new DependencyResolverBinding());
Use(new PublishResultBinding());
// Registered last so it runs first and the MessageClass is known to other middleware
Use(new MessageBinding());
} }
@ -41,12 +47,6 @@ namespace Tapeti
if (config == null) if (config == null)
throw new InvalidOperationException("TapetiConfig.Build must only be called once"); throw new InvalidOperationException("TapetiConfig.Build must only be called once");
Use(new DependencyResolverBinding());
Use(new PublishResultBinding());
// Registered last so it runs first and the MessageClass is known to other middleware
Use(new MessageBinding());
RegisterDefaults(); RegisterDefaults();
(config.DependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<ITapetiConfig>(config); (config.DependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<ITapetiConfig>(config);
@ -201,7 +201,7 @@ namespace Tapeti
if (config == null) if (config == null)
throw new InvalidOperationException("TapetiConfig can not be updated after Build"); throw new InvalidOperationException("TapetiConfig can not be updated after Build");
return null; return config;
} }
@ -308,130 +308,4 @@ namespace Tapeti
} }
} }
} }
/*
public delegate Task MessageHandlerFunc(IMessageContext context, object message);
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
{
var allowBinding= false;
MiddlewareHelper.Go(bindingMiddleware,
(handler, next) => handler.Handle(context, next),
() =>
{
allowBinding = true;
});
if (!allowBinding)
return null;
if (context.MessageClass == null)
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class");
var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
// ReSharper disable once InvertIf
if (invalidBindings.Count > 0)
{
var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
}
var resultHandler = ((IBindingResultAccess) context.Result).GetHandler();
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler);
}
protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{
if (resultHandler != null)
return WrapResultHandlerMethod(method, parameters, resultHandler);
if (method.ReturnType == typeof(void))
return WrapNullMethod(method, parameters);
if (method.ReturnType == typeof(Task))
return WrapTaskMethod(method, parameters);
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
return WrapGenericTaskMethod(method, parameters);
return WrapObjectMethod(method, parameters);
}
protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{
return (context, message) =>
{
var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return resultHandler(context, result);
};
}
protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{
return (context, message) =>
{
method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return Task.CompletedTask;
};
}
protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{
return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
}
protected MessageHandlerFunc WrapGenericTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{
return (context, message) =>
{
return (Task<object>)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
};
}
protected MessageHandlerFunc WrapObjectMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{
return (context, message) =>
{
return Task.FromResult(method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()));
};
}
protected void AddStaticRegistration(IBindingQueueInfo binding)
{
if (staticRegistrations.ContainsKey(binding.QueueInfo.Name))
{
var existing = staticRegistrations[binding.QueueInfo.Name];
// TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware
//if (existing.Any(h => h.MessageClass == binding.MessageClass))
// throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
existing.Add(binding);
}
else
staticRegistrations.Add(binding.QueueInfo.Name, new List<IBinding> { binding });
}
protected string GetDynamicQueueName(string prefix)
{
if (String.IsNullOrEmpty(prefix))
return "";
return prefix + "." + Guid.NewGuid().ToString("N");
}
}
*/
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using Tapeti.Annotations; using Tapeti.Annotations;
@ -44,40 +45,53 @@ namespace Tapeti
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m)) .Select(m => (MethodInfo)m))
{ {
// TODO create binding for method
/*
var context = new BindingContext(method);
var messageHandler = GetMessageHandler(context, method);
if (messageHandler == null)
continue;
*/
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
if (methodQueueInfo == null || !methodQueueInfo.IsValid) if (methodQueueInfo == null || !methodQueueInfo.IsValid)
throw new TopologyConfigurationException( throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute"); $"Method {method.Name} or controller {controller.Name} requires a queue attribute");
/*
var handlerInfo = new Binding var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
{ {
Controller = controller, Controller = controller,
Method = method, Method = method
QueueInfo = methodQueueInfo,
QueueBindingMode = context.QueueBindingMode,
MessageClass = context.MessageClass,
MessageHandler = messageHandler,
MessageMiddleware = context.MessageMiddleware,
MessageFilterMiddleware = context.MessageFilterMiddleware
}; };
if (methodQueueInfo.Dynamic.GetValueOrDefault())
AddDynamicRegistration(handlerInfo);
else
AddStaticRegistration(handlerInfo);
*/
builder.RegisterBinding(new ControllerMethodBinding(controller, method, methodQueueInfo)); var allowBinding = false;
builderAccess.ApplyBindingMiddleware(context, () => { allowBinding = true; });
if (!allowBinding)
continue;
if (context.MessageClass == null)
throw new TopologyConfigurationException($"Method {method.Name} in controller {controller.Name} does not resolve to a message class");
var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
if (invalidBindings.Count > 0)
{
var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
}
builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo
{
ControllerType = controller,
Method = method,
QueueInfo = methodQueueInfo,
MessageClass = context.MessageClass,
BindingTargetMode = context.BindingTargetMode,
ParameterFactories = context.GetParameterHandlers(),
ResultHandler = context.GetResultHandler(),
FilterMiddleware = context.Middleware.Where(m => m is IControllerFilterMiddleware).Cast<IControllerFilterMiddleware>().ToList(),
MessageMiddleware = context.Middleware.Where(m => m is IControllerMessageMiddleware).Cast<IControllerMessageMiddleware>().ToList(),
CleanupMiddleware = context.Middleware.Where(m => m is IControllerCleanupMiddleware).Cast<IControllerCleanupMiddleware>().ToList()
}));
} }
return builder; return builder;

View File

@ -18,6 +18,8 @@ namespace Test
this.flowProvider = flowProvider; this.flowProvider = flowProvider;
} }
[Start]
public IYieldPoint StartFlow(PingMessage message) public IYieldPoint StartFlow(PingMessage message)
{ {
Console.WriteLine("PingMessage received, calling flowProvider.End() directly"); Console.WriteLine("PingMessage received, calling flowProvider.End() directly");

View File

@ -1,12 +1,10 @@
using System; using System;
using System.Runtime.CompilerServices;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.DataAnnotations; using Tapeti.DataAnnotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
using System.Threading; using System.Threading;
using Tapeti.Annotations;
using Tapeti.Transient; using Tapeti.Transient;
namespace Test namespace Test