162 lines
6.5 KiB
C#
162 lines
6.5 KiB
C#
using System;
|
|
using System.Linq.Expressions;
|
|
using System.Reflection;
|
|
using System.Threading.Tasks;
|
|
using Tapeti.Annotations;
|
|
using Tapeti.Config;
|
|
using Tapeti.Default;
|
|
using Tapeti.Helpers;
|
|
|
|
namespace Tapeti.Connection
|
|
{
|
|
/// <inheritdoc />
|
|
internal class TapetiPublisher : IInternalPublisher
|
|
{
|
|
private readonly ITapetiConfig config;
|
|
private readonly Func<ITapetiClient> clientFactory;
|
|
private readonly IExchangeStrategy exchangeStrategy;
|
|
private readonly IRoutingKeyStrategy routingKeyStrategy;
|
|
private readonly IMessageSerializer messageSerializer;
|
|
|
|
|
|
public TapetiPublisher(ITapetiConfig config, Func<ITapetiClient> clientFactory)
|
|
{
|
|
this.config = config;
|
|
this.clientFactory = clientFactory;
|
|
|
|
exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
|
|
routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
|
|
messageSerializer = config.DependencyResolver.Resolve<IMessageSerializer>();
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task Publish(object message)
|
|
{
|
|
await Publish(message, null, IsMandatory(message));
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
|
|
{
|
|
await PublishRequest(message, responseMethodSelector.Body);
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
|
|
{
|
|
await PublishRequest(message, responseMethodSelector.Body);
|
|
}
|
|
|
|
|
|
private async Task PublishRequest(object message, Expression responseMethodBody)
|
|
{
|
|
var callExpression = (responseMethodBody as UnaryExpression)?.Operand as MethodCallExpression;
|
|
var targetMethodExpression = callExpression?.Object as ConstantExpression;
|
|
|
|
var responseHandler = targetMethodExpression?.Value as MethodInfo;
|
|
if (responseHandler == null)
|
|
throw new ArgumentException("Unable to determine the response method", nameof(responseMethodBody));
|
|
|
|
|
|
var requestAttribute = message.GetType().GetCustomAttribute<RequestAttribute>();
|
|
if (requestAttribute?.Response == null)
|
|
throw new ArgumentException($"Request message {message.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(message));
|
|
|
|
var binding = config.Bindings.ForMethod(responseHandler);
|
|
if (binding == null)
|
|
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
|
|
|
|
if (!binding.Accept(requestAttribute.Response))
|
|
throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
|
|
|
|
var responseHandleAttribute = binding.Method.GetCustomAttribute<ResponseHandlerAttribute>();
|
|
if (responseHandleAttribute == null)
|
|
throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseHandler));
|
|
|
|
if (binding.QueueName == null)
|
|
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a request", nameof(responseHandler));
|
|
|
|
|
|
var properties = new MessageProperties
|
|
{
|
|
ReplyTo = binding.QueueName
|
|
};
|
|
|
|
await Publish(message, properties, IsMandatory(message));
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task SendToQueue(string queueName, object message)
|
|
{
|
|
await PublishDirect(message, queueName, null, IsMandatory(message));
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task Publish(object message, IMessageProperties? properties, bool mandatory)
|
|
{
|
|
var messageClass = message.GetType();
|
|
var exchange = exchangeStrategy.GetExchange(messageClass);
|
|
var routingKey = routingKeyStrategy.GetRoutingKey(messageClass);
|
|
|
|
await Publish(message, properties, exchange, routingKey, mandatory);
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory)
|
|
{
|
|
await Publish(message, properties, null, queueName, mandatory);
|
|
}
|
|
|
|
|
|
private async Task Publish(object message, IMessageProperties? properties, string? exchange, string routingKey, bool mandatory)
|
|
{
|
|
var writableProperties = new MessageProperties(properties);
|
|
|
|
writableProperties.Timestamp ??= DateTime.UtcNow;
|
|
writableProperties.Persistent = true;
|
|
|
|
|
|
var context = new PublishContext
|
|
{
|
|
Config = config,
|
|
Exchange = exchange,
|
|
RoutingKey = routingKey,
|
|
Message = message,
|
|
Properties = writableProperties
|
|
};
|
|
|
|
|
|
await MiddlewareHelper.GoAsync(
|
|
config.Middleware.Publish,
|
|
async (handler, next) => await handler.Handle(context, next),
|
|
async () =>
|
|
{
|
|
var body = messageSerializer.Serialize(message, writableProperties);
|
|
await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory);
|
|
});
|
|
}
|
|
|
|
|
|
private static bool IsMandatory(object message)
|
|
{
|
|
return message.GetType().GetCustomAttribute<MandatoryAttribute>() != null;
|
|
}
|
|
|
|
|
|
private class PublishContext : IPublishContext
|
|
{
|
|
public ITapetiConfig Config { get; init; } = null!;
|
|
public string? Exchange { get; set; }
|
|
public string RoutingKey { get; init; } = null!;
|
|
public object Message { get; init; } = null!;
|
|
public IMessageProperties? Properties { get; init; }
|
|
}
|
|
}
|
|
}
|