diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 2193302..04d79f5 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using ExampleLib; using Messaging.TapetiExample; using Tapeti.Annotations; @@ -16,6 +17,7 @@ namespace _03_FlowRequestResponse public string FirstQuote; public string SecondQuote; + public string ThirdQuote; public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState) @@ -35,7 +37,7 @@ namespace _03_FlowRequestResponse Amount = 1 }, HandleFirstQuoteResponse) - .AddRequestSync( + .AddRequest( new QuoteRequestMessage { Amount = 2 @@ -54,10 +56,26 @@ namespace _03_FlowRequestResponse [Continuation] - public void HandleSecondQuoteResponse(QuoteResponseMessage message) + public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) { Console.WriteLine("[ParallelFlowController] Second quote response received"); SecondQuote = message.Quote; + + // Example of adding a request to an ongoing parallel request + await parallelRequest.AddRequestSync( + new QuoteRequestMessage + { + Amount = 3 + }, + HandleThirdQuoteResponse); + } + + + [Continuation] + public void HandleThirdQuoteResponse(QuoteResponseMessage message) + { + Console.WriteLine("[ParallelFlowController] First quote response received"); + ThirdQuote = message.Quote; } @@ -65,6 +83,16 @@ namespace _03_FlowRequestResponse { Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote); Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); + Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); + + return flowProvider.YieldWithParallelRequest() + .YieldSync(ImmediateConvergeTest, FlowNoRequestsBehaviour.Converge); + } + + + private IYieldPoint ImmediateConvergeTest() + { + Console.WriteLine("[ParallelFlowController] Second parallel flow immediately converged"); exampleState.Done(); return flowProvider.End(); diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs index 46a265e..71d1ba6 100644 --- a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs +++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs @@ -25,8 +25,7 @@ namespace _03_FlowRequestResponse break; default: - // We have to return a response. - quote = null; + quote = new string('\'', message.Amount); break; } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 1e21847..c0f9565 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Reflection; using System.Threading.Tasks; using Tapeti.Annotations; @@ -52,6 +53,10 @@ namespace Tapeti.Flow.Default } else throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); + + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest))) + parameter.SetBinding(ParallelRequestParameterFactory); } @@ -103,5 +108,12 @@ namespace Tapeti.Flow.Default if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); } + + + private static object ParallelRequestParameterFactory(IMessageContext context) + { + var flowHandler = context.Config.DependencyResolver.Resolve(); + return flowHandler.GetParallelRequest(new FlowHandlerContext(context)); + } } } diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index dd955a3..70f98f9 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; namespace Tapeti.Flow.Default @@ -12,13 +13,13 @@ namespace Tapeti.Flow.Default public Guid ContinuationID { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; } - private bool storeCalled; - private bool deleteCalled; + private int storeCalled; + private int deleteCalled; public async Task Store(bool persistent) { - storeCalled = true; + storeCalled++; if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); @@ -30,7 +31,7 @@ namespace Tapeti.Flow.Default public async Task Delete() { - deleteCalled = true; + deleteCalled++; if (FlowStateLock != null) await FlowStateLock.DeleteFlowState(); @@ -38,13 +39,16 @@ namespace Tapeti.Flow.Default public bool IsStoredOrDeleted() { - return storeCalled || deleteCalled; + return storeCalled > 0 || deleteCalled > 0; } public void EnsureStoreOrDeleteIsCalled() { if (!IsStoredOrDeleted()) throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); + + Debug.Assert(storeCalled <= 1, "Store called more than once!"); + Debug.Assert(deleteCalled <= 1, "Delete called more than once!"); } public void Dispose() diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 3f34b49..e516667 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -40,19 +40,14 @@ namespace Tapeti.Flow.Default // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); - var converge = flowContext.FlowState.Continuations.Count == 0 && - flowContext.ContinuationMetadata.ConvergeMethodName != null; - - if (converge) - // Indicate to the FlowBindingMiddleware that the state must not to be stored - flowPayload.FlowIsConverging = true; await next(); - if (converge) - await CallConvergeMethod(context, controllerPayload, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync); + if (flowPayload.FlowIsConverging) + { + var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); + await flowHandler.Converge(new FlowHandlerContext(context)); + } } else await next(); @@ -127,28 +122,5 @@ namespace Tapeti.Flow.Default context.Store(new FlowMessageContextPayload(flowContext)); return flowContext; } - - - private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync) - { - IYieldPoint yieldPoint; - - - - var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); - if (method == null) - throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}"); - - if (sync) - yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {}); - else - yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { }); - - if (yieldPoint == null) - throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}"); - - var flowHandler = context.Config.DependencyResolver.Resolve(); - await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); - } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 821328e..8b6d416 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, SendRequest); + return new ParallelRequestBuilder(config, this); } /// @@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default } - private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) { if (context.FlowState == null) { @@ -89,7 +89,8 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.Store(responseHandlerInfo.IsDurableQueue); + if (store) + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -122,7 +123,7 @@ namespace Tapeti.Flow.Default } - private static async Task EndFlow(FlowContext context) + internal static async Task EndFlow(FlowContext context) { await context.Delete(); @@ -243,16 +244,50 @@ namespace Tapeti.Flow.Default } + /// + public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) + { + return context.MessageContext.TryGet(out var flowPayload) + ? new ParallelRequest(config, this, flowPayload.FlowContext) + : null; + } + + + /// + public Task Converge(IFlowHandlerContext context) + { + return Execute(context, new DelegateYieldPoint(flowContext => + Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); + } + + + internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync) + { + IYieldPoint yieldPoint; + + if (!flowContext.HandlerContext.MessageContext.TryGet(out var controllerPayload)) + throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext)); + + + var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance); + if (method == null) + throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}"); + + if (convergeMethodSync) + yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { }); + else + yieldPoint = await(Task)method.Invoke(controllerPayload.Controller, new object[] { }); + + if (yieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); + + await Execute(flowContext.HandlerContext, yieldPoint); + } + + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { - public delegate Task SendRequestFunc(FlowContext context, - object message, - ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName, - bool convergeMethodSync); - - private class RequestInfo { public object Message { get; set; } @@ -261,30 +296,36 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; - private readonly SendRequestFunc sendRequest; + private readonly FlowProvider flowProvider; private readonly List requests = new List(); - public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) { this.config = config; - this.sendRequest = sendRequest; + this.flowProvider = flowProvider; } public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); + return InternalAddRequest(message, responseHandler); + } - return this; + + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + return InternalAddRequest(message, responseHandler); } public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + return InternalAddRequest(message, responseHandler); + } + + + public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { requests.Add(new RequestInfo { @@ -296,41 +337,110 @@ namespace Tapeti.Flow.Default } - public IYieldPoint Yield(Func> continuation) + public IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, false); + return BuildYieldPoint(continuation, false, noRequestsBehaviour); } - public IYieldPoint YieldSync(Func continuation) + public IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, true); + return BuildYieldPoint(continuation, true, noRequestsBehaviour); } - private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync) + private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { if (requests.Count == 0) - throw new YieldPointException("At least one request must be added before yielding a parallel request"); + { + switch (noRequestsBehaviour) + { + case FlowNoRequestsBehaviour.Exception: + throw new YieldPointException("At least one request must be added before yielding a parallel request"); + + case FlowNoRequestsBehaviour.Converge: + return new DelegateYieldPoint(context => + flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync)); + + case FlowNoRequestsBehaviour.EndFlow: + return new DelegateYieldPoint(EndFlow); + + default: + throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null); + } + } if (convergeMethod?.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); - return new DelegateYieldPoint(context => + return new DelegateYieldPoint(async context => { if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); - return Task.WhenAll(requests.Select(requestInfo => - sendRequest(context, requestInfo.Message, + await Task.WhenAll(requests.Select(requestInfo => + flowProvider.SendRequest( + context, + requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync))); + convergeMethodSync, + false))); + + await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); }); } } + private class ParallelRequest : IFlowParallelRequest + { + private readonly ITapetiConfig config; + private readonly FlowProvider flowProvider; + private readonly FlowContext flowContext; + + + public ParallelRequest(ITapetiConfig config, FlowProvider flowProvider, FlowContext flowContext) + { + this.config = config; + this.flowProvider = flowProvider; + this.flowContext = flowContext; + } + + + public Task AddRequest(TRequest message, Func responseHandler) + { + return InternalAddRequest(message, responseHandler); + } + + + public Task AddRequest(TRequest message, Func responseHandler) + { + return InternalAddRequest(message, responseHandler); + } + + + public Task AddRequestSync(TRequest message, Action responseHandler) + { + return InternalAddRequest(message, responseHandler); + } + + + private Task InternalAddRequest(object message, Delegate responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + + return flowProvider.SendRequest( + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync, + false); + } + } + + internal class ResponseHandlerInfo { public string MethodName { get; set; } diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs index 6fb97ce..f0cdadb 100644 --- a/Tapeti.Flow/FlowMessageContextPayload.cs +++ b/Tapeti.Flow/FlowMessageContextPayload.cs @@ -10,13 +10,15 @@ namespace Tapeti.Flow internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable { public FlowContext FlowContext { get; } - + /// /// Indicates if the current message handler is the last one to be called before a /// parallel flow is done and the convergeMethod will be called. /// Temporarily disables storing the flow state. /// - public bool FlowIsConverging { get; set; } + public bool FlowIsConverging => FlowContext != null && + FlowContext.FlowState.Continuations.Count == 0 && + FlowContext.ContinuationMetadata.ConvergeMethodName != null; public FlowMessageContextPayload(FlowContext flowContext) diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index fa5f0a6..df8a485 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -109,9 +109,46 @@ namespace Tapeti.Flow /// /// Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); + + + /// + /// Returns the parallel request for the given message context. + /// + IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); + + + /// + /// Calls the converge method for a parallel flow. + /// + Task Converge(IFlowHandlerContext context); } + /// + /// Determines how the Yield method of a parallel request behaves when no requests have been added. + /// Useful in cases where requests are sent conditionally. + /// + public enum FlowNoRequestsBehaviour + { + /// + /// Throw an exception. This is the default behaviour to prevent subtle bugs when not specifying the behaviour explicitly, + /// as well as for backwards compatibility. + /// + Exception, + + /// + /// Immediately call the continuation method. + /// + Converge, + + /// + /// End the flow without calling the converge method. + /// + EndFlow + } + + + /// /// Builder to publish one or more request messages and continuing the flow when the responses arrive. /// @@ -127,6 +164,13 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// /// Publish a request message and continue the flow when the response arrives. /// Note that the response handler can not influence the flow as it does not return a YieldPoint. @@ -137,6 +181,8 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are + /// async, so you should always await them. /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. @@ -144,8 +190,9 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for asynchronous continuation methods. /// - /// - IYieldPoint Yield(Func> continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); /// /// Constructs an IYieldPoint to continue the flow when responses arrive. @@ -154,8 +201,47 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for synchronous continuation methods. /// - /// - IYieldPoint YieldSync(Func continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); + } + + + /// + /// Provides means of adding one or more requests to a parallel request. + /// + /// + /// Add a parameter of this type to a parallel request's response handler to gain access to it's functionality. + /// Not available in other contexts. + /// + public interface IFlowParallelRequest + { + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for asynchronous response handlers. + /// + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for synchronous response handlers. + /// + /// + /// + Task AddRequestSync(TRequest message, Action responseHandler); } diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 949f4a3..4a6bfa1 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -1,7 +1,7 @@ - netcoreapp2.1 + net5.0 diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 9f68728..17ccd30 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -945,8 +945,9 @@ namespace Tapeti.Connection if (returnInfo.RefCount == 0) returnRoutingKeys.Remove(messageInfo.ReturnKey); } + else + messageInfo.CompletionSource.SetResult(0); - messageInfo.CompletionSource.SetResult(0); confirmMessages.Remove(deliveryTag); } } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index ff1f626..b1a66ca 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -76,8 +76,8 @@ namespace Tapeti.Default public void QueueDeclare(string queueName, bool durable, bool passive) { Console.WriteLine(passive - ? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}" - : $"[Tapeti] Verifying durable queue {queueName}"); + ? $"[Tapeti] Verifying durable queue {queueName}" + : $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"); } /// diff --git a/docs/flow.rst b/docs/flow.rst index 53bcabb..85fb50b 100644 --- a/docs/flow.rst +++ b/docs/flow.rst @@ -251,11 +251,36 @@ A few things to note: #) The response handlers do not return an IYieldPoint themselves, but void (for AddRequestSync) or Task (for AddRequest). Therefore they can not influence the flow. Instead the converge method as passed to Yield or YieldSync determines how the flow continues. It is called immediately after the last response handler. #) The converge method must be private, as it is not a valid message handler in itself. -#) You must add at least one request. +#) You must add at least one request, or specify the NoRequestsBehaviour parameter for Yield/YieldSync explicitly. Note that you do not have to perform all the operations in one go. You can store the result of ``YieldWithParallelRequest`` and conditionally call ``AddRequest`` or ``AddRequestSync`` as many times as required. +Adding requests to a parallel flow +---------------------------------- +As mentioned above, you can not start a new parallel request in the same flow while the current one has not converged yet. This is enforced by the response handlers not returning an IYieldPoint. + +You can however add requests to the current parallel request while handling one of the responses. This is equivalent to adding the request to the parallel flow builder initially, and will delay calling the converge method until a response has been received to this new request as well. + +To add an additional request, include a second parameter in the continuation method of type IFlowParallelRequest. The continuation method also needs to be async to be able to await the IFlowParallelRequest.AddRequest[Sync] methods. For example: + +:: + + [Continuation] + public async Task HandleDoctorAppointmentResponse(DoctorAppointmentResponseMessage appointment, + IFlowParallelRequest parallelRequest) + { + // Now that we have the appointment details, we can query the patient data + await parallelRequest.AddRequestSync( + new PatientRequestMessage + { + PatientID = appointment.PatientID + }, + HandlePatientResponse); + } + + + Persistent state ---------------- By default flow state is only preserved while the service is running. To persist the flow state across restarts and reboots, provide an implementation of IFlowRepository to ``WithFlow()``.