Fixed timing issue when a parallel flow response arrives before the flow is stored

This commit is contained in:
Mark van Renswoude 2023-04-20 10:40:13 +02:00
parent 4ce318b560
commit 9c40c2928c
2 changed files with 47 additions and 20 deletions

View File

@ -154,7 +154,9 @@ namespace Tapeti.Flow.Default
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{ {
await flowContext.Store(context.Binding.QueueType == QueueType.Durable); // IFlowParallelRequest.AddRequest will store the flow immediately
if (!flowPayload.FlowContext.IsStoredOrDeleted())
await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
})); }));
} }

View File

@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
@ -55,7 +55,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
return new ParallelRequestBuilder(config, this); return new ParallelRequestBuilder(config, this, publisher);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -71,8 +71,8 @@ namespace Tapeti.Flow.Default
} }
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, internal async Task<MessageProperties> PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo,
string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) string convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
if (!context.HasFlowStateAndLock) if (!context.HasFlowStateAndLock)
{ {
@ -96,8 +96,15 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue ReplyTo = responseHandlerInfo.ReplyToQueue
}; };
if (store) return properties;
await context.Store(responseHandlerInfo.IsDurableQueue); }
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true); await publisher.Publish(message, properties, true);
} }
@ -200,7 +207,7 @@ namespace Tapeti.Flow.Default
flowContext.SetFlowState(flowState, flowStateLock); flowContext.SetFlowState(flowState, flowStateLock);
} }
/// <inheritdoc /> /// <inheritdoc />
public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{ {
@ -222,7 +229,7 @@ namespace Tapeti.Flow.Default
} }
else else
flowContext = flowPayload.FlowContext; flowContext = flowPayload.FlowContext;
try try
{ {
await executableYieldPoint.Execute(flowContext); await executableYieldPoint.Execute(flowContext);
@ -327,13 +334,15 @@ namespace Tapeti.Flow.Default
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private readonly FlowProvider flowProvider; private readonly FlowProvider flowProvider;
private readonly IInternalPublisher publisher;
private readonly List<RequestInfo> requests = new(); private readonly List<RequestInfo> requests = new();
public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher)
{ {
this.config = config; this.config = config;
this.flowProvider = flowProvider; this.flowProvider = flowProvider;
this.publisher = publisher;
} }
@ -407,18 +416,21 @@ namespace Tapeti.Flow.Default
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType())
throw new YieldPointException("Converge method must be in the same controller class"); throw new YieldPointException("Converge method must be in the same controller class");
var preparedRequests = new List<PreparedRequest>();
foreach (var requestInfo in requests) foreach (var requestInfo in requests)
{ {
await flowProvider.SendRequest( var properties = await flowProvider.PrepareRequest(
context, context,
requestInfo.Message,
requestInfo.ResponseHandlerInfo, requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name, convergeMethod.Method.Name,
convergeMethodSync, convergeMethodSync);
false);
preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties));
} }
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true)));
}); });
} }
} }
@ -465,12 +477,11 @@ namespace Tapeti.Flow.Default
throw new InvalidOperationException("No ContinuationMetadata in FlowContext"); throw new InvalidOperationException("No ContinuationMetadata in FlowContext");
return flowProvider.SendRequest( return flowProvider.SendRequest(
flowContext, flowContext,
message, message,
responseHandlerInfo, responseHandlerInfo,
flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync, flowContext.ContinuationMetadata.ConvergeMethodSync);
false);
} }
} }
@ -489,5 +500,19 @@ namespace Tapeti.Flow.Default
IsDurableQueue = isDurableQueue; IsDurableQueue = isDurableQueue;
} }
} }
internal class PreparedRequest
{
public object Message { get; }
public MessageProperties Properties { get; }
public PreparedRequest(object message, MessageProperties properties)
{
Message = message;
Properties = properties;
}
}
} }
} }