diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 42e3181..757292d 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : DefaultBasicConsumer + internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override void HandleBasicDeliver(string consumerTag, + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -50,20 +50,15 @@ namespace Tapeti.Connection // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would - // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer. - Task.Run(async () => + try { - try - { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } - }); + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false); + await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false); + } } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 1c4b016..9ea0694 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,9 +777,13 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30) + RequestedHeartbeat = TimeSpan.FromSeconds(30), + DispatchConsumersAsync = true }; + if (connectionParams.ConsumerDispatchConcurrency > 0) + connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency; + if (connectionParams.ClientProperties != null) foreach (var pair in connectionParams.ClientProperties) { diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index c872a79..4445b7e 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -126,6 +126,7 @@ namespace Tapeti.Helpers case "password": result.Password = value; break; case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break; case "managementport": result.ManagementPort = int.Parse(value); break; + case "consumerDispatchConcurrency": result.ConsumerDispatchConcurrency = int.Parse(value); break; } } } diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 43f923f..5e8e50e 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -50,6 +50,17 @@ namespace Tapeti /// public int ManagementPort { get; set; } = 15672; + /// + /// The maximum number of consumers which are run concurrently. + /// + /// + /// The number of consumers is usually roughly equal to the number of queues consumed. + /// Do not set too high to avoid overloading the thread pool. + /// The RabbitMQ Client library defaults to 1. Due to older Tapeti versions implementing concurrency + /// effectively limited by the PrefetchCount, this will default to Environment.ProcessorCount instead. + /// + public int ConsumerDispatchConcurrency { get; set; } + /// /// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface. /// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly @@ -69,6 +80,7 @@ namespace Tapeti /// public TapetiConnectionParams() { + ConsumerDispatchConcurrency = Environment.ProcessorCount; } /// @@ -77,7 +89,7 @@ namespace Tapeti /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname/")) /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost")) /// - public TapetiConnectionParams(Uri uri) + public TapetiConnectionParams(Uri uri) : this() { HostName = uri.Host; VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath;