2017-01-31 11:01:08 +00:00
using System ;
2017-02-05 22:22:34 +00:00
using System.Collections.Generic ;
2018-12-19 19:50:56 +00:00
using System.Diagnostics ;
2017-02-12 18:04:26 +00:00
using System.Linq ;
2017-01-31 11:01:08 +00:00
using System.Reflection ;
using System.Threading.Tasks ;
2018-12-19 19:50:56 +00:00
using Tapeti.Annotations ;
2017-01-31 11:01:08 +00:00
using Tapeti.Config ;
2019-08-13 18:30:04 +00:00
using Tapeti.Default ;
2017-01-31 11:01:08 +00:00
using Tapeti.Flow.Annotations ;
using Tapeti.Flow.FlowHelpers ;
namespace Tapeti.Flow.Default
{
2019-08-14 18:48:40 +00:00
/// <inheritdoc cref="IFlowProvider"/>
/// <summary>
/// Default implementation for IFlowProvider.
/// </summary>
2017-01-31 11:01:08 +00:00
public class FlowProvider : IFlowProvider , IFlowHandler
{
2019-08-13 18:30:04 +00:00
private readonly ITapetiConfig config;
private readonly IInternalPublisher publisher;


/// <summary>
/// Creates a flow provider which uses the given Tapeti configuration and publisher.
/// </summary>
/// <param name="config">Configuration used to look up the bindings of response handlers.</param>
/// <param name="publisher">
/// Publisher used to send requests and responses. It is cast to <see cref="IInternalPublisher"/>,
/// so a non-null instance which does not implement that interface results in an <see cref="InvalidCastException"/>.
/// </param>
public FlowProvider(ITapetiConfig config, IPublisher publisher)
{
    // The flow provider needs the internal publisher API (direct publishing with explicit properties)
    var internalPublisher = (IInternalPublisher)publisher;

    this.config = config;
    this.publisher = internalPublisher;
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// The response handler is validated immediately by GetResponseHandlerInfo, which throws an
/// <see cref="ArgumentException"/> for an invalid request or handler. The request itself is only
/// sent once the returned yield point is executed.
/// </remarks>
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
{
    var handlerInfo = GetResponseHandlerInfo(config, message, responseHandler);

    // Deferred: runs when the yield point is executed with the flow context
    Task SendWhenExecuted(FlowContext flowContext) => SendRequest(flowContext, message, handlerInfo);

    return new DelegateYieldPoint(SendWhenExecuted);
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// Synchronous-handler counterpart of YieldWithRequest: the response handler is validated immediately
/// by GetResponseHandlerInfo and the request is sent once the returned yield point is executed.
/// </remarks>
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{
    var handlerInfo = GetResponseHandlerInfo(config, message, responseHandler);

    // Deferred: runs when the yield point is executed with the flow context
    Task SendWhenExecuted(FlowContext flowContext) => SendRequest(flowContext, message, handlerInfo);

    return new DelegateYieldPoint(SendWhenExecuted);
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// The returned builder uses this provider's configuration to validate response handlers and
/// this provider's SendRequest method to send each request.
/// </remarks>
public IFlowParallelRequestBuilder YieldWithParallelRequest() => new ParallelRequestBuilder(config, SendRequest);
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// Nothing is sent or validated until the returned yield point is executed; at that point
/// SendResponse publishes the message and deletes the flow.
/// </remarks>
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
{
    Task RespondWhenExecuted(FlowContext flowContext) => SendResponse(flowContext, message);

    return new DelegateYieldPoint(RespondWhenExecuted);
}
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// The returned yield point runs EndFlow when executed.
/// </remarks>
public IYieldPoint End() => new DelegateYieldPoint(EndFlow);
2017-02-12 18:04:26 +00:00
/// <summary>
/// Registers a continuation for the response handler in the flow state, stores the flow
/// and then publishes the request message.
/// </summary>
/// <param name="context">The flow context. A new flow state is created when it has none yet.</param>
/// <param name="message">The request message to publish.</param>
/// <param name="responseHandlerInfo">Serialized method name and reply queue of the response handler.</param>
/// <param name="convergeMethodName">Name of the converge method recorded in the continuation, or null.</param>
/// <param name="convergeMethodTaskSync">Recorded in the continuation as ConvergeMethodSync.</param>
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
    string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
    var isNewFlow = context.FlowState == null;
    if (isNewFlow)
    {
        await CreateNewFlowState(context);
        Debug.Assert(context.FlowState != null, "context.FlowState != null");
    }

    // The continuation ID is also sent along as the correlation ID of the request
    var continuationID = Guid.NewGuid();
    var continuation = new ContinuationMetadata
    {
        MethodName = responseHandlerInfo.MethodName,
        ConvergeMethodName = convergeMethodName,
        ConvergeMethodSync = convergeMethodTaskSync
    };

    context.FlowState.Continuations.Add(continuationID, continuation);

    var requestProperties = new MessageProperties
    {
        CorrelationId = continuationID.ToString(),
        ReplyTo = responseHandlerInfo.ReplyToQueue
    };

    // The flow is stored before the request goes out
    await context.Store();
    await publisher.Publish(message, requestProperties, true);
}
/// <summary>
/// Publishes the response message which ends the flow and deletes the flow afterwards.
/// </summary>
/// <remarks>
/// The reply metadata is taken from the flow state when there is one, otherwise it is derived
/// from the message currently being handled (see GetReply).
/// </remarks>
/// <param name="context">The flow context of the current message.</param>
/// <param name="message">The response message. Its type must match the expected response type.</param>
/// <exception cref="YieldPointException">
/// Thrown when no response is required, when <paramref name="message"/> is null or when the type
/// of <paramref name="message"/> does not match the expected response type.
/// </exception>
private async Task SendResponse(FlowContext context, object message)
{
    var reply = context.FlowState == null
        ? GetReply(context.HandlerContext)
        : context.FlowState.Metadata.Reply;

    if (reply == null)
        throw new YieldPointException("No response is required");

    // Without this guard a null response surfaces as a NullReferenceException from message.GetType()
    if (message == null)
        throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, null was returned instead");

    var messageTypeName = message.GetType().FullName;
    if (messageTypeName != reply.ResponseTypeName)
        throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {messageTypeName} was returned instead");

    var properties = new MessageProperties
    {
        CorrelationId = reply.CorrelationId
    };

    // TODO disallow if replyto is not specified?
    if (reply.ReplyTo != null)
        await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory);
    else
        await publisher.Publish(message, properties, reply.Mandatory);

    // The flow is only deleted after the response has been published
    await context.Delete();
}
2017-10-17 08:34:07 +00:00
/// <summary>
/// Ends the flow without sending a response.
/// </summary>
/// <param name="context">The flow context of the current message.</param>
/// <exception cref="YieldPointException">
/// Thrown, after the flow has been deleted, when the flow state indicates that a response is expected.
/// </exception>
private static async Task EndFlow(FlowContext context)
{
    // Note: the delete is performed first, the validation below runs afterwards
    await context.Delete();

    var replyExpected = context.FlowState?.Metadata.Reply != null;
    if (!replyExpected)
        return;

    throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
}
2019-08-13 18:30:04 +00:00
/// <summary>
/// Validates that the response handler can receive the response to the given request and
/// returns the information required to route that response back to it.
/// </summary>
/// <param name="config">Tapeti configuration in which the binding of the response handler is looked up.</param>
/// <param name="request">The request message. Its type must have a Request attribute with a Response type.</param>
/// <param name="responseHandler">The method which handles the response.</param>
/// <returns>The serialized method name of the handler and the name of the queue it is bound to.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="request"/> or <paramref name="responseHandler"/> is null.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown when the request type has no Request attribute with a Response type, or when the response
/// handler is not a registered message handler, does not accept the response type, is not marked
/// with the Continuation attribute or is not bound to a queue yet.
/// </exception>
private static ResponseHandlerInfo GetResponseHandlerInfo(ITapetiConfig config, object request, Delegate responseHandler)
{
    // Guard against null up front; otherwise request.GetType() / responseHandler.Method
    // fail with a NullReferenceException which does not name the offending argument.
    if (request == null)
        throw new ArgumentNullException(nameof(request));

    if (responseHandler == null)
        throw new ArgumentNullException(nameof(responseHandler));

    var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>();
    if (requestAttribute?.Response == null)
        throw new ArgumentException($"Request message {request.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(request));

    var binding = config.Bindings.ForMethod(responseHandler);
    if (binding == null)
        throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));

    if (!binding.Accept(requestAttribute.Response))
        throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));

    var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>();
    if (continuationAttribute == null)
        throw new ArgumentException("responseHandler must be marked with the Continuation attribute", nameof(responseHandler));

    // The queue name is used as the ReplyTo address of the request
    if (binding.QueueName == null)
        throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow", nameof(responseHandler));

    return new ResponseHandlerInfo
    {
        MethodName = MethodSerializer.Serialize(responseHandler.Method),
        ReplyToQueue = binding.QueueName
    };
}
2019-08-15 10:04:03 +00:00
/// <summary>
/// Builds the reply metadata for the message currently being handled.
/// </summary>
/// <param name="context">The flow handler context of the current message.</param>
/// <returns>
/// The reply metadata, or null when there is no message context, no message, or the message type
/// has no Request attribute with a Response type.
/// </returns>
private static ReplyMetadata GetReply(IFlowHandlerContext context)
{
    var messageContext = context.ControllerMessageContext;
    var requestAttribute = messageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();

    var responseType = requestAttribute?.Response;
    if (responseType == null)
        return null;

    var incomingProperties = messageContext.Properties;

    return new ReplyMetadata
    {
        CorrelationId = incomingProperties.CorrelationId,
        ReplyTo = incomingProperties.ReplyTo,
        ResponseTypeName = responseType.FullName,

        // Mandatory follows the Persistent property of the incoming message, true when unspecified
        Mandatory = incomingProperties.Persistent.GetValueOrDefault(true)
    };
}
2019-08-13 18:30:04 +00:00
/// <summary>
/// Locks a newly generated flow ID in the flow store and assigns the lock and a fresh flow state
/// to the flow context.
/// </summary>
/// <param name="flowContext">The flow context which receives the lock and the new flow state.</param>
/// <exception cref="InvalidOperationException">Thrown when the flow store returns no lock.</exception>
private static async Task CreateNewFlowState(FlowContext flowContext)
{
    var handlerContext = flowContext.HandlerContext;
    var flowStore = handlerContext.Config.DependencyResolver.Resolve<IFlowStore>();

    var newFlowID = Guid.NewGuid();
    var stateLock = await flowStore.LockFlowState(newFlowID);

    flowContext.FlowStateLock = stateLock;
    if (stateLock == null)
        throw new InvalidOperationException("Unable to lock a new flow");

    // Remember where the final response should go, if the current message is a request
    var metadata = new FlowMetadata
    {
        Reply = GetReply(handlerContext)
    };

    flowContext.FlowState = new FlowState
    {
        Metadata = metadata
    };
}
2017-02-05 22:22:34 +00:00
2019-08-15 10:04:03 +00:00
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <remarks>
/// Only yield points created by this provider (DelegateYieldPoint) can be executed. The flow context
/// is taken from the message context when present, otherwise a new one is created and, if there is
/// a message context, stored in it.
/// </remarks>
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
    var executableYieldPoint = yieldPoint as DelegateYieldPoint;
    if (executableYieldPoint == null)
        throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");

    var messageContext = context.ControllerMessageContext;

    FlowContext flowContext = null;
    var hasFlowContext = messageContext != null && messageContext.Get(ContextItems.FlowContext, out flowContext);

    if (!hasFlowContext)
    {
        flowContext = new FlowContext
        {
            HandlerContext = context
        };

        messageContext?.Store(ContextItems.FlowContext, flowContext);
    }

    try
    {
        await executableYieldPoint.Execute(flowContext);
    }
    catch (YieldPointException yieldPointException)
    {
        // Useful for debugging
        var exceptionData = yieldPointException.Data;
        exceptionData["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
        exceptionData["Tapeti.Controller.Method"] = context.Method.Name;
        throw;
    }

    flowContext.EnsureStoreOrDeleteIsCalled();
}
2017-10-17 08:34:07 +00:00
2017-01-31 11:01:08 +00:00
/// <summary>
/// Collects the requests of a parallel flow and builds the yield point which sends them.
/// </summary>
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{
    /// <summary>
    /// Signature of the callback used to send a single request of the parallel flow.
    /// </summary>
    public delegate Task SendRequestFunc(FlowContext context,
        object message,
        ResponseHandlerInfo responseHandlerInfo,
        string convergeMethodName,
        bool convergeMethodSync);


    /// <summary>
    /// A request which has been added to the builder, together with its validated response handler.
    /// </summary>
    private class RequestInfo
    {
        public object Message { get; set; }
        public ResponseHandlerInfo ResponseHandlerInfo { get; set; }
    }


    private readonly ITapetiConfig config;
    private readonly SendRequestFunc sendRequest;
    private readonly List<RequestInfo> requests = new List<RequestInfo>();


    /// <summary>
    /// Creates a builder which validates response handlers against the given configuration
    /// and sends the requests using the given callback.
    /// </summary>
    public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest)
    {
        this.config = config;
        this.sendRequest = sendRequest;
    }


    /// <inheritdoc />
    public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
    {
        // The response handler is validated right away, not when the yield point is executed
        requests.Add(new RequestInfo
        {
            Message = message,
            ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
        });

        return this;
    }


    /// <inheritdoc />
    public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
    {
        // The response handler is validated right away, not when the yield point is executed
        requests.Add(new RequestInfo
        {
            Message = message,
            ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
        });

        return this;
    }


    /// <inheritdoc />
    public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation)
    {
        return BuildYieldPoint(continuation, false);
    }


    /// <inheritdoc />
    public IYieldPoint YieldSync(Func<IYieldPoint> continuation)
    {
        return BuildYieldPoint(continuation, true);
    }


    /// <summary>
    /// Builds the yield point which, when executed, sends all added requests with the given
    /// converge method recorded in each continuation.
    /// </summary>
    /// <param name="convergeMethod">The method to call once all responses have been handled.</param>
    /// <param name="convergeMethodSync">Passed on to the send callback as convergeMethodSync.</param>
    /// <exception cref="YieldPointException">
    /// Thrown when no requests were added, or, on execution, when the converge method is not
    /// declared in the class of the current controller.
    /// </exception>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="convergeMethod"/> is null.</exception>
    private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync)
    {
        if (requests.Count == 0)
            throw new YieldPointException("At least one request must be added before yielding a parallel request");

        if (convergeMethod?.Method == null)
            throw new ArgumentNullException(nameof(convergeMethod));

        return new DelegateYieldPoint(async context =>
        {
            if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
                throw new YieldPointException("Converge method must be in the same controller class");

            // Send the requests one after another. Starting them all at once (Task.WhenAll) lets
            // every send observe a missing flow state before the first one has finished creating it,
            // resulting in multiple flow states/locks and concurrent changes to the continuations.
            // The responses are still awaited in parallel; only the sending is sequential.
            foreach (var requestInfo in requests)
            {
                await sendRequest(context, requestInfo.Message,
                    requestInfo.ResponseHandlerInfo,
                    convergeMethod.Method.Name,
                    convergeMethodSync);
            }
        });
    }
}
2017-01-31 11:01:08 +00:00
/// <summary>
/// Describes where the response to a request should be delivered.
/// </summary>
internal class ResponseHandlerInfo
{
    /// <summary>
    /// The response handler method in serialized form, as produced by MethodSerializer.Serialize.
    /// </summary>
    public string MethodName { get; set; }

    /// <summary>
    /// Name of the queue the response handler is bound to; used as the ReplyTo of the request.
    /// </summary>
    public string ReplyToQueue { get; set; }
}
}
}