Added possibility to publish a request-response directly to a queue

This commit is contained in:
Frederik 2024-05-01 09:25:13 +02:00
parent c7f736e033
commit a6f2b24ea0
2 changed files with 80 additions and 0 deletions

View File

@ -45,6 +45,20 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{
@ -52,6 +66,13 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirectSync<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
@ -110,6 +131,16 @@ namespace Tapeti.Flow.Default
}
internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.PublishDirect(message, queueName, properties, true);
}
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.HasFlowStateAndLock

View File

@ -35,6 +35,34 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
@ -54,6 +82,27 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for synchronous response handlers.
/// </summary>
/// <remarks>
/// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
/// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
/// of a method is not part of its signature,according to:
/// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
/// </remarks>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestDirectSync<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
/// to acquire an IYieldPoint.