diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 24badec..42e3181 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer + internal class TapetiBasicConsumer : DefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override async Task HandleBasicDeliver(string consumerTag, + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -49,16 +49,21 @@ namespace Tapeti.Connection // // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - - try + + // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would + // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer. + Task.Run(async () => { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); + await onRespond(connectionReference, deliveryTag, response); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); + } + }); } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index b23038e..1c4b016 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,8 +777,7 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30), - DispatchConsumersAsync = true + RequestedHeartbeat = TimeSpan.FromSeconds(30) }; if (connectionParams.ClientProperties != null)