From 7a25a1fd1fd701287ef2631d3b0ff96dc6b9ce08 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 9 Feb 2022 12:19:05 +0100 Subject: [PATCH] Added ValueTask support to flows --- .../ParallelFlowController.cs | 2 +- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 8 +++ Tapeti.Flow/Default/FlowProvider.cs | 25 ++++++++++ Tapeti.Flow/IFlowProvider.cs | 49 +++++++++++++++++++ appveyor.yml | 2 +- 5 files changed, 84 insertions(+), 2 deletions(-) diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 04d79f5..9a8dc7a 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -56,7 +56,7 @@ namespace _03_FlowRequestResponse [Continuation] - public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) + public async ValueTask HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) { Console.WriteLine("[ParallelFlowController] Second quote response received"); SecondQuote = message.Quote; diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 83564c8..877f890 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -50,6 +50,14 @@ namespace Tapeti.Flow.Default await HandleParallelResponse(messageContext); }); } + if (context.Result.Info.ParameterType == typeof(ValueTask)) + { + context.Result.SetHandler(async (messageContext, value) => + { + await (ValueTask)value; + await HandleParallelResponse(messageContext); + }); + } else if (context.Result.Info.ParameterType == typeof(void)) { context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext)); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 394377b..509ab64 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -38,6 +38,13 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// + public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); + } + /// public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) { @@ -312,18 +319,31 @@ namespace Tapeti.Flow.Default return InternalAddRequest(message, responseHandler); } + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + return InternalAddRequest(message, responseHandler); + } public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) { return InternalAddRequest(message, responseHandler); } + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + return InternalAddRequest(message, responseHandler); + } public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) { return InternalAddRequest(message, responseHandler); } + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + return InternalAddRequest(message, responseHandler); + } + private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { @@ -342,6 +362,11 @@ namespace Tapeti.Flow.Default return BuildYieldPoint(continuation, false, noRequestsBehaviour); } + public IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) + { + throw new NotImplementedException(); + } + public IYieldPoint YieldSync(Func continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) { diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index d8b7abe..3ac98de 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -23,6 +23,18 @@ namespace Tapeti.Flow IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + /// + /// Publish a request message and continue the flow when the response arrives. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + + /// /// Publish a request message and continue the flow when the response arrives. /// The request message must be marked with the [Request] attribute, and the @@ -164,6 +176,16 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for asynchronous response handlers. + /// + /// + /// + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. @@ -171,6 +193,13 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + /// /// Publish a request message and continue the flow when the response arrives. /// Note that the response handler can not influence the flow as it does not return a YieldPoint. @@ -181,6 +210,13 @@ namespace Tapeti.Flow /// IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// + /// This overload allows the response handler access to the IFlowParallelRequest interface, which + /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. + /// + /// + IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// async, so you should always await them. /// @@ -194,6 +230,19 @@ namespace Tapeti.Flow /// How the Yield method should behave when no requests have been added to the parallel request builder. IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); + /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are + /// async, so you should always await them. + /// + /// Constructs an IYieldPoint to continue the flow when responses arrive. + /// The continuation method is called when all responses have arrived. + /// Response handlers and the continuation method are guaranteed thread-safe access to the + /// controller and can store state. + /// Used for asynchronous continuation methods. + /// + /// The converge continuation method to be called when all responses have been handled. + /// How the Yield method should behave when no requests have been added to the parallel request builder. + IYieldPoint Yield(Func> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); + /// /// Constructs an IYieldPoint to continue the flow when responses arrive. /// The continuation method is called when all responses have arrived. diff --git a/appveyor.yml b/appveyor.yml index 14eabb1..80d0eb8 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -10,7 +10,7 @@ before_build: - ps: build\UpdateVersion.ps1 environment: - pack_params: -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true -p:EmbedUntrackedSources=true --output output -p:Configuration=Release -p:p:ContinuousIntegrationBuild=true + pack_params: -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true -p:EmbedUntrackedSources=true --output output -p:Configuration=Release -p:p:ContinuousIntegrationBuild=true --verbosity detailed after_build: # Create NuGet packages