diff --git a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs index e91c358..8b2ed85 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs @@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations internal class DataAnnotationsMessageMiddleware : IMessageMiddleware { /// - public Task Handle(IMessageContext context, Func next) + public async Task Handle(IMessageContext context, Func next) { var validationContext = new ValidationContext(context.Message); Validator.ValidateObject(context.Message, validationContext, true); - return next(); + await next(); } } } diff --git a/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs index dd5fd25..514989c 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs @@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations internal class DataAnnotationsPublishMiddleware : IPublishMiddleware { /// - public Task Handle(IPublishContext context, Func next) + public async Task Handle(IPublishContext context, Func next) { var validationContext = new ValidationContext(context.Message); Validator.ValidateObject(context.Message, validationContext, true); - return next(); + await next(); } } } diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs index 2ed0926..dc1aa35 100644 --- a/Tapeti.Flow/Default/DelegateYieldPoint.cs +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -14,9 +14,9 @@ namespace Tapeti.Flow.Default } - public Task Execute(FlowContext context) + public async Task Execute(FlowContext context) { - return onExecute(context); + await onExecute(context); } } } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 00f8469..4dfb3ba 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -77,14 +77,14 @@ namespace Tapeti.Flow.Default private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint) { var flowHandler = context.Config.DependencyResolver.Resolve(); - return flowHandler.Execute(context, yieldPoint); + return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); } private static Task HandleParallelResponse(IControllerMessageContext context) { var flowHandler = context.Config.DependencyResolver.Resolve(); - return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext => + return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => { await flowContext.Store(); })); diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 0746a31..96235a8 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -1,12 +1,11 @@ using System; using System.Threading.Tasks; -using Tapeti.Config; namespace Tapeti.Flow.Default { internal class FlowContext : IDisposable { - public IControllerMessageContext MessageContext { get; set; } + public IFlowHandlerContext HandlerContext { get; set; } public IFlowStateLock FlowStateLock { get; set; } public FlowState FlowState { get; set; } @@ -21,11 +20,11 @@ namespace Tapeti.Flow.Default { storeCalled = true; - if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext)); + if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock)); - FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller); + FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller); await FlowStateLock.StoreFlowState(FlowState); } diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 852fcfd..cb6fe78 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -92,7 +92,7 @@ namespace Tapeti.Flow.Default flowContext = new FlowContext { - MessageContext = context, + HandlerContext = new FlowHandlerContext(context), FlowStateLock = flowStateLock, FlowState = flowState, @@ -124,7 +124,7 @@ namespace Tapeti.Flow.Default throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}"); var flowHandler = context.Config.DependencyResolver.Resolve(); - await flowHandler.Execute(context, yieldPoint); + await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); } } } diff --git a/Tapeti.Flow/Default/FlowHandlerContext.cs b/Tapeti.Flow/Default/FlowHandlerContext.cs new file mode 100644 index 0000000..360318c --- /dev/null +++ b/Tapeti.Flow/Default/FlowHandlerContext.cs @@ -0,0 +1,49 @@ +using System; +using System.Reflection; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + /// + /// + /// Default implementation for IFlowHandlerContext + /// + internal class FlowHandlerContext : IFlowHandlerContext + { + /// + public FlowHandlerContext() + { + } + + + /// + public FlowHandlerContext(IControllerMessageContext source) + { + if (source == null) + return; + + Config = source.Config; + Controller = source.Controller; + Method = source.Binding.Method; + ControllerMessageContext = source; + } + + + /// + public void Dispose() + { + } + + /// + public ITapetiConfig Config { get; set; } + + /// + public object Controller { get; set; } + + /// + public MethodInfo Method { get; set; } + + /// + public IControllerMessageContext ControllerMessageContext { get; set; } + } +} diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index d493617..ce66b0a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -97,7 +97,7 @@ namespace Tapeti.Flow.Default private async Task SendResponse(FlowContext context, object message) { var reply = context.FlowState == null - ? GetReply(context.MessageContext) + ? GetReply(context.HandlerContext) : context.FlowState.Metadata.Reply; if (reply == null) @@ -155,24 +155,24 @@ namespace Tapeti.Flow.Default } - private static ReplyMetadata GetReply(IMessageContext context) + private static ReplyMetadata GetReply(IFlowHandlerContext context) { - var requestAttribute = context.Message?.GetType().GetCustomAttribute(); + var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute(); if (requestAttribute?.Response == null) return null; return new ReplyMetadata { - CorrelationId = context.Properties.CorrelationId, - ReplyTo = context.Properties.ReplyTo, + CorrelationId = context.ControllerMessageContext.Properties.CorrelationId, + ReplyTo = context.ControllerMessageContext.Properties.ReplyTo, ResponseTypeName = requestAttribute.Response.FullName, - Mandatory = context.Properties.Persistent.GetValueOrDefault(true) + Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true) }; } private static async Task CreateNewFlowState(FlowContext flowContext) { - var flowStore = flowContext.MessageContext.Config.DependencyResolver.Resolve(); + var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); var flowID = Guid.NewGuid(); flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); @@ -184,25 +184,27 @@ namespace Tapeti.Flow.Default { Metadata = new FlowMetadata { - Reply = GetReply(flowContext.MessageContext) + Reply = GetReply(flowContext.HandlerContext) } }; } + /// - public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint) + public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) { if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) - throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); + throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}"); - if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) + var messageContext = context.ControllerMessageContext; + if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out FlowContext flowContext)) { flowContext = new FlowContext { - MessageContext = context + HandlerContext = context }; - context.Store(ContextItems.FlowContext, flowContext); + messageContext?.Store(ContextItems.FlowContext, flowContext); } try @@ -213,7 +215,7 @@ namespace Tapeti.Flow.Default { // Useful for debugging e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName; - e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name; + e.Data["Tapeti.Controller.Method"] = context.Method.Name; throw; } @@ -293,7 +295,7 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => { - if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType()) + if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); return Task.WhenAll(requests.Select(requestInfo => diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index 5199c42..68c8813 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -13,85 +13,56 @@ namespace Tapeti.Flow.Default internal class FlowStarter : IFlowStarter { private readonly ITapetiConfig config; - private readonly ILogger logger; /// - public FlowStarter(ITapetiConfig config, ILogger logger) + public FlowStarter(ITapetiConfig config) { this.config = config; - this.logger = logger; } /// - public Task Start(Expression>> methodSelector) where TController : class + public async Task Start(Expression>> methodSelector) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { }); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { }); } /// - public Task Start(Expression>>> methodSelector) where TController : class + public async Task Start(Expression>>> methodSelector) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {}); } /// - public Task Start(Expression>> methodSelector, TParameter parameter) where TController : class + public async Task Start(Expression>> methodSelector, TParameter parameter) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter}); } /// - public Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class + public async Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {parameter}); } - private async Task CallControllerMethod(MethodBase method, Func> getYieldPointResult, object[] parameters) where TController : class + private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult, object[] parameters) where TController : class { var controller = config.DependencyResolver.Resolve(); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); - /* - var context = new ControllerMessageContext() + var context = new FlowHandlerContext { Config = config, - Controller = controller + Controller = controller, + Method = method }; - */ var flowHandler = config.DependencyResolver.Resolve(); - - try - { - //await flowHandler.Execute(context, yieldPoint); - //handlingResult.ConsumeResponse = ConsumeResponse.Ack; - } - finally - { - //await RunCleanup(context, handlingResult.ToHandlingResult()); - } + await flowHandler.Execute(context, yieldPoint); } - /* - private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) - { - foreach (var handler in config.CleanupMiddleware) - { - try - { - await handler.Handle(context, handlingResult); - } - catch (Exception eCleanup) - { - logger.HandlerException(eCleanup); - } - } - } - */ - private static MethodInfo GetExpressionMethod(Expression>> methodSelector) { @@ -105,6 +76,7 @@ namespace Tapeti.Flow.Default return method; } + private static MethodInfo GetExpressionMethod(Expression>> methodSelector) { var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression; diff --git a/Tapeti.Flow/IFlowHandlerContext.cs b/Tapeti.Flow/IFlowHandlerContext.cs new file mode 100644 index 0000000..08cce12 --- /dev/null +++ b/Tapeti.Flow/IFlowHandlerContext.cs @@ -0,0 +1,37 @@ +using System; +using System.Reflection; +using Tapeti.Config; + +namespace Tapeti.Flow +{ + /// + /// + /// Provides information about the handler for the flow. + /// + public interface IFlowHandlerContext : IDisposable + { + /// + /// Provides access to the Tapeti config. + /// + ITapetiConfig Config { get; } + + + /// + /// An instance of the controller which starts or continues the flow. + /// + object Controller { get; } + + + /// + /// Information about the method which starts or continues the flow. + /// + MethodInfo Method { get; } + + + /// + /// Access to the controller message context if this is a continuated flow. + /// Will be null when in a starting flow. + /// + IControllerMessageContext ControllerMessageContext { get; } + } +} diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index 4cbfeec..5e56c21 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -109,7 +109,7 @@ namespace Tapeti.Flow /// /// /// - Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint); + Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); }