1
0
mirror of synced 2024-11-16 14:53:50 +00:00
Tapeti/Connection/TapetiConsumer.cs

130 lines
5.1 KiB
C#
Raw Normal View History

using System;
2016-12-11 14:08:58 +00:00
using System.Collections.Generic;
using System.Linq;
using RabbitMQ.Client;
2016-12-11 14:08:58 +00:00
using Tapeti.Config;
using Tapeti.Helpers;
namespace Tapeti.Connection
{
public class TapetiConsumer : DefaultBasicConsumer
{
private readonly TapetiWorker worker;
private readonly string queueName;
2016-12-11 14:08:58 +00:00
private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly List<IBinding> bindings;
private readonly IExceptionStrategy exceptionStrategy;
public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware)
{
this.worker = worker;
this.queueName = queueName;
2016-12-11 14:08:58 +00:00
this.dependencyResolver = dependencyResolver;
this.messageMiddleware = messageMiddleware;
this.bindings = bindings.ToList();
exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>();
}
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, byte[] body)
{
try
{
2016-12-11 14:08:58 +00:00
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties);
if (message == null)
throw new ArgumentException("Empty message");
var validMessageType = false;
using (var context = new MessageContext
2016-12-11 14:08:58 +00:00
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
{
try
{
foreach (var binding in bindings)
{
if (!binding.Accept(context, message).Result)
continue;
context.Controller = dependencyResolver.Resolve(binding.Controller);
context.Binding = binding;
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => binding.Invoke(context, message)
).Wait();
// ReSharper restore AccessToDisposedClosure
validMessageType = true;
}
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
}
catch (Exception e)
{
worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, UnwrapException(e)));
}
2016-12-11 14:08:58 +00:00
}
worker.Respond(deliveryTag, ConsumeResponse.Ack);
}
catch (Exception e)
{
worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, UnwrapException(e)));
}
}
private static Exception UnwrapException(Exception exception)
{
// In async/await style code this is handled similarly. For synchronous
// code using Tasks we have to unwrap these ourselves to get the proper
// exception directly instead of "Errors occured". We might lose
// some stack traces in the process though.
var aggregateException = exception as AggregateException;
if (aggregateException != null && aggregateException.InnerExceptions.Count == 1)
throw aggregateException.InnerExceptions[0];
return UnwrapException(exception);
}
2016-12-11 14:08:58 +00:00
protected class MessageContext : IMessageContext
{
public IDependencyResolver DependencyResolver { get; set; }
2016-12-11 14:08:58 +00:00
public object Controller { get; set; }
public IBinding Binding { get; set; }
public string Queue { get; set; }
public string RoutingKey { get; set; }
2016-12-11 14:08:58 +00:00
public object Message { get; set; }
public IBasicProperties Properties { get; set; }
2016-12-11 14:08:58 +00:00
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>();
public void Dispose()
{
foreach (var value in Items.Values)
(value as IDisposable)?.Dispose();
}
2016-12-11 14:08:58 +00:00
}
}
}