From 9c40c2928cbdae0c728d72527a490f20d13d932e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 20 Apr 2023 10:40:13 +0200 Subject: [PATCH] Fixed timing issue when a parallel flow response arrives before the flow is stored --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 4 +- Tapeti.Flow/Default/FlowProvider.cs | 63 ++++++++++++++------ 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index c501feb..6f6a887 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -154,7 +154,9 @@ namespace Tapeti.Flow.Default var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => { - await flowContext.Store(context.Binding.QueueType == QueueType.Durable); + // IFlowParallelRequest.AddRequest will store the flow immediately + if (!flowPayload.FlowContext.IsStoredOrDeleted()) + await flowContext.Store(context.Binding.QueueType == QueueType.Durable); })); } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index d162836..62e3176 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -55,7 +55,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, this); + return new ParallelRequestBuilder(config, this, publisher); } /// @@ -71,8 +71,8 @@ namespace Tapeti.Flow.Default } - internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) + internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) { if (!context.HasFlowStateAndLock) { @@ -96,8 +96,15 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - if (store) - await context.Store(responseHandlerInfo.IsDurableQueue); + return properties; + } + + + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) + { + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -200,7 +207,7 @@ namespace Tapeti.Flow.Default flowContext.SetFlowState(flowState, flowStateLock); } - + /// public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) { @@ -222,7 +229,7 @@ namespace Tapeti.Flow.Default } else flowContext = flowPayload.FlowContext; - + try { await executableYieldPoint.Execute(flowContext); @@ -327,13 +334,15 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; private readonly FlowProvider flowProvider; + private readonly IInternalPublisher publisher; private readonly List requests = new(); - public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher) { this.config = config; this.flowProvider = flowProvider; + this.publisher = publisher; } @@ -407,18 +416,21 @@ namespace Tapeti.Flow.Default if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); + var preparedRequests = new List(); + foreach (var requestInfo in requests) { - await flowProvider.SendRequest( + var properties = await flowProvider.PrepareRequest( context, - requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync, - false); + convergeMethodSync); + + preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties)); } await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); + await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))); }); } } @@ -465,12 +477,11 @@ namespace Tapeti.Flow.Default throw new InvalidOperationException("No ContinuationMetadata in FlowContext"); return flowProvider.SendRequest( - flowContext, - message, - responseHandlerInfo, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync, - false); + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync); } } @@ -489,5 +500,19 @@ namespace Tapeti.Flow.Default IsDurableQueue = isDurableQueue; } } + + + internal class PreparedRequest + { + public object Message { get; } + public MessageProperties Properties { get; } + + + public PreparedRequest(object message, MessageProperties properties) + { + Message = message; + Properties = properties; + } + } } }