From 2b4dd8e25111a33d5076c40fee1b743c38606200 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 9 Dec 2021 23:52:25 +0100 Subject: [PATCH 1/4] [ci skip] WIP: support for adding requests mid-parallel flow --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 12 ++++ Tapeti.Flow/Default/FlowProvider.cs | 67 ++++++++++++++++++++ Tapeti.Flow/IFlowProvider.cs | 61 ++++++++++++++++++ Tapeti.Tests/Tapeti.Tests.csproj | 2 +- 4 files changed, 141 insertions(+), 1 deletion(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 1e21847..c0f9565 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Reflection; using System.Threading.Tasks; using Tapeti.Annotations; @@ -52,6 +53,10 @@ namespace Tapeti.Flow.Default } else throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); + + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest))) + parameter.SetBinding(ParallelRequestParameterFactory); } @@ -103,5 +108,12 @@ namespace Tapeti.Flow.Default if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); } + + + private static object ParallelRequestParameterFactory(IMessageContext context) + { + var flowHandler = context.Config.DependencyResolver.Resolve(); + return flowHandler.GetParallelRequest(new FlowHandlerContext(context)); + } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 821328e..1c9bd7f 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -243,6 +243,15 @@ namespace Tapeti.Flow.Default } + /// + public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) + { + if (!context.MessageContext.TryGet(out var flowPayload)) + return null; + + + } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { @@ -284,6 +293,18 @@ namespace Tapeti.Flow.Default } + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) { requests.Add(new RequestInfo @@ -296,6 +317,18 @@ namespace Tapeti.Flow.Default } + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + public IYieldPoint Yield(Func> continuation) { return BuildYieldPoint(continuation, false); @@ -331,6 +364,40 @@ namespace Tapeti.Flow.Default } + private class ParallelRequest : IFlowParallelRequest + { + private readonly ITapetiConfig config; + private readonly SendRequestFunc sendRequest; + + + public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + { + this.config = config; + this.sendRequest = sendRequest; + } + + public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + { + throw new NotImplementedException(); + } + } + + internal class ResponseHandlerInfo { public string MethodName { get; set; } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index fa5f0a6..bdb7e1d 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -109,6 +109,12 @@ namespace Tapeti.Flow /// /// Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); + + + /// + /// Returns the parallel request for the given message context. + /// + IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); } @@ -127,6 +133,13 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// /// Publish a request message and continue the flow when the response arrives. /// Note that the response handler can not influence the flow as it does not return a YieldPoint. @@ -137,6 +150,9 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are + /// async, so you should always await them. + /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. @@ -159,6 +175,51 @@ namespace Tapeti.Flow } + /// + /// Provides means of adding one or more requests to a parallel request. + /// + /// + /// Add a parameter of this type to a parallel request's response handler to gain access to it's functionality. + /// Not available in other contexts. + /// + public interface IFlowParallelRequest + { + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for asynchronous response handlers. + /// + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for synchronous response handlers. + /// + /// + /// + Task AddRequestSync(TRequest message, Action responseHandler); + + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + Task AddRequestSync(TRequest message, Action responseHandler); + } + + /// /// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods. /// diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 949f4a3..4a6bfa1 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -1,7 +1,7 @@ - netcoreapp2.1 + net5.0 From bc00d476bdcd6c213e65a464e1eb947fc3cf02b5 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 10 Dec 2021 09:56:37 +0100 Subject: [PATCH 2/4] Implemented adding requests to parallel flow Fixed console logger outputting incorrect message when declaring queues --- .../ParallelFlowController.cs | 23 ++++++- .../ReceivingMessageController.cs | 3 +- .../Default/FlowContinuationMiddleware.cs | 8 +-- Tapeti.Flow/Default/FlowProvider.cs | 67 ++++++++----------- Tapeti.Flow/FlowMessageContextPayload.cs | 6 +- Tapeti.Flow/IFlowProvider.cs | 13 +--- Tapeti/Default/ConsoleLogger.cs | 4 +- 7 files changed, 60 insertions(+), 64 deletions(-) diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 2193302..1d11a12 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using ExampleLib; using Messaging.TapetiExample; using Tapeti.Annotations; @@ -16,6 +17,7 @@ namespace _03_FlowRequestResponse public string FirstQuote; public string SecondQuote; + public string ThirdQuote; public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState) @@ -35,7 +37,7 @@ namespace _03_FlowRequestResponse Amount = 1 }, HandleFirstQuoteResponse) - .AddRequestSync( + .AddRequest( new QuoteRequestMessage { Amount = 2 @@ -54,10 +56,26 @@ namespace _03_FlowRequestResponse [Continuation] - public void HandleSecondQuoteResponse(QuoteResponseMessage message) + public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) { Console.WriteLine("[ParallelFlowController] Second quote response received"); SecondQuote = message.Quote; + + // Example of adding a request to an ongoing parallel request + await parallelRequest.AddRequestSync( + new QuoteRequestMessage + { + Amount = 3 + }, + HandleThirdQuoteResponse); + } + + + [Continuation] + public void HandleThirdQuoteResponse(QuoteResponseMessage message) + { + Console.WriteLine("[ParallelFlowController] First quote response received"); + ThirdQuote = message.Quote; } @@ -65,6 +83,7 @@ namespace _03_FlowRequestResponse { Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote); Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); + Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); exampleState.Done(); return flowProvider.End(); diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs index 46a265e..71d1ba6 100644 --- a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs +++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs @@ -25,8 +25,7 @@ namespace _03_FlowRequestResponse break; default: - // We have to return a response. - quote = null; + quote = new string('\'', message.Amount); break; } diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 3f34b49..7ceca70 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -40,16 +40,10 @@ namespace Tapeti.Flow.Default // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); - var converge = flowContext.FlowState.Continuations.Count == 0 && - flowContext.ContinuationMetadata.ConvergeMethodName != null; - - if (converge) - // Indicate to the FlowBindingMiddleware that the state must not to be stored - flowPayload.FlowIsConverging = true; await next(); - if (converge) + if (flowPayload.FlowIsConverging) await CallConvergeMethod(context, controllerPayload, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 1c9bd7f..5f08f7c 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -249,19 +249,19 @@ namespace Tapeti.Flow.Default if (!context.MessageContext.TryGet(out var flowPayload)) return null; - + return new ParallelRequest(config, SendRequest, flowPayload.FlowContext); } + private delegate Task SendRequestFunc(FlowContext context, + object message, + ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName, + bool convergeMethodSync); + + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { - public delegate Task SendRequestFunc(FlowContext context, - object message, - ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName, - bool convergeMethodSync); - - private class RequestInfo { public object Message { get; set; } @@ -283,41 +283,23 @@ namespace Tapeti.Flow.Default public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - - return this; + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { requests.Add(new RequestInfo { @@ -368,32 +350,39 @@ namespace Tapeti.Flow.Default { private readonly ITapetiConfig config; private readonly SendRequestFunc sendRequest; + private readonly FlowContext flowContext; - public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + public ParallelRequest(ITapetiConfig config, SendRequestFunc sendRequest, FlowContext flowContext) { this.config = config; this.sendRequest = sendRequest; + this.flowContext = flowContext; } - public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + + public Task AddRequest(TRequest message, Func responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + + public Task AddRequest(TRequest message, Func responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + + public Task AddRequestSync(TRequest message, Action responseHandler) { - throw new NotImplementedException(); + return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + + private Task InternalAddRequest(object message, Delegate responseHandler) { - throw new NotImplementedException(); + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return sendRequest(flowContext, message, responseHandlerInfo, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); } } diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs index 6fb97ce..f0cdadb 100644 --- a/Tapeti.Flow/FlowMessageContextPayload.cs +++ b/Tapeti.Flow/FlowMessageContextPayload.cs @@ -10,13 +10,15 @@ namespace Tapeti.Flow internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable { public FlowContext FlowContext { get; } - + /// /// Indicates if the current message handler is the last one to be called before a /// parallel flow is done and the convergeMethod will be called. /// Temporarily disables storing the flow state. /// - public bool FlowIsConverging { get; set; } + public bool FlowIsConverging => FlowContext != null && + FlowContext.FlowState.Continuations.Count == 0 && + FlowContext.ContinuationMetadata.ConvergeMethodName != null; public FlowMessageContextPayload(FlowContext flowContext) diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index bdb7e1d..becb406 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -192,14 +192,14 @@ namespace Tapeti.Flow /// /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler); /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler); /// /// Publish a request message and continue the flow when the response arrives. @@ -209,14 +209,7 @@ namespace Tapeti.Flow /// /// /// - Task AddRequestSync(TRequest message, Action responseHandler); - - /// - /// This overload allows the response handler access to the IFlowParallelRequest interface, which - /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. - /// - /// - Task AddRequestSync(TRequest message, Action responseHandler); + Task AddRequestSync(TRequest message, Action responseHandler); } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index ff1f626..b1a66ca 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -76,8 +76,8 @@ namespace Tapeti.Default public void QueueDeclare(string queueName, bool durable, bool passive) { Console.WriteLine(passive - ? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}" - : $"[Tapeti] Verifying durable queue {queueName}"); + ? $"[Tapeti] Verifying durable queue {queueName}" + : $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"); } /// From 58d19080476f85bca633a0192ce1be816018a785 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 10 Dec 2021 11:45:09 +0100 Subject: [PATCH 3/4] Added NoRequestsBehaviour to ParallelFlow.Yield --- .../ParallelFlowController.cs | 9 ++ Tapeti.Flow/Default/FlowContext.cs | 14 ++- .../Default/FlowContinuationMiddleware.cs | 30 +---- Tapeti.Flow/Default/FlowProvider.cs | 116 +++++++++++++----- Tapeti.Flow/IFlowProvider.cs | 42 ++++++- Tapeti/Connection/TapetiClient.cs | 3 +- 6 files changed, 146 insertions(+), 68 deletions(-) diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 1d11a12..04d79f5 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -85,6 +85,15 @@ namespace _03_FlowRequestResponse Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); + return flowProvider.YieldWithParallelRequest() + .YieldSync(ImmediateConvergeTest, FlowNoRequestsBehaviour.Converge); + } + + + private IYieldPoint ImmediateConvergeTest() + { + Console.WriteLine("[ParallelFlowController] Second parallel flow immediately converged"); + exampleState.Done(); return flowProvider.End(); } diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index dd955a3..70f98f9 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; namespace Tapeti.Flow.Default @@ -12,13 +13,13 @@ namespace Tapeti.Flow.Default public Guid ContinuationID { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; } - private bool storeCalled; - private bool deleteCalled; + private int storeCalled; + private int deleteCalled; public async Task Store(bool persistent) { - storeCalled = true; + storeCalled++; if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); @@ -30,7 +31,7 @@ namespace Tapeti.Flow.Default public async Task Delete() { - deleteCalled = true; + deleteCalled++; if (FlowStateLock != null) await FlowStateLock.DeleteFlowState(); @@ -38,13 +39,16 @@ namespace Tapeti.Flow.Default public bool IsStoredOrDeleted() { - return storeCalled || deleteCalled; + return storeCalled > 0 || deleteCalled > 0; } public void EnsureStoreOrDeleteIsCalled() { if (!IsStoredOrDeleted()) throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); + + Debug.Assert(storeCalled <= 1, "Store called more than once!"); + Debug.Assert(deleteCalled <= 1, "Delete called more than once!"); } public void Dispose() diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 7ceca70..e516667 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -44,9 +44,10 @@ namespace Tapeti.Flow.Default await next(); if (flowPayload.FlowIsConverging) - await CallConvergeMethod(context, controllerPayload, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync); + { + var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); + await flowHandler.Converge(new FlowHandlerContext(context)); + } } else await next(); @@ -121,28 +122,5 @@ namespace Tapeti.Flow.Default context.Store(new FlowMessageContextPayload(flowContext)); return flowContext; } - - - private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync) - { - IYieldPoint yieldPoint; - - - - var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); - if (method == null) - throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}"); - - if (sync) - yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {}); - else - yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { }); - - if (yieldPoint == null) - throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}"); - - var flowHandler = context.Config.DependencyResolver.Resolve(); - await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); - } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 5f08f7c..8b6d416 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, SendRequest); + return new ParallelRequestBuilder(config, this); } /// @@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default } - private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) { if (context.FlowState == null) { @@ -89,7 +89,8 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.Store(responseHandlerInfo.IsDurableQueue); + if (store) + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -122,7 +123,7 @@ namespace Tapeti.Flow.Default } - private static async Task EndFlow(FlowContext context) + internal static async Task EndFlow(FlowContext context) { await context.Delete(); @@ -246,18 +247,43 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) { - if (!context.MessageContext.TryGet(out var flowPayload)) - return null; - - return new ParallelRequest(config, SendRequest, flowPayload.FlowContext); + return context.MessageContext.TryGet(out var flowPayload) + ? new ParallelRequest(config, this, flowPayload.FlowContext) + : null; } - private delegate Task SendRequestFunc(FlowContext context, - object message, - ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName, - bool convergeMethodSync); + /// + public Task Converge(IFlowHandlerContext context) + { + return Execute(context, new DelegateYieldPoint(flowContext => + Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); + } + + + internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync) + { + IYieldPoint yieldPoint; + + if (!flowContext.HandlerContext.MessageContext.TryGet(out var controllerPayload)) + throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext)); + + + var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance); + if (method == null) + throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}"); + + if (convergeMethodSync) + yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { }); + else + yieldPoint = await(Task)method.Invoke(controllerPayload.Controller, new object[] { }); + + if (yieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); + + await Execute(flowContext.HandlerContext, yieldPoint); + } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder @@ -270,14 +296,14 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; - private readonly SendRequestFunc sendRequest; + private readonly FlowProvider flowProvider; private readonly List requests = new List(); - public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) { this.config = config; - this.sendRequest = sendRequest; + this.flowProvider = flowProvider; } @@ -311,36 +337,57 @@ namespace Tapeti.Flow.Default } - public IYieldPoint Yield(Func> continuation) + public IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, false); + return BuildYieldPoint(continuation, false, noRequestsBehaviour); } - public IYieldPoint YieldSync(Func continuation) + public IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, true); + return BuildYieldPoint(continuation, true, noRequestsBehaviour); } - private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync) + private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { if (requests.Count == 0) - throw new YieldPointException("At least one request must be added before yielding a parallel request"); + { + switch (noRequestsBehaviour) + { + case FlowNoRequestsBehaviour.Exception: + throw new YieldPointException("At least one request must be added before yielding a parallel request"); + + case FlowNoRequestsBehaviour.Converge: + return new DelegateYieldPoint(context => + flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync)); + + case FlowNoRequestsBehaviour.EndFlow: + return new DelegateYieldPoint(EndFlow); + + default: + throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null); + } + } if (convergeMethod?.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); - return new DelegateYieldPoint(context => + return new DelegateYieldPoint(async context => { if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); - return Task.WhenAll(requests.Select(requestInfo => - sendRequest(context, requestInfo.Message, + await Task.WhenAll(requests.Select(requestInfo => + flowProvider.SendRequest( + context, + requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync))); + convergeMethodSync, + false))); + + await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); }); } } @@ -349,14 +396,14 @@ namespace Tapeti.Flow.Default private class ParallelRequest : IFlowParallelRequest { private readonly ITapetiConfig config; - private readonly SendRequestFunc sendRequest; + private readonly FlowProvider flowProvider; private readonly FlowContext flowContext; - public ParallelRequest(ITapetiConfig config, SendRequestFunc sendRequest, FlowContext flowContext) + public ParallelRequest(ITapetiConfig config, FlowProvider flowProvider, FlowContext flowContext) { this.config = config; - this.sendRequest = sendRequest; + this.flowProvider = flowProvider; this.flowContext = flowContext; } @@ -382,7 +429,14 @@ namespace Tapeti.Flow.Default private Task InternalAddRequest(object message, Delegate responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return sendRequest(flowContext, message, responseHandlerInfo, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); + + return flowProvider.SendRequest( + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync, + false); } } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index becb406..df8a485 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -115,9 +115,40 @@ namespace Tapeti.Flow /// Returns the parallel request for the given message context. /// IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); + + + /// + /// Calls the converge method for a parallel flow. + /// + Task Converge(IFlowHandlerContext context); } + /// + /// Determines how the Yield method of a parallel request behaves when no requests have been added. + /// Useful in cases where requests are sent conditionally. + /// + public enum FlowNoRequestsBehaviour + { + /// + /// Throw an exception. This is the default behaviour to prevent subtle bugs when not specifying the behaviour explicitly, + /// as well as for backwards compatibility. + /// + Exception, + + /// + /// Immediately call the continuation method. + /// + Converge, + + /// + /// End the flow without calling the converge method. + /// + EndFlow + } + + + /// /// Builder to publish one or more request messages and continuing the flow when the responses arrive. /// @@ -152,7 +183,6 @@ namespace Tapeti.Flow /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// async, so you should always await them. - /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. @@ -160,8 +190,9 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for asynchronous continuation methods. /// - /// - IYieldPoint Yield(Func> continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); /// /// Constructs an IYieldPoint to continue the flow when responses arrive. @@ -170,8 +201,9 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for synchronous continuation methods. /// - /// - IYieldPoint YieldSync(Func continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 9f68728..17ccd30 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -945,8 +945,9 @@ namespace Tapeti.Connection if (returnInfo.RefCount == 0) returnRoutingKeys.Remove(messageInfo.ReturnKey); } + else + messageInfo.CompletionSource.SetResult(0); - messageInfo.CompletionSource.SetResult(0); confirmMessages.Remove(deliveryTag); } } From cf244ba3ff0d177b556c457a653b9b725490fd5c Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 10 Dec 2021 12:53:18 +0100 Subject: [PATCH 4/4] Added documentation for IFlowParallelRequest --- docs/flow.rst | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/docs/flow.rst b/docs/flow.rst index 53bcabb..85fb50b 100644 --- a/docs/flow.rst +++ b/docs/flow.rst @@ -251,11 +251,36 @@ A few things to note: #) The response handlers do not return an IYieldPoint themselves, but void (for AddRequestSync) or Task (for AddRequest). Therefore they can not influence the flow. Instead the converge method as passed to Yield or YieldSync determines how the flow continues. It is called immediately after the last response handler. #) The converge method must be private, as it is not a valid message handler in itself. -#) You must add at least one request. +#) You must add at least one request, or specify the NoRequestsBehaviour parameter for Yield/YieldSync explicitly. Note that you do not have to perform all the operations in one go. You can store the result of ``YieldWithParallelRequest`` and conditionally call ``AddRequest`` or ``AddRequestSync`` as many times as required. +Adding requests to a parallel flow +---------------------------------- +As mentioned above, you can not start a new parallel request in the same flow while the current one has not converged yet. This is enforced by the response handlers not returning an IYieldPoint. + +You can however add requests to the current parallel request while handling one of the responses. This is equivalent to adding the request to the parallel flow builder initially, and will delay calling the converge method until a response has been received to this new request as well. + +To add an additional request, include a second parameter in the continuation method of type IFlowParallelRequest. The continuation method also needs to be async to be able to await the IFlowParallelRequest.AddRequest[Sync] methods. For example: + +:: + + [Continuation] + public async Task HandleDoctorAppointmentResponse(DoctorAppointmentResponseMessage appointment, + IFlowParallelRequest parallelRequest) + { + // Now that we have the appointment details, we can query the patient data + await parallelRequest.AddRequestSync( + new PatientRequestMessage + { + PatientID = appointment.PatientID + }, + HandlePatientResponse); + } + + + Persistent state ---------------- By default flow state is only preserved while the service is running. To persist the flow state across restarts and reboots, provide an implementation of IFlowRepository to ``WithFlow()``.