Merge branch 'release/2.3' into develop

This commit is contained in:
Mark van Renswoude 2021-01-15 10:39:41 +01:00
commit b62475c830
4 changed files with 61 additions and 13 deletions

View File

@ -77,8 +77,14 @@ namespace Tapeti.Connection
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <param name="queueName"></param>
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
/// <returns>The consumer tag as returned by BasicConsume.</returns>
Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
/// <summary>
/// Stops the consumer with the specified tag.
/// </summary>
/// <param name="consumerTag">The consumer tag as returned by Consume.</param>
Task Cancel(string consumerTag);
/// <summary>
/// Creates a durable queue if it does not already exist, and updates the bindings.
@ -118,7 +124,6 @@ namespace Tapeti.Connection
/// <param name="binding">The binding to add to the dynamic queue</param>
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
/// <summary>
/// Closes the connection to RabbitMQ gracefully.
/// </summary>

View File

@ -72,7 +72,6 @@ namespace Tapeti.Connection
}
/// <inheritdoc />
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
{
this.config = config;
@ -184,29 +183,48 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
public async Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
{
if (deletedQueues.Contains(queueName))
return;
return null;
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
string consumerTag = null;
await QueueWithRetryableChannel(channel =>
{
if (cancellationToken.IsCancellationRequested)
return;
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
channel.BasicConsume(queueName, false, basicConsumer);
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
});
return consumerTag;
}
/// <inheritdoc />
public async Task Cancel(string consumerTag)
{
if (isClosing || string.IsNullOrEmpty(consumerTag))
return;
// No need for a retryable channel here, if the connection is lost
// so is the consumer.
await Queue(channel =>
{
channel.BasicCancel(consumerTag);
});
}
private async Task Respond(ulong deliveryTag, ConsumeResult result)
{
await taskQueue.Value.Add(() =>
await Queue(channel =>
{
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
@ -214,15 +232,15 @@ namespace Tapeti.Connection
{
case ConsumeResult.Success:
case ConsumeResult.ExternalRequeue:
GetChannel().BasicAck(deliveryTag, false);
channel.BasicAck(deliveryTag, false);
break;
case ConsumeResult.Error:
GetChannel().BasicNack(deliveryTag, false, false);
channel.BasicNack(deliveryTag, false, false);
break;
case ConsumeResult.Requeue:
GetChannel().BasicNack(deliveryTag, false, true);
channel.BasicNack(deliveryTag, false, true);
break;
default:

View File

@ -13,6 +13,7 @@ namespace Tapeti.Connection
private readonly Func<ITapetiClient> clientFactory;
private readonly ITapetiConfig config;
private bool consuming;
private readonly List<string> consumerTags = new List<string>();
private CancellationTokenSource initializeCancellationTokenSource;
@ -50,6 +51,8 @@ namespace Tapeti.Connection
{
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
consumerTags.Clear();
}
@ -65,6 +68,8 @@ namespace Tapeti.Connection
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = new CancellationTokenSource();
consumerTags.Clear();
cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation
@ -91,6 +96,21 @@ namespace Tapeti.Connection
}
/// <inheritdoc />
public async Task Stop()
{
if (!consuming)
return;
initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null;
await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag)));
consumerTags.Clear();
consuming = false;
}
private async Task ApplyBindings(CancellationToken cancellationToken)
{
@ -115,13 +135,13 @@ namespace Tapeti.Connection
{
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
await Task.WhenAll(queues.Select(async group =>
consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
await clientFactory().Consume(cancellationToken, queueName, consumer);
}));
return await clientFactory().Consume(cancellationToken, queueName, consumer);
})));
}

View File

@ -13,5 +13,10 @@ namespace Tapeti
/// Starts consuming from the subscribed queues if not already started.
/// </summary>
Task Resume();
/// <summary>
/// Stops consuming from the subscribed queues.
/// </summary>
Task Stop();
}
}