From 0573ffc93c5762489277c4bc89e79d5f7920d5ec Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 17 Apr 2023 16:40:26 +0200 Subject: [PATCH] Fixed timing issue when a parallel flow response arrives before the flow is stored --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 4 +- Tapeti.Flow/Default/FlowProvider.cs | 67 ++++++++++++++------ 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index c0f9565..6da725f 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -94,7 +94,9 @@ namespace Tapeti.Flow.Default var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => { - await flowContext.Store(context.Binding.QueueType == QueueType.Durable); + // IFlowParallelRequest.AddRequest will store the flow immediately + if (!flowPayload.FlowContext.IsStoredOrDeleted()) + await flowContext.Store(context.Binding.QueueType == QueueType.Durable); })); } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index df1a15c..49c198a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, this); + return new ParallelRequestBuilder(config, this, publisher); } /// @@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default } - internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) + internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) { if (context.FlowState == null) { @@ -89,8 +89,15 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - if (store) - await context.Store(responseHandlerInfo.IsDurableQueue); + return properties; + } + + + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) + { + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -195,7 +202,7 @@ namespace Tapeti.Flow.Default }; } - + /// public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) { @@ -221,7 +228,7 @@ namespace Tapeti.Flow.Default } else flowContext = flowPayload.FlowContext; - + try { await executableYieldPoint.Execute(flowContext); @@ -256,7 +263,7 @@ namespace Tapeti.Flow.Default /// public Task Converge(IFlowHandlerContext context) { - return Execute(context, new DelegateYieldPoint(flowContext => + return Execute(context, new DelegateYieldPoint(flowContext => Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); } @@ -276,7 +283,7 @@ namespace Tapeti.Flow.Default if (convergeMethodSync) yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { }); else - yieldPoint = await(Task)method.Invoke(controllerPayload.Controller, new object[] { }); + yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { }); if (yieldPoint == null) throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); @@ -297,13 +304,15 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; private readonly FlowProvider flowProvider; + private readonly IInternalPublisher publisher; private readonly List requests = new List(); - public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher) { this.config = config; this.flowProvider = flowProvider; + this.publisher = publisher; } @@ -325,7 +334,7 @@ namespace Tapeti.Flow.Default } - public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) + private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { requests.Add(new RequestInfo { @@ -378,18 +387,21 @@ namespace Tapeti.Flow.Default if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); + var preparedRequests = new List(); + foreach (var requestInfo in requests) { - await flowProvider.SendRequest( + var properties = await flowProvider.PrepareRequest( context, - requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync, - false); + convergeMethodSync); + + preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties)); } await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); + await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))); }); } } @@ -433,12 +445,11 @@ namespace Tapeti.Flow.Default var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return flowProvider.SendRequest( - flowContext, - message, - responseHandlerInfo, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync, - false); + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync); } } @@ -449,5 +460,19 @@ namespace Tapeti.Flow.Default public string ReplyToQueue { get; set; } public bool IsDurableQueue { get; set; } } + + + internal class PreparedRequest + { + public object Message { get; } + public MessageProperties Properties { get; } + + + public PreparedRequest(object message, MessageProperties properties) + { + Message = message; + Properties = properties; + } + } } }