1
0
mirror of synced 2024-11-21 17:03:50 +00:00

[ci skip] WIP: support for adding requests mid-parallel flow

This commit is contained in:
Mark van Renswoude 2021-12-09 23:52:25 +01:00
parent 68d3a94438
commit 2b4dd8e251
4 changed files with 141 additions and 1 deletions

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Annotations; using Tapeti.Annotations;
@ -52,6 +53,10 @@ namespace Tapeti.Flow.Default
} }
else else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}");
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest)))
parameter.SetBinding(ParallelRequestParameterFactory);
} }
@ -103,5 +108,12 @@ namespace Tapeti.Flow.Default
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _))
throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
} }
private static object ParallelRequestParameterFactory(IMessageContext context)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.GetParallelRequest(new FlowHandlerContext(context));
}
} }
} }

View File

@ -243,6 +243,15 @@ namespace Tapeti.Flow.Default
} }
/// <inheritdoc />
public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context)
{
if (!context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
return null;
}
private class ParallelRequestBuilder : IFlowParallelRequestBuilder private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{ {
@ -284,6 +293,18 @@ namespace Tapeti.Flow.Default
} }
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
return this;
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{ {
requests.Add(new RequestInfo requests.Add(new RequestInfo
@ -296,6 +317,18 @@ namespace Tapeti.Flow.Default
} }
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
return this;
}
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation) public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation)
{ {
return BuildYieldPoint(continuation, false); return BuildYieldPoint(continuation, false);
@ -331,6 +364,40 @@ namespace Tapeti.Flow.Default
} }
private class ParallelRequest : IFlowParallelRequest
{
private readonly ITapetiConfig config;
private readonly SendRequestFunc sendRequest;
public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest)
{
this.config = config;
this.sendRequest = sendRequest;
}
public IFlowParallelRequest AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
{
throw new NotImplementedException();
}
public IFlowParallelRequest AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
{
throw new NotImplementedException();
}
public IFlowParallelRequest AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{
throw new NotImplementedException();
}
public IFlowParallelRequest AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler)
{
throw new NotImplementedException();
}
}
internal class ResponseHandlerInfo internal class ResponseHandlerInfo
{ {
public string MethodName { get; set; } public string MethodName { get; set; }

View File

@ -109,6 +109,12 @@ namespace Tapeti.Flow
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="yieldPoint"></param> /// <param name="yieldPoint"></param>
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
/// <summary>
/// Returns the parallel request for the given message context.
/// </summary>
IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context);
} }
@ -127,6 +133,13 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler); IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
/// <summary> /// <summary>
/// Publish a request message and continue the flow when the response arrives. /// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint. /// Note that the response handler can not influence the flow as it does not return a YieldPoint.
@ -137,6 +150,9 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler); IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them.
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived. /// The continuation method is called when all responses have arrived.
@ -159,6 +175,51 @@ namespace Tapeti.Flow
} }
/// <summary>
/// Provides means of adding one or more requests to a parallel request.
/// </summary>
/// <remarks>
/// Add a parameter of this type to a parallel request's response handler to gain access to it's functionality.
/// Not available in other contexts.
/// </remarks>
public interface IFlowParallelRequest
{
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task<IFlowParallelRequest> AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
Task<IFlowParallelRequest> AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for synchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task<IFlowParallelRequest> AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequestSync{TRequest,TResponse}(TRequest,Action{TResponse})"/>
Task<IFlowParallelRequest> AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler);
}
/// <summary> /// <summary>
/// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods. /// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods.
/// </summary> /// </summary>

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework> <TargetFramework>net5.0</TargetFramework>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">