2017-01-31 11:01:08 +00:00
using System ;
2017-02-05 22:22:34 +00:00
using System.Collections.Generic ;
2018-12-19 19:50:56 +00:00
using System.Diagnostics ;
2017-02-12 18:04:26 +00:00
using System.Linq ;
2017-01-31 11:01:08 +00:00
using System.Reflection ;
using System.Threading.Tasks ;
2018-12-19 19:50:56 +00:00
using Tapeti.Annotations ;
2017-01-31 11:01:08 +00:00
using Tapeti.Config ;
2019-08-13 18:30:04 +00:00
using Tapeti.Default ;
2017-01-31 11:01:08 +00:00
using Tapeti.Flow.Annotations ;
using Tapeti.Flow.FlowHelpers ;
namespace Tapeti.Flow.Default
{
2019-08-14 18:48:40 +00:00
/// <inheritdoc cref="IFlowProvider"/> />
/// <summary>
/// Default implementation for IFlowProvider.
/// </summary>
2017-01-31 11:01:08 +00:00
public class FlowProvider : IFlowProvider , IFlowHandler
{
2019-08-13 18:30:04 +00:00
private readonly ITapetiConfig config ;
2017-02-05 22:22:34 +00:00
private readonly IInternalPublisher publisher ;
2017-01-31 11:01:08 +00:00
2021-05-29 19:51:58 +00:00
/// <summary>
/// </summary>
2019-08-13 18:30:04 +00:00
public FlowProvider ( ITapetiConfig config , IPublisher publisher )
2017-01-31 11:01:08 +00:00
{
this . config = config ;
2017-02-05 22:22:34 +00:00
this . publisher = ( IInternalPublisher ) publisher ;
2017-01-31 11:01:08 +00:00
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public IYieldPoint YieldWithRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , Task < IYieldPoint > > responseHandler )
{
var responseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler ) ;
2017-10-17 08:34:07 +00:00
return new DelegateYieldPoint ( context = > SendRequest ( context , message , responseHandlerInfo ) ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public IYieldPoint YieldWithRequestSync < TRequest , TResponse > ( TRequest message , Func < TResponse , IYieldPoint > responseHandler )
{
var responseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler ) ;
2017-10-17 08:34:07 +00:00
return new DelegateYieldPoint ( context = > SendRequest ( context , message , responseHandlerInfo ) ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public IFlowParallelRequestBuilder YieldWithParallelRequest ( )
{
2021-12-10 10:45:09 +00:00
return new ParallelRequestBuilder ( config , this ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public IYieldPoint EndWithResponse < TResponse > ( TResponse message )
{
2017-10-17 08:34:07 +00:00
return new DelegateYieldPoint ( context = > SendResponse ( context , message ) ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public IYieldPoint End ( )
{
2017-10-17 08:34:07 +00:00
return new DelegateYieldPoint ( EndFlow ) ;
2017-01-31 11:01:08 +00:00
}
2021-12-10 10:45:09 +00:00
internal async Task SendRequest ( FlowContext context , object message , ResponseHandlerInfo responseHandlerInfo ,
string convergeMethodName = null , bool convergeMethodTaskSync = false , bool store = true )
2017-01-31 11:01:08 +00:00
{
2017-10-17 08:34:07 +00:00
if ( context . FlowState = = null )
{
await CreateNewFlowState ( context ) ;
2018-12-19 20:40:53 +00:00
Debug . Assert ( context . FlowState ! = null , "context.FlowState != null" ) ;
2017-10-17 08:34:07 +00:00
}
2017-01-31 11:01:08 +00:00
var continuationID = Guid . NewGuid ( ) ;
context . FlowState . Continuations . Add ( continuationID ,
2017-02-05 22:22:34 +00:00
new ContinuationMetadata
2017-01-31 11:01:08 +00:00
{
MethodName = responseHandlerInfo . MethodName ,
2017-02-12 18:04:26 +00:00
ConvergeMethodName = convergeMethodName ,
ConvergeMethodSync = convergeMethodTaskSync
2017-02-05 22:22:34 +00:00
} ) ;
2017-01-31 11:01:08 +00:00
2019-08-13 18:30:04 +00:00
var properties = new MessageProperties
2017-01-31 11:01:08 +00:00
{
CorrelationId = continuationID . ToString ( ) ,
ReplyTo = responseHandlerInfo . ReplyToQueue
} ;
2021-12-10 10:45:09 +00:00
if ( store )
await context . Store ( responseHandlerInfo . IsDurableQueue ) ;
2017-10-17 08:34:07 +00:00
2019-01-24 21:52:21 +00:00
await publisher . Publish ( message , properties , true ) ;
2017-01-31 11:01:08 +00:00
}
private async Task SendResponse ( FlowContext context , object message )
{
2017-10-17 08:34:07 +00:00
var reply = context . FlowState = = null
2019-08-15 10:04:03 +00:00
? GetReply ( context . HandlerContext )
2017-10-17 08:34:07 +00:00
: context . FlowState . Metadata . Reply ;
2017-02-05 22:22:34 +00:00
if ( reply = = null )
throw new YieldPointException ( "No response is required" ) ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
if ( message . GetType ( ) . FullName ! = reply . ResponseTypeName )
throw new YieldPointException ( $"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead" ) ;
2017-01-31 11:01:08 +00:00
2019-08-13 18:30:04 +00:00
var properties = new MessageProperties
{
CorrelationId = reply . CorrelationId
} ;
2017-02-05 22:22:34 +00:00
// TODO disallow if replyto is not specified?
2017-10-17 08:34:07 +00:00
if ( reply . ReplyTo ! = null )
2019-04-24 16:04:30 +00:00
await publisher . PublishDirect ( message , reply . ReplyTo , properties , reply . Mandatory ) ;
2017-02-05 22:22:34 +00:00
else
2019-04-24 16:04:30 +00:00
await publisher . Publish ( message , properties , reply . Mandatory ) ;
2017-10-17 08:34:07 +00:00
await context . Delete ( ) ;
2017-01-31 11:01:08 +00:00
}
2021-12-10 10:45:09 +00:00
internal static async Task EndFlow ( FlowContext context )
2017-01-31 11:01:08 +00:00
{
2017-10-17 08:34:07 +00:00
await context . Delete ( ) ;
2017-01-31 11:01:08 +00:00
2018-12-19 19:50:56 +00:00
if ( context . FlowState ? . Metadata . Reply ! = null )
2017-10-17 08:34:07 +00:00
throw new YieldPointException ( $"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}" ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-13 18:30:04 +00:00
private static ResponseHandlerInfo GetResponseHandlerInfo ( ITapetiConfig config , object request , Delegate responseHandler )
2017-01-31 11:01:08 +00:00
{
2019-08-15 15:56:38 +00:00
var requestAttribute = request . GetType ( ) . GetCustomAttribute < RequestAttribute > ( ) ;
if ( requestAttribute ? . Response = = null )
throw new ArgumentException ( $"Request message {request.GetType().Name} must be marked with the Request attribute and a valid Response type" , nameof ( request ) ) ;
2019-08-13 18:30:04 +00:00
var binding = config . Bindings . ForMethod ( responseHandler ) ;
2017-01-31 11:01:08 +00:00
if ( binding = = null )
throw new ArgumentException ( "responseHandler must be a registered message handler" , nameof ( responseHandler ) ) ;
2019-08-15 15:56:38 +00:00
if ( ! binding . Accept ( requestAttribute . Response ) )
2019-04-24 16:04:30 +00:00
throw new ArgumentException ( $"responseHandler must accept message of type {requestAttribute.Response}" , nameof ( responseHandler ) ) ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
var continuationAttribute = binding . Method . GetCustomAttribute < ContinuationAttribute > ( ) ;
if ( continuationAttribute = = null )
2017-02-15 21:05:01 +00:00
throw new ArgumentException ( "responseHandler must be marked with the Continuation attribute" , nameof ( responseHandler ) ) ;
if ( binding . QueueName = = null )
2019-08-15 15:56:38 +00:00
throw new ArgumentException ( "responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow" , nameof ( responseHandler ) ) ;
2017-02-05 22:22:34 +00:00
2017-01-31 11:01:08 +00:00
return new ResponseHandlerInfo
{
MethodName = MethodSerializer . Serialize ( responseHandler . Method ) ,
2019-08-19 07:33:07 +00:00
ReplyToQueue = binding . QueueName ,
IsDurableQueue = binding . QueueType = = QueueType . Durable
2017-01-31 11:01:08 +00:00
} ;
}
2019-08-15 10:04:03 +00:00
private static ReplyMetadata GetReply ( IFlowHandlerContext context )
2017-01-31 11:01:08 +00:00
{
2021-09-02 14:16:11 +00:00
var requestAttribute = context . MessageContext ? . Message ? . GetType ( ) . GetCustomAttribute < RequestAttribute > ( ) ;
2017-02-05 22:22:34 +00:00
if ( requestAttribute ? . Response = = null )
return null ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
return new ReplyMetadata
{
2021-09-02 14:16:11 +00:00
CorrelationId = context . MessageContext . Properties . CorrelationId ,
ReplyTo = context . MessageContext . Properties . ReplyTo ,
2019-04-24 16:04:30 +00:00
ResponseTypeName = requestAttribute . Response . FullName ,
2021-09-02 14:16:11 +00:00
Mandatory = context . MessageContext . Properties . Persistent . GetValueOrDefault ( true )
2017-02-05 22:22:34 +00:00
} ;
}
2019-08-13 18:30:04 +00:00
private static async Task CreateNewFlowState ( FlowContext flowContext )
2017-10-17 08:34:07 +00:00
{
2019-08-15 10:04:03 +00:00
var flowStore = flowContext . HandlerContext . Config . DependencyResolver . Resolve < IFlowStore > ( ) ;
2017-10-17 08:34:07 +00:00
var flowID = Guid . NewGuid ( ) ;
flowContext . FlowStateLock = await flowStore . LockFlowState ( flowID ) ;
if ( flowContext . FlowStateLock = = null )
throw new InvalidOperationException ( "Unable to lock a new flow" ) ;
flowContext . FlowState = new FlowState
{
Metadata = new FlowMetadata
{
2019-08-15 10:04:03 +00:00
Reply = GetReply ( flowContext . HandlerContext )
2017-10-17 08:34:07 +00:00
}
} ;
}
2017-02-05 22:22:34 +00:00
2019-08-15 10:04:03 +00:00
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
2019-08-15 10:04:03 +00:00
public async Task Execute ( IFlowHandlerContext context , IYieldPoint yieldPoint )
2017-02-05 22:22:34 +00:00
{
2018-12-19 19:50:56 +00:00
if ( ! ( yieldPoint is DelegateYieldPoint executableYieldPoint ) )
2019-08-15 10:04:03 +00:00
throw new YieldPointException ( $"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}" ) ;
2017-02-05 22:22:34 +00:00
2019-08-16 09:47:57 +00:00
FlowContext flowContext = null ;
var disposeFlowContext = false ;
try
2017-02-05 22:22:34 +00:00
{
2021-09-02 14:16:11 +00:00
var messageContext = context . MessageContext ;
if ( messageContext = = null | | ! messageContext . TryGet < FlowMessageContextPayload > ( out var flowPayload ) )
2017-02-05 22:22:34 +00:00
{
2019-08-16 09:47:57 +00:00
flowContext = new FlowContext
{
HandlerContext = context
} ;
// If we ended up here it is because of a Start. No point in storing the new FlowContext
// in the messageContext as the yield point is the last to execute.
disposeFlowContext = true ;
}
2021-09-02 14:16:11 +00:00
else
flowContext = flowPayload . FlowContext ;
2019-08-16 09:47:57 +00:00
try
{
await executableYieldPoint . Execute ( flowContext ) ;
}
catch ( YieldPointException e )
{
// Useful for debugging
e . Data [ "Tapeti.Controller.Name" ] = context . Controller . GetType ( ) . FullName ;
e . Data [ "Tapeti.Controller.Method" ] = context . Method . Name ;
throw ;
}
2017-02-05 22:22:34 +00:00
2019-08-16 09:47:57 +00:00
flowContext . EnsureStoreOrDeleteIsCalled ( ) ;
2017-02-05 22:22:34 +00:00
}
2019-08-16 09:47:57 +00:00
finally
2017-02-05 22:22:34 +00:00
{
2019-08-16 09:47:57 +00:00
if ( disposeFlowContext )
flowContext . Dispose ( ) ;
2017-02-05 22:22:34 +00:00
}
2017-01-31 11:01:08 +00:00
}
2021-12-09 22:52:25 +00:00
/// <inheritdoc />
public IFlowParallelRequest GetParallelRequest ( IFlowHandlerContext context )
{
2021-12-10 10:45:09 +00:00
return context . MessageContext . TryGet < FlowMessageContextPayload > ( out var flowPayload )
? new ParallelRequest ( config , this , flowPayload . FlowContext )
: null ;
}
2021-12-09 22:52:25 +00:00
2021-12-10 10:45:09 +00:00
/// <inheritdoc />
public Task Converge ( IFlowHandlerContext context )
{
return Execute ( context , new DelegateYieldPoint ( flowContext = >
Converge ( flowContext , flowContext . ContinuationMetadata . ConvergeMethodName , flowContext . ContinuationMetadata . ConvergeMethodSync ) ) ) ;
2021-12-09 22:52:25 +00:00
}
2017-10-17 08:34:07 +00:00
2021-12-10 10:45:09 +00:00
internal async Task Converge ( FlowContext flowContext , string convergeMethodName , bool convergeMethodSync )
{
IYieldPoint yieldPoint ;
if ( ! flowContext . HandlerContext . MessageContext . TryGet < ControllerMessageContextPayload > ( out var controllerPayload ) )
throw new ArgumentException ( "Context does not contain a controller payload" , nameof ( flowContext ) ) ;
var method = controllerPayload . Controller . GetType ( ) . GetMethod ( convergeMethodName , BindingFlags . NonPublic | BindingFlags . Instance ) ;
if ( method = = null )
throw new ArgumentException ( $"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}" ) ;
if ( convergeMethodSync )
yieldPoint = ( IYieldPoint ) method . Invoke ( controllerPayload . Controller , new object [ ] { } ) ;
else
yieldPoint = await ( Task < IYieldPoint > ) method . Invoke ( controllerPayload . Controller , new object [ ] { } ) ;
if ( yieldPoint = = null )
throw new YieldPointException ( $"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}" ) ;
await Execute ( flowContext . HandlerContext , yieldPoint ) ;
}
2017-02-12 18:04:26 +00:00
2021-12-10 08:56:37 +00:00
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{
2017-02-12 18:04:26 +00:00
private class RequestInfo
2017-01-31 11:01:08 +00:00
{
public object Message { get ; set ; }
public ResponseHandlerInfo ResponseHandlerInfo { get ; set ; }
}
2019-08-13 18:30:04 +00:00
private readonly ITapetiConfig config ;
2021-12-10 10:45:09 +00:00
private readonly FlowProvider flowProvider ;
2017-01-31 11:01:08 +00:00
private readonly List < RequestInfo > requests = new List < RequestInfo > ( ) ;
2021-12-10 10:45:09 +00:00
public ParallelRequestBuilder ( ITapetiConfig config , FlowProvider flowProvider )
2017-01-31 11:01:08 +00:00
{
this . config = config ;
2021-12-10 10:45:09 +00:00
this . flowProvider = flowProvider ;
2017-01-31 11:01:08 +00:00
}
public IFlowParallelRequestBuilder AddRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , Task > responseHandler )
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2017-01-31 11:01:08 +00:00
}
2021-12-09 22:52:25 +00:00
public IFlowParallelRequestBuilder AddRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , IFlowParallelRequest , Task > responseHandler )
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2021-12-09 22:52:25 +00:00
}
2017-01-31 11:01:08 +00:00
public IFlowParallelRequestBuilder AddRequestSync < TRequest , TResponse > ( TRequest message , Action < TResponse > responseHandler )
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2017-01-31 11:01:08 +00:00
}
2021-12-10 08:56:37 +00:00
public IFlowParallelRequestBuilder InternalAddRequest ( object message , Delegate responseHandler )
2021-12-09 22:52:25 +00:00
{
requests . Add ( new RequestInfo
{
Message = message ,
ResponseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler )
} ) ;
return this ;
}
2021-12-10 10:45:09 +00:00
public IYieldPoint Yield ( Func < Task < IYieldPoint > > continuation , FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour . Exception )
2017-01-31 11:01:08 +00:00
{
2021-12-10 10:45:09 +00:00
return BuildYieldPoint ( continuation , false , noRequestsBehaviour ) ;
2017-01-31 11:01:08 +00:00
}
2021-12-10 10:45:09 +00:00
public IYieldPoint YieldSync ( Func < IYieldPoint > continuation , FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour . Exception )
2017-01-31 11:01:08 +00:00
{
2021-12-10 10:45:09 +00:00
return BuildYieldPoint ( continuation , true , noRequestsBehaviour ) ;
2017-01-31 11:01:08 +00:00
}
2017-02-12 18:04:26 +00:00
2021-12-10 10:45:09 +00:00
private IYieldPoint BuildYieldPoint ( Delegate convergeMethod , bool convergeMethodSync , FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour . Exception )
2017-02-12 18:04:26 +00:00
{
2019-08-15 15:45:39 +00:00
if ( requests . Count = = 0 )
2021-12-10 10:45:09 +00:00
{
switch ( noRequestsBehaviour )
{
case FlowNoRequestsBehaviour . Exception :
throw new YieldPointException ( "At least one request must be added before yielding a parallel request" ) ;
case FlowNoRequestsBehaviour . Converge :
return new DelegateYieldPoint ( context = >
flowProvider . Converge ( context , convergeMethod . Method . Name , convergeMethodSync ) ) ;
case FlowNoRequestsBehaviour . EndFlow :
return new DelegateYieldPoint ( EndFlow ) ;
default :
throw new ArgumentOutOfRangeException ( nameof ( noRequestsBehaviour ) , noRequestsBehaviour , null ) ;
}
}
2019-08-15 15:45:39 +00:00
2017-02-12 18:04:26 +00:00
if ( convergeMethod ? . Method = = null )
throw new ArgumentNullException ( nameof ( convergeMethod ) ) ;
2021-12-10 10:45:09 +00:00
return new DelegateYieldPoint ( async context = >
2017-02-12 18:04:26 +00:00
{
2019-08-15 10:04:03 +00:00
if ( convergeMethod . Method . DeclaringType ! = context . HandlerContext . Controller . GetType ( ) )
2017-02-12 18:04:26 +00:00
throw new YieldPointException ( "Converge method must be in the same controller class" ) ;
2021-12-10 10:45:09 +00:00
await Task . WhenAll ( requests . Select ( requestInfo = >
flowProvider . SendRequest (
context ,
requestInfo . Message ,
2017-02-12 18:04:26 +00:00
requestInfo . ResponseHandlerInfo ,
convergeMethod . Method . Name ,
2021-12-10 10:45:09 +00:00
convergeMethodSync ,
false ) ) ) ;
await context . Store ( requests . Any ( i = > i . ResponseHandlerInfo . IsDurableQueue ) ) ;
2017-02-12 18:04:26 +00:00
} ) ;
}
}
2017-01-31 11:01:08 +00:00
2021-12-09 22:52:25 +00:00
private class ParallelRequest : IFlowParallelRequest
{
private readonly ITapetiConfig config ;
2021-12-10 10:45:09 +00:00
private readonly FlowProvider flowProvider ;
2021-12-10 08:56:37 +00:00
private readonly FlowContext flowContext ;
2021-12-09 22:52:25 +00:00
2021-12-10 10:45:09 +00:00
public ParallelRequest ( ITapetiConfig config , FlowProvider flowProvider , FlowContext flowContext )
2021-12-09 22:52:25 +00:00
{
this . config = config ;
2021-12-10 10:45:09 +00:00
this . flowProvider = flowProvider ;
2021-12-10 08:56:37 +00:00
this . flowContext = flowContext ;
2021-12-09 22:52:25 +00:00
}
2021-12-10 08:56:37 +00:00
public Task AddRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , Task > responseHandler )
2021-12-09 22:52:25 +00:00
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2021-12-09 22:52:25 +00:00
}
2021-12-10 08:56:37 +00:00
public Task AddRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , IFlowParallelRequest , Task > responseHandler )
2021-12-09 22:52:25 +00:00
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2021-12-09 22:52:25 +00:00
}
2021-12-10 08:56:37 +00:00
public Task AddRequestSync < TRequest , TResponse > ( TRequest message , Action < TResponse > responseHandler )
2021-12-09 22:52:25 +00:00
{
2021-12-10 08:56:37 +00:00
return InternalAddRequest ( message , responseHandler ) ;
2021-12-09 22:52:25 +00:00
}
2021-12-10 08:56:37 +00:00
private Task InternalAddRequest ( object message , Delegate responseHandler )
2021-12-09 22:52:25 +00:00
{
2021-12-10 08:56:37 +00:00
var responseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler ) ;
2021-12-10 10:45:09 +00:00
return flowProvider . SendRequest (
flowContext ,
message ,
responseHandlerInfo ,
flowContext . ContinuationMetadata . ConvergeMethodName ,
flowContext . ContinuationMetadata . ConvergeMethodSync ,
false ) ;
2021-12-09 22:52:25 +00:00
}
}
2017-01-31 11:01:08 +00:00
internal class ResponseHandlerInfo
{
public string MethodName { get ; set ; }
public string ReplyToQueue { get ; set ; }
2019-08-19 07:33:07 +00:00
public bool IsDurableQueue { get ; set ; }
2017-01-31 11:01:08 +00:00
}
}
}