From 58d19080476f85bca633a0192ce1be816018a785 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 10 Dec 2021 11:45:09 +0100 Subject: [PATCH] Added NoRequestsBehaviour to ParallelFlow.Yield --- .../ParallelFlowController.cs | 9 ++ Tapeti.Flow/Default/FlowContext.cs | 14 ++- .../Default/FlowContinuationMiddleware.cs | 30 +---- Tapeti.Flow/Default/FlowProvider.cs | 116 +++++++++++++----- Tapeti.Flow/IFlowProvider.cs | 42 ++++++- Tapeti/Connection/TapetiClient.cs | 3 +- 6 files changed, 146 insertions(+), 68 deletions(-) diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 1d11a12..04d79f5 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -85,6 +85,15 @@ namespace _03_FlowRequestResponse Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); + return flowProvider.YieldWithParallelRequest() + .YieldSync(ImmediateConvergeTest, FlowNoRequestsBehaviour.Converge); + } + + + private IYieldPoint ImmediateConvergeTest() + { + Console.WriteLine("[ParallelFlowController] Second parallel flow immediately converged"); + exampleState.Done(); return flowProvider.End(); } diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index dd955a3..70f98f9 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; namespace Tapeti.Flow.Default @@ -12,13 +13,13 @@ namespace Tapeti.Flow.Default public Guid ContinuationID { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; } - private bool storeCalled; - private bool deleteCalled; + private int storeCalled; + private int deleteCalled; public async Task Store(bool persistent) { - storeCalled = true; + storeCalled++; if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); @@ -30,7 +31,7 @@ namespace Tapeti.Flow.Default public async Task Delete() { - deleteCalled = true; + deleteCalled++; if (FlowStateLock != null) await FlowStateLock.DeleteFlowState(); @@ -38,13 +39,16 @@ namespace Tapeti.Flow.Default public bool IsStoredOrDeleted() { - return storeCalled || deleteCalled; + return storeCalled > 0 || deleteCalled > 0; } public void EnsureStoreOrDeleteIsCalled() { if (!IsStoredOrDeleted()) throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); + + Debug.Assert(storeCalled <= 1, "Store called more than once!"); + Debug.Assert(deleteCalled <= 1, "Delete called more than once!"); } public void Dispose() diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 7ceca70..e516667 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -44,9 +44,10 @@ namespace Tapeti.Flow.Default await next(); if (flowPayload.FlowIsConverging) - await CallConvergeMethod(context, controllerPayload, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync); + { + var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); + await flowHandler.Converge(new FlowHandlerContext(context)); + } } else await next(); @@ -121,28 +122,5 @@ namespace Tapeti.Flow.Default context.Store(new FlowMessageContextPayload(flowContext)); return flowContext; } - - - private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync) - { - IYieldPoint yieldPoint; - - - - var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); - if (method == null) - throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}"); - - if (sync) - yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {}); - else - yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { }); - - if (yieldPoint == null) - throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}"); - - var flowHandler = context.Config.DependencyResolver.Resolve(); - await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); - } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 5f08f7c..8b6d416 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, SendRequest); + return new ParallelRequestBuilder(config, this); } /// @@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default } - private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) { if (context.FlowState == null) { @@ -89,7 +89,8 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.Store(responseHandlerInfo.IsDurableQueue); + if (store) + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -122,7 +123,7 @@ namespace Tapeti.Flow.Default } - private static async Task EndFlow(FlowContext context) + internal static async Task EndFlow(FlowContext context) { await context.Delete(); @@ -246,18 +247,43 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) { - if (!context.MessageContext.TryGet(out var flowPayload)) - return null; - - return new ParallelRequest(config, SendRequest, flowPayload.FlowContext); + return context.MessageContext.TryGet(out var flowPayload) + ? new ParallelRequest(config, this, flowPayload.FlowContext) + : null; } - private delegate Task SendRequestFunc(FlowContext context, - object message, - ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName, - bool convergeMethodSync); + /// + public Task Converge(IFlowHandlerContext context) + { + return Execute(context, new DelegateYieldPoint(flowContext => + Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); + } + + + internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync) + { + IYieldPoint yieldPoint; + + if (!flowContext.HandlerContext.MessageContext.TryGet(out var controllerPayload)) + throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext)); + + + var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance); + if (method == null) + throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}"); + + if (convergeMethodSync) + yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { }); + else + yieldPoint = await(Task)method.Invoke(controllerPayload.Controller, new object[] { }); + + if (yieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); + + await Execute(flowContext.HandlerContext, yieldPoint); + } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder @@ -270,14 +296,14 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; - private readonly SendRequestFunc sendRequest; + private readonly FlowProvider flowProvider; private readonly List requests = new List(); - public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) { this.config = config; - this.sendRequest = sendRequest; + this.flowProvider = flowProvider; } @@ -311,36 +337,57 @@ namespace Tapeti.Flow.Default } - public IYieldPoint Yield(Func> continuation) + public IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, false); + return BuildYieldPoint(continuation, false, noRequestsBehaviour); } - public IYieldPoint YieldSync(Func continuation) + public IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { - return BuildYieldPoint(continuation, true); + return BuildYieldPoint(continuation, true, noRequestsBehaviour); } - private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync) + private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { if (requests.Count == 0) - throw new YieldPointException("At least one request must be added before yielding a parallel request"); + { + switch (noRequestsBehaviour) + { + case FlowNoRequestsBehaviour.Exception: + throw new YieldPointException("At least one request must be added before yielding a parallel request"); + + case FlowNoRequestsBehaviour.Converge: + return new DelegateYieldPoint(context => + flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync)); + + case FlowNoRequestsBehaviour.EndFlow: + return new DelegateYieldPoint(EndFlow); + + default: + throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null); + } + } if (convergeMethod?.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); - return new DelegateYieldPoint(context => + return new DelegateYieldPoint(async context => { if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); - return Task.WhenAll(requests.Select(requestInfo => - sendRequest(context, requestInfo.Message, + await Task.WhenAll(requests.Select(requestInfo => + flowProvider.SendRequest( + context, + requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync))); + convergeMethodSync, + false))); + + await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); }); } } @@ -349,14 +396,14 @@ namespace Tapeti.Flow.Default private class ParallelRequest : IFlowParallelRequest { private readonly ITapetiConfig config; - private readonly SendRequestFunc sendRequest; + private readonly FlowProvider flowProvider; private readonly FlowContext flowContext; - public ParallelRequest(ITapetiConfig config, SendRequestFunc sendRequest, FlowContext flowContext) + public ParallelRequest(ITapetiConfig config, FlowProvider flowProvider, FlowContext flowContext) { this.config = config; - this.sendRequest = sendRequest; + this.flowProvider = flowProvider; this.flowContext = flowContext; } @@ -382,7 +429,14 @@ namespace Tapeti.Flow.Default private Task InternalAddRequest(object message, Delegate responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return sendRequest(flowContext, message, responseHandlerInfo, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); + + return flowProvider.SendRequest( + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync, + false); } } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index becb406..df8a485 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -115,9 +115,40 @@ namespace Tapeti.Flow /// Returns the parallel request for the given message context. /// IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); + + + /// + /// Calls the converge method for a parallel flow. + /// + Task Converge(IFlowHandlerContext context); } + /// + /// Determines how the Yield method of a parallel request behaves when no requests have been added. + /// Useful in cases where requests are sent conditionally. + /// + public enum FlowNoRequestsBehaviour + { + /// + /// Throw an exception. This is the default behaviour to prevent subtle bugs when not specifying the behaviour explicitly, + /// as well as for backwards compatibility. + /// + Exception, + + /// + /// Immediately call the continuation method. + /// + Converge, + + /// + /// End the flow without calling the converge method. + /// + EndFlow + } + + + /// /// Builder to publish one or more request messages and continuing the flow when the responses arrive. /// @@ -152,7 +183,6 @@ namespace Tapeti.Flow /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// async, so you should always await them. - /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. @@ -160,8 +190,9 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for asynchronous continuation methods. /// - /// - IYieldPoint Yield(Func> continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); /// /// Constructs an IYieldPoint to continue the flow when responses arrive. @@ -170,8 +201,9 @@ namespace Tapeti.Flow /// controller and can store state. /// Used for synchronous continuation methods. /// - /// - IYieldPoint YieldSync(Func continuation); + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 9f68728..17ccd30 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -945,8 +945,9 @@ namespace Tapeti.Connection if (returnInfo.RefCount == 0) returnRoutingKeys.Remove(messageInfo.ReturnKey); } + else + messageInfo.CompletionSource.SetResult(0); - messageInfo.CompletionSource.SetResult(0); confirmMessages.Remove(deliveryTag); } }