2017-02-07 15:13:33 +00:00
using System ;
2021-12-09 22:52:25 +00:00
using System.Linq ;
2017-02-07 15:13:33 +00:00
using System.Reflection ;
using System.Threading.Tasks ;
2018-12-19 19:50:56 +00:00
using Tapeti.Annotations ;
2017-02-07 15:13:33 +00:00
using Tapeti.Config ;
using Tapeti.Flow.Annotations ;
using Tapeti.Helpers ;
namespace Tapeti.Flow.Default
{
2019-08-13 18:30:04 +00:00
internal class FlowBindingMiddleware : IControllerBindingMiddleware
2017-02-07 15:13:33 +00:00
{
2019-08-13 18:30:04 +00:00
public void Handle ( IControllerBindingContext context , Action next )
2017-02-07 15:13:33 +00:00
{
2017-02-15 21:05:01 +00:00
if ( context . Method . GetCustomAttribute < StartAttribute > ( ) ! = null )
return ;
2017-02-07 15:13:33 +00:00
RegisterYieldPointResult ( context ) ;
2017-02-12 18:04:26 +00:00
RegisterContinuationFilter ( context ) ;
2017-02-07 15:13:33 +00:00
next ( ) ;
ValidateRequestResponse ( context ) ;
}
2019-08-13 18:30:04 +00:00
private static void RegisterContinuationFilter ( IControllerBindingContext context )
2017-02-07 15:13:33 +00:00
{
var continuationAttribute = context . Method . GetCustomAttribute < ContinuationAttribute > ( ) ;
if ( continuationAttribute = = null )
return ;
2019-08-14 18:48:40 +00:00
context . SetBindingTargetMode ( BindingTargetMode . Direct ) ;
context . Use ( new FlowContinuationMiddleware ( ) ) ;
2017-02-12 18:04:26 +00:00
if ( context . Result . HasHandler )
return ;
// Continuation without IYieldPoint indicates a ParallelRequestBuilder response handler,
// make sure to store it's state as well
if ( context . Result . Info . ParameterType = = typeof ( Task ) )
{
context . Result . SetHandler ( async ( messageContext , value ) = >
{
await ( Task ) value ;
await HandleParallelResponse ( messageContext ) ;
} ) ;
}
else if ( context . Result . Info . ParameterType = = typeof ( void ) )
{
context . Result . SetHandler ( ( messageContext , value ) = > HandleParallelResponse ( messageContext ) ) ;
}
else
throw new ArgumentException ( $"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}" ) ;
2021-12-09 22:52:25 +00:00
foreach ( var parameter in context . Parameters . Where ( p = > ! p . HasBinding & & p . Info . ParameterType = = typeof ( IFlowParallelRequest ) ) )
parameter . SetBinding ( ParallelRequestParameterFactory ) ;
2017-02-07 15:13:33 +00:00
}
2019-08-13 18:30:04 +00:00
private static void RegisterYieldPointResult ( IControllerBindingContext context )
2017-02-07 15:13:33 +00:00
{
2018-12-19 19:50:56 +00:00
if ( ! context . Result . Info . ParameterType . IsTypeOrTaskOf ( typeof ( IYieldPoint ) , out var isTaskOf ) )
2017-02-07 15:13:33 +00:00
return ;
2017-02-13 14:47:57 +00:00
if ( isTaskOf )
2017-02-07 15:13:33 +00:00
{
context . Result . SetHandler ( async ( messageContext , value ) = >
{
var yieldPoint = await ( Task < IYieldPoint > ) value ;
if ( yieldPoint ! = null )
await HandleYieldPoint ( messageContext , yieldPoint ) ;
} ) ;
}
else
context . Result . SetHandler ( ( messageContext , value ) = > HandleYieldPoint ( messageContext , ( IYieldPoint ) value ) ) ;
}
2021-09-02 14:16:11 +00:00
private static Task HandleYieldPoint ( IMessageContext context , IYieldPoint yieldPoint )
2017-02-07 15:13:33 +00:00
{
2019-08-13 18:30:04 +00:00
var flowHandler = context . Config . DependencyResolver . Resolve < IFlowHandler > ( ) ;
2019-08-15 10:04:03 +00:00
return flowHandler . Execute ( new FlowHandlerContext ( context ) , yieldPoint ) ;
2017-02-07 15:13:33 +00:00
}
2021-09-02 14:16:11 +00:00
private static Task HandleParallelResponse ( IMessageContext context )
2017-02-12 18:04:26 +00:00
{
2021-09-02 14:16:11 +00:00
if ( context . TryGet < FlowMessageContextPayload > ( out var flowPayload ) & & flowPayload . FlowIsConverging )
2020-01-20 15:47:59 +00:00
return Task . CompletedTask ;
2019-08-13 18:30:04 +00:00
var flowHandler = context . Config . DependencyResolver . Resolve < IFlowHandler > ( ) ;
2019-08-15 10:04:03 +00:00
return flowHandler . Execute ( new FlowHandlerContext ( context ) , new DelegateYieldPoint ( async flowContext = >
2018-12-19 19:20:08 +00:00
{
2023-04-17 14:40:26 +00:00
// IFlowParallelRequest.AddRequest will store the flow immediately
if ( ! flowPayload . FlowContext . IsStoredOrDeleted ( ) )
await flowContext . Store ( context . Binding . QueueType = = QueueType . Durable ) ;
2018-12-19 19:20:08 +00:00
} ) ) ;
2017-02-12 18:04:26 +00:00
}
2019-08-13 18:30:04 +00:00
private static void ValidateRequestResponse ( IControllerBindingContext context )
2017-02-07 15:13:33 +00:00
{
var request = context . MessageClass ? . GetCustomAttribute < RequestAttribute > ( ) ;
if ( request ? . Response = = null )
return ;
2018-12-19 19:50:56 +00:00
if ( ! context . Result . Info . ParameterType . IsTypeOrTaskOf ( t = > t = = request . Response | | t = = typeof ( IYieldPoint ) , out _ ) )
2017-02-07 15:13:33 +00:00
throw new ResponseExpectedException ( $"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}" ) ;
}
2021-12-09 22:52:25 +00:00
private static object ParallelRequestParameterFactory ( IMessageContext context )
{
var flowHandler = context . Config . DependencyResolver . Resolve < IFlowHandler > ( ) ;
return flowHandler . GetParallelRequest ( new FlowHandlerContext ( context ) ) ;
}
2017-02-07 15:13:33 +00:00
}
}