2017-01-31 11:01:08 +00:00
using System ;
2017-02-05 22:22:34 +00:00
using System.Collections.Generic ;
2017-02-12 18:04:26 +00:00
using System.Linq ;
2017-01-31 11:01:08 +00:00
using System.Reflection ;
using System.Threading.Tasks ;
using RabbitMQ.Client.Framing ;
using Tapeti.Config ;
using Tapeti.Flow.Annotations ;
using Tapeti.Flow.FlowHelpers ;
namespace Tapeti.Flow.Default
{
public class FlowProvider : IFlowProvider , IFlowHandler
{
private readonly IConfig config ;
2017-02-05 22:22:34 +00:00
private readonly IInternalPublisher publisher ;
2017-01-31 11:01:08 +00:00
public FlowProvider ( IConfig config , IPublisher publisher )
{
this . config = config ;
2017-02-05 22:22:34 +00:00
this . publisher = ( IInternalPublisher ) publisher ;
2017-01-31 11:01:08 +00:00
}
public IYieldPoint YieldWithRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , Task < IYieldPoint > > responseHandler )
{
var responseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler ) ;
return new DelegateYieldPoint ( true , context = > SendRequest ( context , message , responseHandlerInfo ) ) ;
}
public IYieldPoint YieldWithRequestSync < TRequest , TResponse > ( TRequest message , Func < TResponse , IYieldPoint > responseHandler )
{
var responseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler ) ;
return new DelegateYieldPoint ( true , context = > SendRequest ( context , message , responseHandlerInfo ) ) ;
}
public IFlowParallelRequestBuilder YieldWithParallelRequest ( )
{
2017-02-12 18:04:26 +00:00
return new ParallelRequestBuilder ( config , SendRequest ) ;
2017-01-31 11:01:08 +00:00
}
public IYieldPoint EndWithResponse < TResponse > ( TResponse message )
{
return new DelegateYieldPoint ( false , context = > SendResponse ( context , message ) ) ;
}
public IYieldPoint End ( )
{
return new DelegateYieldPoint ( false , EndFlow ) ;
}
2017-02-12 18:04:26 +00:00
private async Task SendRequest ( FlowContext context , object message , ResponseHandlerInfo responseHandlerInfo ,
string convergeMethodName = null , bool convergeMethodTaskSync = false )
2017-01-31 11:01:08 +00:00
{
var continuationID = Guid . NewGuid ( ) ;
context . FlowState . Continuations . Add ( continuationID ,
2017-02-05 22:22:34 +00:00
new ContinuationMetadata
2017-01-31 11:01:08 +00:00
{
MethodName = responseHandlerInfo . MethodName ,
2017-02-12 18:04:26 +00:00
ConvergeMethodName = convergeMethodName ,
ConvergeMethodSync = convergeMethodTaskSync
2017-02-05 22:22:34 +00:00
} ) ;
2017-01-31 11:01:08 +00:00
var properties = new BasicProperties
{
CorrelationId = continuationID . ToString ( ) ,
ReplyTo = responseHandlerInfo . ReplyToQueue
} ;
await publisher . Publish ( message , properties ) ;
}
private async Task SendResponse ( FlowContext context , object message )
{
2017-02-05 22:22:34 +00:00
var reply = context . FlowState . Metadata . Reply ;
if ( reply = = null )
throw new YieldPointException ( "No response is required" ) ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
if ( message . GetType ( ) . FullName ! = reply . ResponseTypeName )
throw new YieldPointException ( $"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead" ) ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
var properties = new BasicProperties ( ) ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
// Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if ( reply . CorrelationId ! = null )
properties . CorrelationId = reply . CorrelationId ;
// TODO disallow if replyto is not specified?
if ( context . FlowState . Metadata . Reply . ReplyTo ! = null )
await publisher . PublishDirect ( message , reply . ReplyTo , properties ) ;
else
await publisher . Publish ( message , properties ) ;
2017-01-31 11:01:08 +00:00
}
private static Task EndFlow ( FlowContext context )
{
2017-02-05 22:22:34 +00:00
if ( context . FlowState . Metadata . Reply ! = null )
throw new YieldPointException ( $"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}" ) ;
2017-01-31 11:01:08 +00:00
return Task . CompletedTask ;
}
private static ResponseHandlerInfo GetResponseHandlerInfo ( IConfig config , object request , Delegate responseHandler )
{
var binding = config . GetBinding ( responseHandler ) ;
if ( binding = = null )
throw new ArgumentException ( "responseHandler must be a registered message handler" , nameof ( responseHandler ) ) ;
var requestAttribute = request . GetType ( ) . GetCustomAttribute < RequestAttribute > ( ) ;
if ( requestAttribute ? . Response ! = null & & requestAttribute . Response ! = binding . MessageClass )
throw new ArgumentException ( $"responseHandler must accept message of type {binding.MessageClass}" , nameof ( responseHandler ) ) ;
2017-02-05 22:22:34 +00:00
var continuationAttribute = binding . Method . GetCustomAttribute < ContinuationAttribute > ( ) ;
if ( continuationAttribute = = null )
throw new ArgumentException ( $"responseHandler must be marked with the Continuation attribute" , nameof ( responseHandler ) ) ;
2017-01-31 11:01:08 +00:00
return new ResponseHandlerInfo
{
MethodName = MethodSerializer . Serialize ( responseHandler . Method ) ,
ReplyToQueue = binding . QueueName
} ;
}
2017-02-05 22:22:34 +00:00
private static ReplyMetadata GetReply ( IMessageContext context )
2017-01-31 11:01:08 +00:00
{
2017-02-05 22:22:34 +00:00
var requestAttribute = context . Message . GetType ( ) . GetCustomAttribute < RequestAttribute > ( ) ;
if ( requestAttribute ? . Response = = null )
return null ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
return new ReplyMetadata
{
CorrelationId = context . Properties . CorrelationId ,
ReplyTo = context . Properties . ReplyTo ,
ResponseTypeName = requestAttribute . Response . FullName
} ;
}
public async Task Execute ( IMessageContext context , IYieldPoint yieldPoint )
{
2017-02-12 18:04:26 +00:00
var stateYieldPoint = yieldPoint as IStateYieldPoint ;
var executableYieldPoint = yieldPoint as IExecutableYieldPoint ;
var storeState = stateYieldPoint ? . StoreState ? ? false ;
2017-02-05 22:22:34 +00:00
FlowContext flowContext ;
object flowContextItem ;
if ( ! context . Items . TryGetValue ( ContextItems . FlowContext , out flowContextItem ) )
{
flowContext = new FlowContext
{
MessageContext = context
} ;
if ( storeState )
{
// Initiate the flow
var flowStore = context . DependencyResolver . Resolve < IFlowStore > ( ) ;
var flowID = Guid . NewGuid ( ) ;
flowContext . FlowStateLock = await flowStore . LockFlowState ( flowID ) ;
if ( flowContext . FlowStateLock = = null )
throw new InvalidOperationException ( "Unable to lock a new flow" ) ;
flowContext . FlowState = await flowContext . FlowStateLock . GetFlowState ( ) ;
if ( flowContext . FlowState = = null )
throw new InvalidOperationException ( "Unable to get state for new flow" ) ;
flowContext . FlowState . Metadata . Reply = GetReply ( context ) ;
}
}
else
flowContext = ( FlowContext ) flowContextItem ;
try
{
2017-02-12 18:04:26 +00:00
if ( executableYieldPoint ! = null )
await executableYieldPoint . Execute ( flowContext ) ;
2017-02-05 22:22:34 +00:00
}
catch ( YieldPointException e )
{
var controllerName = flowContext . MessageContext . Controller . GetType ( ) . FullName ;
var methodName = flowContext . MessageContext . Binding . Method . Name ;
throw new YieldPointException ( $"{e.Message} in controller {controllerName}, method {methodName}" , e ) ;
}
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
if ( storeState )
2017-01-31 11:01:08 +00:00
{
flowContext . FlowState . Data = Newtonsoft . Json . JsonConvert . SerializeObject ( context . Controller ) ;
await flowContext . FlowStateLock . StoreFlowState ( flowContext . FlowState ) ;
}
else
{
await flowContext . FlowStateLock . DeleteFlowState ( ) ;
}
}
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{
2017-02-12 18:04:26 +00:00
public delegate Task SendRequestFunc ( FlowContext context ,
object message ,
ResponseHandlerInfo responseHandlerInfo ,
string convergeMethodName ,
bool convergeMethodSync ) ;
private class RequestInfo
2017-01-31 11:01:08 +00:00
{
public object Message { get ; set ; }
public ResponseHandlerInfo ResponseHandlerInfo { get ; set ; }
}
private readonly IConfig config ;
2017-02-12 18:04:26 +00:00
private readonly SendRequestFunc sendRequest ;
2017-01-31 11:01:08 +00:00
private readonly List < RequestInfo > requests = new List < RequestInfo > ( ) ;
2017-02-12 18:04:26 +00:00
public ParallelRequestBuilder ( IConfig config , SendRequestFunc sendRequest )
2017-01-31 11:01:08 +00:00
{
this . config = config ;
this . sendRequest = sendRequest ;
}
public IFlowParallelRequestBuilder AddRequest < TRequest , TResponse > ( TRequest message , Func < TResponse , Task > responseHandler )
{
requests . Add ( new RequestInfo
{
Message = message ,
ResponseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler )
} ) ;
return this ;
}
public IFlowParallelRequestBuilder AddRequestSync < TRequest , TResponse > ( TRequest message , Action < TResponse > responseHandler )
{
requests . Add ( new RequestInfo
{
Message = message ,
ResponseHandlerInfo = GetResponseHandlerInfo ( config , message , responseHandler )
} ) ;
return this ;
}
public IYieldPoint Yield ( Func < Task < IYieldPoint > > continuation )
{
2017-02-12 18:04:26 +00:00
return BuildYieldPoint ( continuation , false ) ;
2017-01-31 11:01:08 +00:00
}
2017-02-12 18:04:26 +00:00
public IYieldPoint YieldSync ( Func < IYieldPoint > continuation )
2017-01-31 11:01:08 +00:00
{
2017-02-12 18:04:26 +00:00
return BuildYieldPoint ( continuation , true ) ;
2017-01-31 11:01:08 +00:00
}
2017-02-12 18:04:26 +00:00
private IYieldPoint BuildYieldPoint ( Delegate convergeMethod , bool convergeMethodSync )
{
if ( convergeMethod ? . Method = = null )
throw new ArgumentNullException ( nameof ( convergeMethod ) ) ;
return new DelegateYieldPoint ( true , context = >
{
if ( convergeMethod . Method . DeclaringType ! = context . MessageContext . Controller . GetType ( ) )
throw new YieldPointException ( "Converge method must be in the same controller class" ) ;
return Task . WhenAll ( requests . Select ( requestInfo = >
sendRequest ( context , requestInfo . Message ,
requestInfo . ResponseHandlerInfo ,
convergeMethod . Method . Name ,
convergeMethodSync ) ) ) ;
} ) ;
}
}
2017-01-31 11:01:08 +00:00
internal class ResponseHandlerInfo
{
public string MethodName { get ; set ; }
public string ReplyToQueue { get ; set ; }
}
}
}