1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Actually fixed Flow context storing for parallel request

This commit is contained in:
Mark van Renswoude 2018-12-19 20:20:08 +01:00
parent ab03b38e6c
commit 43458b19f7
4 changed files with 10 additions and 12 deletions

View File

@ -87,7 +87,10 @@ namespace Tapeti.Flow.Default
private static Task HandleParallelResponse(IMessageContext context) private static Task HandleParallelResponse(IMessageContext context)
{ {
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask)); return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store();
}));
} }

View File

@ -25,9 +25,6 @@ namespace Tapeti.Flow.Default
await CallConvergeMethod(context, await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync); flowContext.ContinuationMetadata.ConvergeMethodSync);
else if (flowContext.FlowState.Continuations.Count > 0)
// This is a parallel flow waiting for other continuations, always store the state
await flowContext.Store();
} }
else else
await next(); await next();

View File

@ -68,16 +68,13 @@ namespace Test
} }
/** [Start]
* The Visualizer could've been injected through the constructor, which is public IYieldPoint TestParallelRequest()
* the recommended way. Just testing the injection middleware here.
*/
public async Task<IYieldPoint> Marco(MarcoMessage message, Visualizer myVisualizer)
{ {
Console.WriteLine(">> Marco (yielding with request)"); Console.WriteLine(">> Marco (yielding with request)");
await myVisualizer.VisualizeMarco();
StateTestGuid = Guid.NewGuid(); StateTestGuid = Guid.NewGuid();
Console.WriteLine($"Starting parallel request with StateTestGuid {StateTestGuid}");
return flowProvider.YieldWithParallelRequest() return flowProvider.YieldWithParallelRequest()
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage .AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
@ -113,7 +110,7 @@ namespace Test
private IYieldPoint ContinuePoloConfirmation() private IYieldPoint ContinuePoloConfirmation()
{ {
Console.WriteLine("> ConvergePoloConfirmation (ending flow)"); Console.WriteLine("> ConvergePoloConfirmation (ending flow)");
return flowProvider.EndWithResponse(new PoloMessage()); return flowProvider.End();
} }

View File

@ -61,7 +61,8 @@ namespace Test
connection.GetPublisher().Publish(new FlowEndController.PingMessage()); connection.GetPublisher().Publish(new FlowEndController.PingMessage());
container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true); //container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest);
Thread.Sleep(1000); Thread.Sleep(1000);