Reverted async changes as they caused message handlers to no longer run in parallel

This commit is contained in:
Mark van Renswoude 2023-04-06 07:25:01 +02:00
parent 1bd284166d
commit ab2cc1c1bb
2 changed files with 17 additions and 13 deletions

View File

@ -17,7 +17,7 @@ namespace Tapeti.Connection
/// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary>
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
internal class TapetiBasicConsumer : DefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly long connectionReference;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public override async Task HandleBasicDeliver(string consumerTag,
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
@ -49,16 +49,21 @@ namespace Tapeti.Connection
//
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray();
try
// Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would
// still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer.
Task.Run(async () =>
{
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(connectionReference, deliveryTag, response);
}
catch
{
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
try
{
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(connectionReference, deliveryTag, response);
}
catch
{
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
});
}
}
}

View File

@ -777,8 +777,7 @@ namespace Tapeti.Connection
Password = connectionParams.Password,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30),
DispatchConsumersAsync = true
RequestedHeartbeat = TimeSpan.FromSeconds(30)
};
if (connectionParams.ClientProperties != null)