using System; using System.Threading.Tasks; using RabbitMQ.Client; using Tapeti.Default; namespace Tapeti.Connection { /// /// Called to report the result of a consumed message back to RabbitMQ. /// /// The connection reference on which the consumed message was received /// The delivery tag of the consumed message /// The result which should be sent back public delegate Task ResponseFunc(long expectedConnectionReference, ulong deliveryTag, ConsumeResult result); /// /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// internal class TapetiBasicConsumer : DefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; private readonly ResponseFunc onRespond; /// public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond) { this.consumer = consumer; this.connectionReference = connectionReference; this.onRespond = onRespond; } /// public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory body) { // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner // (which changes exception handling, which is now done in TapetiConsumer exclusively). // // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); Task.Run(async () => { try { var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); await onRespond(connectionReference, deliveryTag, response); } catch { await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); } }); } } }