diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index b686f1a..199f058 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -45,6 +45,20 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
+ ///
+ public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class
+ {
+ var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
+ return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
+ }
+
+ ///
+ public IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class
+ {
+ var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
+ return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
+ }
+
///
public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class
{
@@ -52,6 +66,13 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
+ ///
+ public IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class
+ {
+ var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
+ return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
+ }
+
///
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
@@ -110,6 +131,16 @@ namespace Tapeti.Flow.Default
}
+ internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo,
+ string convergeMethodName = null, bool convergeMethodTaskSync = false)
+ {
+ var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
+ await context.Store(responseHandlerInfo.IsDurableQueue);
+
+ await publisher.PublishDirect(message, queueName, properties, true);
+ }
+
+
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.HasFlowStateAndLock
diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs
index 613e82d..e1b8fc8 100644
--- a/Tapeti.Flow/IFlowProvider.cs
+++ b/Tapeti.Flow/IFlowProvider.cs
@@ -35,6 +35,34 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class;
+ ///
+ /// Publish a request message directly to a queue and continue the flow when the response arrives.
+ /// The exchange and routing key are not used.
+ /// The request message must be marked with the [Request] attribute, and the
+ /// Response type must match. Used for asynchronous response handlers.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class;
+
+
+ ///
+ /// Publish a request message directly to a queue and continue the flow when the response arrives.
+ /// The exchange and routing key are not used.
+ /// The request message must be marked with the [Request] attribute, and the
+ /// Response type must match. Used for asynchronous response handlers.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ IYieldPoint YieldWithRequestDirect(TRequest message, string queueName, Func> responseHandler) where TRequest : class where TResponse : class;
+
+
///
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
@@ -54,6 +82,27 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class;
+ ///
+ /// Publish a request message directly to a queue and continue the flow when the response arrives.
+ /// The exchange and routing key are not used.
+ /// The request message must be marked with the [Request] attribute, and the
+ /// Response type must match. Used for synchronous response handlers.
+ ///
+ ///
+ /// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
+ /// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
+ /// of a method is not part of its signature,according to:
+ /// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ IYieldPoint YieldWithRequestDirectSync(TRequest message, string queueName, Func responseHandler) where TRequest : class where TResponse : class;
+
+
///
/// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
/// to acquire an IYieldPoint.