From 2b4dd8e25111a33d5076c40fee1b743c38606200 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 9 Dec 2021 23:52:25 +0100 Subject: [PATCH] [ci skip] WIP: support for adding requests mid-parallel flow --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 12 ++++ Tapeti.Flow/Default/FlowProvider.cs | 67 ++++++++++++++++++++ Tapeti.Flow/IFlowProvider.cs | 61 ++++++++++++++++++ Tapeti.Tests/Tapeti.Tests.csproj | 2 +- 4 files changed, 141 insertions(+), 1 deletion(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 1e21847..c0f9565 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Reflection; using System.Threading.Tasks; using Tapeti.Annotations; @@ -52,6 +53,10 @@ namespace Tapeti.Flow.Default } else throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); + + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest))) + parameter.SetBinding(ParallelRequestParameterFactory); } @@ -103,5 +108,12 @@ namespace Tapeti.Flow.Default if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); } + + + private static object ParallelRequestParameterFactory(IMessageContext context) + { + var flowHandler = context.Config.DependencyResolver.Resolve(); + return flowHandler.GetParallelRequest(new FlowHandlerContext(context)); + } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 821328e..1c9bd7f 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -243,6 +243,15 @@ namespace Tapeti.Flow.Default } + /// + public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) + { + if (!context.MessageContext.TryGet(out var flowPayload)) + return null; + + + } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { @@ -284,6 +293,18 @@ namespace Tapeti.Flow.Default } + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) { requests.Add(new RequestInfo @@ -296,6 +317,18 @@ namespace Tapeti.Flow.Default } + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + public IYieldPoint Yield(Func> continuation) { return BuildYieldPoint(continuation, false); @@ -331,6 +364,40 @@ namespace Tapeti.Flow.Default } + private class ParallelRequest : IFlowParallelRequest + { + private readonly ITapetiConfig config; + private readonly SendRequestFunc sendRequest; + + + public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) + { + this.config = config; + this.sendRequest = sendRequest; + } + + public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequest(TRequest message, Func responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + { + throw new NotImplementedException(); + } + + public IFlowParallelRequest AddRequestSync(TRequest message, Action responseHandler) + { + throw new NotImplementedException(); + } + } + + internal class ResponseHandlerInfo { public string MethodName { get; set; } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index fa5f0a6..bdb7e1d 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -109,6 +109,12 @@ namespace Tapeti.Flow /// /// Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); + + + /// + /// Returns the parallel request for the given message context. + /// + IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); } @@ -127,6 +133,13 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// /// Publish a request message and continue the flow when the response arrives. /// Note that the response handler can not influence the flow as it does not return a YieldPoint. @@ -137,6 +150,9 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are + /// async, so you should always await them. + /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. @@ -159,6 +175,51 @@ namespace Tapeti.Flow } + /// + /// Provides means of adding one or more requests to a parallel request. + /// + /// + /// Add a parameter of this type to a parallel request's response handler to gain access to it's functionality. + /// Not available in other contexts. + /// + public interface IFlowParallelRequest + { + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for asynchronous response handlers. + /// + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + Task AddRequest(TRequest message, Func responseHandler); + + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for synchronous response handlers. + /// + /// + /// + Task AddRequestSync(TRequest message, Action responseHandler); + + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + Task AddRequestSync(TRequest message, Action responseHandler); + } + + /// /// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods. /// diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 949f4a3..4a6bfa1 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -1,7 +1,7 @@ - netcoreapp2.1 + net5.0