From ab2cc1c1bbb7fa2aa50083f38e237e4dc1ebceaa Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 6 Apr 2023 07:25:01 +0200 Subject: [PATCH] Reverted async changes as they caused message handlers to no longer run in parallel --- Tapeti/Connection/TapetiBasicConsumer.cs | 27 ++++++++++++++---------- Tapeti/Connection/TapetiClient.cs | 3 +-- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 24badec..42e3181 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer + internal class TapetiBasicConsumer : DefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override async Task HandleBasicDeliver(string consumerTag, + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -49,16 +49,21 @@ namespace Tapeti.Connection // // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - - try + + // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would + // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer. + Task.Run(async () => { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); + await onRespond(connectionReference, deliveryTag, response); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); + } + }); } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index b23038e..1c4b016 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,8 +777,7 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30), - DispatchConsumersAsync = true + RequestedHeartbeat = TimeSpan.FromSeconds(30) }; if (connectionParams.ClientProperties != null)