[ci skip] Reimplemented FlowStarter

This commit is contained in:
Mark van Renswoude 2019-08-15 12:04:03 +02:00
parent d211d33108
commit 8ec85ac99f
11 changed files with 132 additions and 73 deletions

View File

@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations
internal class DataAnnotationsMessageMiddleware : IMessageMiddleware
{
/// <inheritdoc />
public Task Handle(IMessageContext context, Func<Task> next)
public async Task Handle(IMessageContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
return next();
await next();
}
}
}

View File

@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations
internal class DataAnnotationsPublishMiddleware : IPublishMiddleware
{
/// <inheritdoc />
public Task Handle(IPublishContext context, Func<Task> next)
public async Task Handle(IPublishContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
return next();
await next();
}
}
}

View File

@ -14,9 +14,9 @@ namespace Tapeti.Flow.Default
}
public Task Execute(FlowContext context)
public async Task Execute(FlowContext context)
{
return onExecute(context);
await onExecute(context);
}
}
}

View File

@ -77,14 +77,14 @@ namespace Tapeti.Flow.Default
private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, yieldPoint);
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
private static Task HandleParallelResponse(IControllerMessageContext context)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext =>
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store();
}));

View File

@ -1,12 +1,11 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
internal class FlowContext : IDisposable
{
public IControllerMessageContext MessageContext { get; set; }
public IFlowHandlerContext HandlerContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; }
@ -21,11 +20,11 @@ namespace Tapeti.Flow.Default
{
storeCalled = true;
if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
await FlowStateLock.StoreFlowState(FlowState);
}

View File

@ -92,7 +92,7 @@ namespace Tapeti.Flow.Default
flowContext = new FlowContext
{
MessageContext = context,
HandlerContext = new FlowHandlerContext(context),
FlowStateLock = flowStateLock,
FlowState = flowState,
@ -124,7 +124,7 @@ namespace Tapeti.Flow.Default
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
}
}

View File

@ -0,0 +1,49 @@
using System;
using System.Reflection;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowHandlerContext
/// </summary>
internal class FlowHandlerContext : IFlowHandlerContext
{
/// <inheritdoc />
public FlowHandlerContext()
{
}
/// <inheritdoc />
public FlowHandlerContext(IControllerMessageContext source)
{
if (source == null)
return;
Config = source.Config;
Controller = source.Controller;
Method = source.Binding.Method;
ControllerMessageContext = source;
}
/// <inheritdoc />
public void Dispose()
{
}
/// <inheritdoc />
public ITapetiConfig Config { get; set; }
/// <inheritdoc />
public object Controller { get; set; }
/// <inheritdoc />
public MethodInfo Method { get; set; }
/// <inheritdoc />
public IControllerMessageContext ControllerMessageContext { get; set; }
}
}

View File

@ -97,7 +97,7 @@ namespace Tapeti.Flow.Default
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.FlowState == null
? GetReply(context.MessageContext)
? GetReply(context.HandlerContext)
: context.FlowState.Metadata.Reply;
if (reply == null)
@ -155,24 +155,24 @@ namespace Tapeti.Flow.Default
}
private static ReplyMetadata GetReply(IMessageContext context)
private static ReplyMetadata GetReply(IFlowHandlerContext context)
{
var requestAttribute = context.Message?.GetType().GetCustomAttribute<RequestAttribute>();
var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo,
CorrelationId = context.ControllerMessageContext.Properties.CorrelationId,
ReplyTo = context.ControllerMessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.Properties.Persistent.GetValueOrDefault(true)
Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true)
};
}
private static async Task CreateNewFlowState(FlowContext flowContext)
{
var flowStore = flowContext.MessageContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
@ -184,25 +184,27 @@ namespace Tapeti.Flow.Default
{
Metadata = new FlowMetadata
{
Reply = GetReply(flowContext.MessageContext)
Reply = GetReply(flowContext.HandlerContext)
}
};
}
/// <inheritdoc />
public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint)
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}");
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
var messageContext = context.ControllerMessageContext;
if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out FlowContext flowContext))
{
flowContext = new FlowContext
{
MessageContext = context
HandlerContext = context
};
context.Store(ContextItems.FlowContext, flowContext);
messageContext?.Store(ContextItems.FlowContext, flowContext);
}
try
@ -213,7 +215,7 @@ namespace Tapeti.Flow.Default
{
// Useful for debugging
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name;
e.Data["Tapeti.Controller.Method"] = context.Method.Name;
throw;
}
@ -293,7 +295,7 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context =>
{
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo =>

View File

@ -13,85 +13,56 @@ namespace Tapeti.Flow.Default
internal class FlowStarter : IFlowStarter
{
private readonly ITapetiConfig config;
private readonly ILogger logger;
/// <inheritdoc />
public FlowStarter(ITapetiConfig config, ILogger logger)
public FlowStarter(ITapetiConfig config)
{
this.config = config;
this.logger = logger;
}
/// <inheritdoc />
public Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
}
/// <inheritdoc />
public Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
}
/// <inheritdoc />
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
}
/// <inheritdoc />
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
}
private async Task CallControllerMethod<TController>(MethodBase method, Func<object, Task<IYieldPoint>> getYieldPointResult, object[] parameters) where TController : class
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object[] parameters) where TController : class
{
var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
/*
var context = new ControllerMessageContext()
var context = new FlowHandlerContext
{
Config = config,
Controller = controller
Controller = controller,
Method = method
};
*/
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
try
{
//await flowHandler.Execute(context, yieldPoint);
//handlingResult.ConsumeResponse = ConsumeResponse.Ack;
}
finally
{
//await RunCleanup(context, handlingResult.ToHandlingResult());
}
await flowHandler.Execute(context, yieldPoint);
}
/*
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach (var handler in config.CleanupMiddleware)
{
try
{
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
}
*/
private static MethodInfo GetExpressionMethod<TController, TResult>(Expression<Func<TController, Func<TResult>>> methodSelector)
{
@ -105,6 +76,7 @@ namespace Tapeti.Flow.Default
return method;
}
private static MethodInfo GetExpressionMethod<TController, TResult, TParameter>(Expression<Func<TController, Func<TParameter, TResult>>> methodSelector)
{
var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression;

View File

@ -0,0 +1,37 @@
using System;
using System.Reflection;
using Tapeti.Config;
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Provides information about the handler for the flow.
/// </summary>
public interface IFlowHandlerContext : IDisposable
{
/// <summary>
/// Provides access to the Tapeti config.
/// </summary>
ITapetiConfig Config { get; }
/// <summary>
/// An instance of the controller which starts or continues the flow.
/// </summary>
object Controller { get; }
/// <summary>
/// Information about the method which starts or continues the flow.
/// </summary>
MethodInfo Method { get; }
/// <summary>
/// Access to the controller message context if this is a continuated flow.
/// Will be null when in a starting flow.
/// </summary>
IControllerMessageContext ControllerMessageContext { get; }
}
}

View File

@ -109,7 +109,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="context"></param>
/// <param name="yieldPoint"></param>
Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint);
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
}