// 2020-02-12 10:34:51 +00:00
using System ;
using System.Linq.Expressions ;
using System.Reflection ;
using System.Threading.Tasks ;
using Tapeti.Annotations ;
using Tapeti.Config ;
using Tapeti.Default ;
using Tapeti.Helpers ;
namespace Tapeti.Connection
{
    /// <inheritdoc />
    /// <remarks>
    /// Publishes messages by running them through the configured publish middleware,
    /// serializing them with the resolved <see cref="IMessageSerializer"/> and handing the
    /// result to the client returned by the client factory.
    /// </remarks>
    internal class TapetiPublisher : IInternalPublisher
    {
        private readonly ITapetiConfig config;
        private readonly Func<ITapetiClient> clientFactory;
        private readonly IExchangeStrategy exchangeStrategy;
        private readonly IRoutingKeyStrategy routingKeyStrategy;
        private readonly IMessageSerializer messageSerializer;


        /// <summary>
        /// Creates a publisher which resolves the exchange strategy, routing key strategy and
        /// message serializer from the dependency resolver of <paramref name="config"/>.
        /// </summary>
        /// <param name="config">Configuration providing the dependency resolver, the bindings and the publish middleware.</param>
        /// <param name="clientFactory">Factory which is invoked on every publish to obtain the client performing the actual publish.</param>
        public TapetiPublisher(ITapetiConfig config, Func<ITapetiClient> clientFactory)
        {
            this.config = config;
            this.clientFactory = clientFactory;

            exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
            routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
            messageSerializer = config.DependencyResolver.Resolve<IMessageSerializer>();
        }


        /// <inheritdoc />
        public async Task Publish(object message)
        {
            // No explicit properties; the mandatory flag follows the MandatoryAttribute on the message type
            await Publish(message, null, IsMandatory(message));
        }


        /// <inheritdoc />
        public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class
        {
            if (responseMethodSelector == null)
                throw new ArgumentNullException(nameof(responseMethodSelector));

            await PublishRequest(message, responseMethodSelector.Body);
        }


        /// <inheritdoc />
        public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class
        {
            if (responseMethodSelector == null)
                throw new ArgumentNullException(nameof(responseMethodSelector));

            await PublishRequest(message, responseMethodSelector.Body);
        }


        /// <summary>
        /// Validates the response handler referenced by <paramref name="responseMethodBody"/> and publishes
        /// <paramref name="message"/> with ReplyTo set to the queue the handler is bound to.
        /// </summary>
        /// <param name="message">The request message; its type must carry a <see cref="RequestAttribute"/> with a Response type.</param>
        /// <param name="responseMethodBody">Body of the selector expression identifying the response handler method.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// The response method can not be determined, the message type has no Request attribute with a Response type,
        /// or the handler is not registered, does not accept the response type, lacks the
        /// <see cref="ResponseHandlerAttribute"/> or has no queue name yet.
        /// </exception>
        private async Task PublishRequest(object message, Expression responseMethodBody)
        {
            if (message == null)
                throw new ArgumentNullException(nameof(message));

            // The body is expected to be a unary expression whose operand is a method call made on a
            // constant MethodInfo; presumably this is the shape the compiler emits for a method group
            // such as "c => c.HandleResponse" - TODO confirm. Any other shape results in null.
            var callExpression = (responseMethodBody as UnaryExpression)?.Operand as MethodCallExpression;
            var targetMethodExpression = callExpression?.Object as ConstantExpression;

            var responseHandler = targetMethodExpression?.Value as MethodInfo;
            if (responseHandler == null)
                throw new ArgumentException("Unable to determine the response method", nameof(responseMethodBody));

            var requestAttribute = message.GetType().GetCustomAttribute<RequestAttribute>();
            if (requestAttribute?.Response == null)
                throw new ArgumentException($"Request message {message.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(message));

            // The exceptions below report responseMethodBody as the parameter name, since the
            // response handler is derived from that parameter and is not a parameter itself.
            var binding = config.Bindings.ForMethod(responseHandler);
            if (binding == null)
                throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseMethodBody));

            if (!binding.Accept(requestAttribute.Response))
                throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseMethodBody));

            var responseHandlerAttribute = binding.Method.GetCustomAttribute<ResponseHandlerAttribute>();
            if (responseHandlerAttribute == null)
                throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseMethodBody));

            if (binding.QueueName == null)
                throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a request", nameof(responseMethodBody));

            var properties = new MessageProperties
            {
                ReplyTo = binding.QueueName
            };

            await Publish(message, properties, IsMandatory(message));
        }


        /// <inheritdoc />
        public async Task SendToQueue(string queueName, object message)
        {
            await PublishDirect(message, queueName, null, IsMandatory(message));
        }


        /// <inheritdoc />
        public async Task Publish(object message, IMessageProperties properties, bool mandatory)
        {
            if (message == null)
                throw new ArgumentNullException(nameof(message));

            // Exchange and routing key are both derived from the runtime type of the message
            var messageClass = message.GetType();
            var exchange = exchangeStrategy.GetExchange(messageClass);
            var routingKey = routingKeyStrategy.GetRoutingKey(messageClass);

            await Publish(message, properties, exchange, routingKey, mandatory);
        }


        /// <inheritdoc />
        public async Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory)
        {
            // No exchange is passed; the queue name is used as the routing key
            await Publish(message, properties, null, queueName, mandatory);
        }


        /// <summary>
        /// Shared implementation of all publish variants: applies default properties, invokes the
        /// publish middleware and finally serializes and publishes the message.
        /// </summary>
        /// <param name="message">The message to serialize and publish.</param>
        /// <param name="properties">Optional properties used to initialize the properties sent with the message; may be null.</param>
        /// <param name="exchange">The exchange to publish to, or null as passed by <see cref="PublishDirect"/>.</param>
        /// <param name="routingKey">The routing key, or the queue name for a direct publish.</param>
        /// <param name="mandatory">Passed through unchanged to the client's Publish call.</param>
        private async Task Publish(object message, IMessageProperties properties, string exchange, string routingKey, bool mandatory)
        {
            // The defaults are applied to a new MessageProperties instance created from the supplied properties
            var writableProperties = new MessageProperties(properties);

            // Only fill in the timestamp when none was provided; persistence is always enabled
            writableProperties.Timestamp ??= DateTime.UtcNow;
            writableProperties.Persistent = true;


            var context = new PublishContext
            {
                Config = config,
                Exchange = exchange,
                RoutingKey = routingKey,
                Message = message,
                Properties = writableProperties
            };


            // Each middleware handler receives the context and a 'next' delegate; the last
            // delegate passed to GoAsync performs the actual serialization and publish.
            await MiddlewareHelper.GoAsync(
                config.Middleware.Publish,
                async (handler, next) => await handler.Handle(context, next),
                async () =>
                {
                    var body = messageSerializer.Serialize(message, writableProperties);
                    await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory);
                });
        }


        /// <summary>
        /// Determines whether the runtime type of <paramref name="message"/> is marked with the <see cref="MandatoryAttribute"/>.
        /// </summary>
        /// <param name="message">The message of which the type is inspected.</param>
        /// <returns>True when the attribute is present on the message type, false otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        private static bool IsMandatory(object message)
        {
            if (message == null)
                throw new ArgumentNullException(nameof(message));

            return message.GetType().GetCustomAttribute<MandatoryAttribute>() != null;
        }


        /// <summary>
        /// Context instance which is passed to the publish middleware handlers.
        /// </summary>
        private class PublishContext : IPublishContext
        {
            public ITapetiConfig Config { get; set; }
            public string Exchange { get; set; }
            public string RoutingKey { get; set; }
            public object Message { get; set; }
            public IMessageProperties Properties { get; set; }
        }
    }
}