1
0
mirror of synced 2024-11-23 17:33:49 +00:00

Changed to Async consumer with ConsumerDispatchConcurrency

This commit is contained in:
Mark van Renswoude 2024-04-08 14:19:16 +02:00
parent aa48b4bce3
commit db9e957726
4 changed files with 29 additions and 17 deletions

View File

@ -17,7 +17,7 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary> /// </summary>
internal class TapetiBasicConsumer : DefaultBasicConsumer internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{ {
private readonly IConsumer consumer; private readonly IConsumer consumer;
private readonly long connectionReference; private readonly long connectionReference;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public override void HandleBasicDeliver(string consumerTag, public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag, ulong deliveryTag,
bool redelivered, bool redelivered,
string exchange, string exchange,
@ -50,20 +50,15 @@ namespace Tapeti.Connection
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray(); var bodyArray = body.ToArray();
// Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would
// still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer.
Task.Run(async () =>
{
try try
{ {
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false);
await onRespond(connectionReference, deliveryTag, response); await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false);
} }
catch catch
{ {
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false);
} }
});
} }
} }
} }

View File

@ -777,9 +777,13 @@ namespace Tapeti.Connection
Password = connectionParams.Password, Password = connectionParams.Password,
AutomaticRecoveryEnabled = false, AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30) RequestedHeartbeat = TimeSpan.FromSeconds(30),
DispatchConsumersAsync = true
}; };
if (connectionParams.ConsumerDispatchConcurrency > 0)
connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency;
if (connectionParams.ClientProperties != null) if (connectionParams.ClientProperties != null)
foreach (var pair in connectionParams.ClientProperties) foreach (var pair in connectionParams.ClientProperties)
{ {

View File

@ -126,6 +126,7 @@ namespace Tapeti.Helpers
case "password": result.Password = value; break; case "password": result.Password = value; break;
case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break; case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break;
case "managementport": result.ManagementPort = int.Parse(value); break; case "managementport": result.ManagementPort = int.Parse(value); break;
case "consumerDispatchConcurrency": result.ConsumerDispatchConcurrency = int.Parse(value); break;
} }
} }
} }

View File

@ -50,6 +50,17 @@ namespace Tapeti
/// </summary> /// </summary>
public int ManagementPort { get; set; } = 15672; public int ManagementPort { get; set; } = 15672;
/// <summary>
/// The maximum number of consumers which are run concurrently.
/// </summary>
/// <remarks>
/// The number of consumers is usually roughly equal to the number of queues consumed.
/// Do not set too high to avoid overloading the thread pool.
/// The RabbitMQ Client library defaults to 1. Due to older Tapeti versions implementing concurrency
/// effectively limited by the PrefetchCount, this will default to Environment.ProcessorCount instead.
/// </remarks>
public int ConsumerDispatchConcurrency { get; set; }
/// <summary> /// <summary>
/// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface. /// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface.
/// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly /// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly
@ -69,6 +80,7 @@ namespace Tapeti
/// </summary> /// </summary>
public TapetiConnectionParams() public TapetiConnectionParams()
{ {
ConsumerDispatchConcurrency = Environment.ProcessorCount;
} }
/// <summary> /// <summary>
@ -77,7 +89,7 @@ namespace Tapeti
/// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname/"))</example> /// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname/"))</example>
/// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost"))</example> /// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost"))</example>
/// <param name="uri"></param> /// <param name="uri"></param>
public TapetiConnectionParams(Uri uri) public TapetiConnectionParams(Uri uri) : this()
{ {
HostName = uri.Host; HostName = uri.Host;
VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath; VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath;