Fixed timing issue when a parallel flow response arrives before the flow is stored

This commit is contained in:
Mark van Renswoude 2023-04-17 16:40:26 +02:00
parent b66c448c17
commit 0573ffc93c
2 changed files with 49 additions and 22 deletions

View File

@ -94,7 +94,9 @@ namespace Tapeti.Flow.Default
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
// IFlowParallelRequest.AddRequest will store the flow immediately
if (!flowPayload.FlowContext.IsStoredOrDeleted())
await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
}));
}

View File

@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
return new ParallelRequestBuilder(config, this);
return new ParallelRequestBuilder(config, this, publisher);
}
/// <inheritdoc />
@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default
}
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
internal async Task<MessageProperties> PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
if (context.FlowState == null)
{
@ -89,8 +89,15 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue
};
if (store)
await context.Store(responseHandlerInfo.IsDurableQueue);
return properties;
}
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true);
}
@ -195,7 +202,7 @@ namespace Tapeti.Flow.Default
};
}
/// <inheritdoc />
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
@ -221,7 +228,7 @@ namespace Tapeti.Flow.Default
}
else
flowContext = flowPayload.FlowContext;
try
{
await executableYieldPoint.Execute(flowContext);
@ -256,7 +263,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public Task Converge(IFlowHandlerContext context)
{
return Execute(context, new DelegateYieldPoint(flowContext =>
return Execute(context, new DelegateYieldPoint(flowContext =>
Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));
}
@ -276,7 +283,7 @@ namespace Tapeti.Flow.Default
if (convergeMethodSync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { });
else
yieldPoint = await(Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
yieldPoint = await (Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
@ -297,13 +304,15 @@ namespace Tapeti.Flow.Default
private readonly ITapetiConfig config;
private readonly FlowProvider flowProvider;
private readonly IInternalPublisher publisher;
private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider)
public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher)
{
this.config = config;
this.flowProvider = flowProvider;
this.publisher = publisher;
}
@ -325,7 +334,7 @@ namespace Tapeti.Flow.Default
}
public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{
requests.Add(new RequestInfo
{
@ -378,18 +387,21 @@ namespace Tapeti.Flow.Default
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
var preparedRequests = new List<PreparedRequest>();
foreach (var requestInfo in requests)
{
await flowProvider.SendRequest(
var properties = await flowProvider.PrepareRequest(
context,
requestInfo.Message,
requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name,
convergeMethodSync,
false);
convergeMethodSync);
preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties));
}
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true)));
});
}
}
@ -433,12 +445,11 @@ namespace Tapeti.Flow.Default
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return flowProvider.SendRequest(
flowContext,
message,
responseHandlerInfo,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync,
false);
flowContext,
message,
responseHandlerInfo,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
}
}
@ -449,5 +460,19 @@ namespace Tapeti.Flow.Default
public string ReplyToQueue { get; set; }
public bool IsDurableQueue { get; set; }
}
internal class PreparedRequest
{
public object Message { get; }
public MessageProperties Properties { get; }
public PreparedRequest(object message, MessageProperties properties)
{
Message = message;
Properties = properties;
}
}
}
}