From a6f2b24ea04e1bded887734dae8352b5b861471a Mon Sep 17 00:00:00 2001 From: Frederik Date: Wed, 1 May 2024 09:25:13 +0200 Subject: [PATCH] Added possibility to publish a request-response directly to a queue --- Tapeti.Flow/Default/FlowProvider.cs | 31 ++++++++++++++++++ Tapeti.Flow/IFlowProvider.cs | 49 +++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 62e3176..49df2d2 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -45,6 +45,20 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// + public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + + /// + public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + /// public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { @@ -52,6 +66,13 @@ namespace Tapeti.Flow.Default return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// + public IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo)); + } + /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { @@ -110,6 +131,16 @@ namespace Tapeti.Flow.Default } + internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) + { + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); + await context.Store(responseHandlerInfo.IsDurableQueue); + + await publisher.PublishDirect(message, queueName, properties, true); + } + + private async Task SendResponse(FlowContext context, object message) { var reply = context.HasFlowStateAndLock diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index 613e82d..e1b8fc8 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -35,6 +35,34 @@ namespace Tapeti.Flow IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class; + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class; + + + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class; + + /// /// Publish a request message and continue the flow when the response arrives. /// The request message must be marked with the [Request] attribute, and the @@ -54,6 +82,27 @@ namespace Tapeti.Flow IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; + /// + /// Publish a request message directly to a queue and continue the flow when the response arrives. + /// The exchange and routing key are not used. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for synchronous response handlers. + /// + /// + /// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods + /// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type + /// of a method is not part of its signature,according to: + /// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + /// + /// + /// + /// + /// + /// + /// + IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class; + + /// /// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder /// to acquire an IYieldPoint.