2019-08-13 20:30:04 +02:00
|
|
|
|
using System;
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
using Tapeti.Config;
|
2023-04-13 08:39:43 +02:00
|
|
|
|
using Tapeti.Helpers;
|
2019-08-13 20:30:04 +02:00
|
|
|
|
|
|
|
|
|
namespace Tapeti.Flow.Default
|
|
|
|
|
{
|
2019-08-15 11:26:55 +02:00
|
|
|
|
/// <inheritdoc cref="IControllerMessageMiddleware"/>
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// Handles methods marked with the Continuation attribute.
|
|
|
|
|
/// </summary>
|
|
|
|
|
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
|
2019-08-13 20:30:04 +02:00
|
|
|
|
{
|
2022-02-09 11:26:56 +01:00
|
|
|
|
/// <summary>
/// Lets the message continue to the controller method only when that method is the one
/// recorded in the continuation metadata of the flow context for this message.
/// </summary>
/// <param name="context">The context of the message being processed.</param>
/// <param name="next">Continues the filter chain. Not invoked when the message is filtered out.</param>
public async ValueTask Filter(IMessageContext context, Func<ValueTask> next)
{
    if (!context.TryGet<ControllerMessageContextPayload>(out var payload))
        return;

    // No flow context, or no continuation registered for this message: filter it out
    var continuation = (await EnrichWithFlowContext(context).ConfigureAwait(false))?.ContinuationMetadata;
    if (continuation == null)
        return;

    // Only the method named in the continuation is allowed to handle the message
    var boundMethodName = MethodSerializer.Serialize(payload.Binding.Method);
    if (continuation.MethodName == boundMethodName)
        await next().ConfigureAwait(false);
}
|
|
|
|
|
|
|
|
|
|
|
2022-02-09 11:26:56 +01:00
|
|
|
|
/// <summary>
/// Restores the stored flow state data into the controller before the message is handled,
/// and lets the flow handler converge afterwards when the flow payload indicates so.
/// </summary>
/// <param name="context">The context of the message being processed.</param>
/// <param name="next">Continues the message middleware chain.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when a flow payload is present but the controller payload has no controller instance.
/// </exception>
public async ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{
    if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
        return;

    // Not part of a flow: nothing to restore, simply pass the message on
    if (!context.TryGet<FlowMessageContextPayload>(out var flowPayload))
    {
        await next().ConfigureAwait(false);
        return;
    }

    var controller = controllerPayload.Controller
        ?? throw new InvalidOperationException("Controller is not available (method is static?)");

    var flowContext = flowPayload.FlowContext;
    var flowState = flowContext.FlowState;

    var serializedData = flowState.Data;
    if (!string.IsNullOrEmpty(serializedData))
        Newtonsoft.Json.JsonConvert.PopulateObject(serializedData, controller);

    // Remove Continuation now because the IYieldPoint result handler will store the new state
    flowState.Continuations.Remove(flowContext.ContinuationID);

    await next().ConfigureAwait(false);

    if (!flowPayload.FlowIsConverging)
        return;

    var dependencyResolver = flowContext.HandlerContext.Config.DependencyResolver;
    var convergeHandler = dependencyResolver.Resolve<IFlowHandler>();
    await convergeHandler.Converge(new FlowHandlerContext(context)).ConfigureAwait(false);
}
|
|
|
|
|
|
|
|
|
|
|
2022-02-09 11:26:56 +01:00
|
|
|
|
/// <summary>
/// Runs the rest of the cleanup chain first, then releases the flow state lock held by the
/// flow context, deleting the flow state when it was neither stored nor deleted.
/// </summary>
/// <param name="context">The context of the message that was processed.</param>
/// <param name="consumeResult">The result of consuming the message. Not inspected here; see the comment in the body.</param>
/// <param name="next">Continues the cleanup middleware chain.</param>
public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next)
{
    await next().ConfigureAwait(false);

    if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload) ||
        !context.TryGet<FlowMessageContextPayload>(out var flowPayload))
        return;

    var flowContext = flowPayload.FlowContext;
    var continuation = flowContext.ContinuationMetadata;

    // Do not clean up when the controller method was filtered, if the same message has two methods
    var isContinuationMethod = continuation != null &&
                               continuation.MethodName == MethodSerializer.Serialize(controllerPayload.Binding.Method);

    if (!isContinuationMethod || !flowContext.HasFlowStateAndLock)
        return;

    // The exception strategy can set the consume result to Success. Instead, check if the yield point
    // was handled. The flow provider ensures we only end up here in case of an exception.
    if (!flowContext.IsStoredOrDeleted())
        await flowContext.FlowStateLock.DeleteFlowState().ConfigureAwait(false);

    flowContext.FlowStateLock.Dispose();
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2022-11-23 09:13:38 +01:00
|
|
|
|
/// <summary>
/// Returns the flow context for the message. When the message context does not yet contain a
/// <see cref="FlowMessageContextPayload"/>, the flow is looked up by the message's CorrelationId,
/// its state is locked and loaded, and the resulting flow context is stored in the message context.
/// </summary>
/// <param name="context">The context of the message being processed.</param>
/// <returns>
/// The flow context, or null when the message has no CorrelationId, the CorrelationId is not a Guid,
/// no flow is found for it, or the flow has no state.
/// </returns>
private static async ValueTask<FlowContext?> EnrichWithFlowContext(IMessageContext context)
{
    // Already enriched earlier for this message
    if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
        return flowPayload.FlowContext;

    if (context.Properties.CorrelationId == null)
        return null;

    if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
        return null;

    var flowStore = context.Config.DependencyResolver.Resolve<IFlowStore>();

    var flowID = await flowStore.FindFlowID(continuationID).ConfigureAwait(false);
    if (!flowID.HasValue)
        return null;

    var flowStateLock = await flowStore.LockFlowState(flowID.Value).ConfigureAwait(false);

    // Until the payload is stored in the message context nothing else owns the lock, so it must
    // be released here on every exit path (missing flow state or an exception) to prevent
    // the flow from remaining locked.
    var lockOwnedByContext = false;
    try
    {
        var flowState = await flowStateLock.GetFlowState().ConfigureAwait(false);
        if (flowState == null)
            return null;

        var flowContext = new FlowContext(new FlowHandlerContext(context), flowState, flowStateLock)
        {
            ContinuationID = continuationID,
            ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
        };

        // IDisposable items in the IMessageContext are automatically disposed
        context.Store(new FlowMessageContextPayload(flowContext));
        lockOwnedByContext = true;

        return flowContext;
    }
    finally
    {
        if (!lockOwnedByContext)
            flowStateLock.Dispose();
    }
}
|
|
|
|
|
}
|
|
|
|
|
}
|