diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 2193302..1d11a12 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using ExampleLib; using Messaging.TapetiExample; using Tapeti.Annotations; @@ -16,6 +17,7 @@ namespace _03_FlowRequestResponse public string FirstQuote; public string SecondQuote; + public string ThirdQuote; public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState) @@ -35,7 +37,7 @@ namespace _03_FlowRequestResponse Amount = 1 }, HandleFirstQuoteResponse) - .AddRequestSync( + .AddRequest( new QuoteRequestMessage { Amount = 2 @@ -54,10 +56,26 @@ namespace _03_FlowRequestResponse [Continuation] - public void HandleSecondQuoteResponse(QuoteResponseMessage message) + public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) { Console.WriteLine("[ParallelFlowController] Second quote response received"); SecondQuote = message.Quote; + + // Example of adding a request to an ongoing parallel request + await parallelRequest.AddRequestSync( + new QuoteRequestMessage + { + Amount = 3 + }, + HandleThirdQuoteResponse); + } + + + [Continuation] + public void HandleThirdQuoteResponse(QuoteResponseMessage message) + { + Console.WriteLine("[ParallelFlowController] First quote response received"); + ThirdQuote = message.Quote; } @@ -65,6 +83,7 @@ namespace _03_FlowRequestResponse { Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote); Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); + Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); exampleState.Done(); return flowProvider.End(); diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs index 46a265e..71d1ba6 100644 --- a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs +++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs @@ -25,8 +25,7 @@ namespace _03_FlowRequestResponse break; default: - // We have to return a response. - quote = null; + quote = new string('\'', message.Amount); break; } diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 3f34b49..7ceca70 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -40,16 +40,10 @@ namespace Tapeti.Flow.Default // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); - var converge = flowContext.FlowState.Continuations.Count == 0 && - flowContext.ContinuationMetadata.ConvergeMethodName != null; - - if (converge) - // Indicate to the FlowBindingMiddleware that the state must not to be stored - flowPayload.FlowIsConverging = true; await next(); - if (converge) + if (flowPayload.FlowIsConverging) await CallConvergeMethod(context, controllerPayload, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 1c9bd7f..5f08f7c 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -249,19 +249,19 @@ namespace Tapeti.Flow.Default if (!context.MessageContext.TryGet(out var flowPayload)) return null; - + return new ParallelRequest(config, SendRequest, flowPayload.FlowContext); } + private delegate Task SendRequestFunc(FlowContext context, + object message, + ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName, + bool convergeMethodSync); + + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { - public delegate Task SendRequestFunc(FlowContext context, - object message, - ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName, - bool convergeMethodSync); - - private class RequestInfo { public object Message { get; set; } @@ -283,41 +283,23 @@ namespace Tapeti.Flow.Default public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { requests.Add(new RequestInfo { @@ -368,32 +350,39 @@ namespace Tapeti.Flow.Default { private readonly ITapetiConfig config; private readonly SendRequestFunc sendRequest; + private readonly FlowContext flowContext; - public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + public ParallelRequest(ITapetiConfig config, SendRequestFunc sendRequest, FlowContext flowContext) { this.config = config; this.sendRequest = sendRequest; + this.flowContext = flowContext; } - public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + + public Task AddRequest(TRequest message, Func responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + + public Task AddRequest(TRequest message, Func responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + + public Task AddRequestSync(TRequest message, Action responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + + private Task InternalAddRequest(object message, Delegate responseHandler) { - throw new NotImplementedException(); + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return sendRequest(flowContext, message, responseHandlerInfo, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); } } diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs index 6fb97ce..f0cdadb 100644 --- a/Tapeti.Flow/FlowMessageContextPayload.cs +++ b/Tapeti.Flow/FlowMessageContextPayload.cs @@ -10,13 +10,15 @@ namespace Tapeti.Flow internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable { public FlowContext FlowContext { get; } - + /// /// Indicates if the current message handler is the last one to be called before a /// parallel flow is done and the convergeMethod will be called. /// Temporarily disables storing the flow state. /// - public bool FlowIsConverging { get; set; } + public bool FlowIsConverging => FlowContext != null && + FlowContext.FlowState.Continuations.Count == 0 && + FlowContext.ContinuationMetadata.ConvergeMethodName != null; public FlowMessageContextPayload(FlowContext flowContext) diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index bdb7e1d..becb406 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -192,14 +192,14 @@ namespace Tapeti.Flow /// /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler); /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler); /// /// Publish a request message and continue the flow when the response arrives. @@ -209,14 +209,7 @@ namespace Tapeti.Flow /// /// /// - Task AddRequestSync(TRequest message, Action responseHandler); - - /// - /// This overload allows the response handler access to the IFlowParallelRequest interface, which - /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. - /// - /// - Task AddRequestSync(TRequest message, Action responseHandler); + Task AddRequestSync(TRequest message, Action responseHandler); } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index ff1f626..b1a66ca 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -76,8 +76,8 @@ namespace Tapeti.Default public void QueueDeclare(string queueName, bool durable, bool passive) { Console.WriteLine(passive - ? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}" - : $"[Tapeti] Verifying durable queue {queueName}"); + ? $"[Tapeti] Verifying durable queue {queueName}" + : $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"); } ///