Added Stop method to ISubscriber
This commit is contained in:
parent
70a9c04fc7
commit
f1a4ab1c67
@ -77,8 +77,14 @@ namespace Tapeti.Connection
|
|||||||
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
|
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
|
||||||
/// <param name="queueName"></param>
|
/// <param name="queueName"></param>
|
||||||
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
|
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
|
||||||
Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
|
/// <returns>The consumer tag as returned by BasicConsume.</returns>
|
||||||
|
Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stops the consumer with the specified tag.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="consumerTag">The consumer tag as returned by Consume.</param>
|
||||||
|
Task Cancel(string consumerTag);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Creates a durable queue if it does not already exist, and updates the bindings.
|
/// Creates a durable queue if it does not already exist, and updates the bindings.
|
||||||
@ -118,7 +124,6 @@ namespace Tapeti.Connection
|
|||||||
/// <param name="binding">The binding to add to the dynamic queue</param>
|
/// <param name="binding">The binding to add to the dynamic queue</param>
|
||||||
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
|
Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding);
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Closes the connection to RabbitMQ gracefully.
|
/// Closes the connection to RabbitMQ gracefully.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -72,7 +72,6 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
|
||||||
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
|
public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams)
|
||||||
{
|
{
|
||||||
this.config = config;
|
this.config = config;
|
||||||
@ -184,29 +183,48 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
|
public async Task<string> Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer)
|
||||||
{
|
{
|
||||||
if (deletedQueues.Contains(queueName))
|
if (deletedQueues.Contains(queueName))
|
||||||
return;
|
return null;
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(queueName))
|
if (string.IsNullOrEmpty(queueName))
|
||||||
throw new ArgumentNullException(nameof(queueName));
|
throw new ArgumentNullException(nameof(queueName));
|
||||||
|
|
||||||
|
|
||||||
|
string consumerTag = null;
|
||||||
|
|
||||||
await QueueWithRetryableChannel(channel =>
|
await QueueWithRetryableChannel(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
|
var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
|
||||||
channel.BasicConsume(queueName, false, basicConsumer);
|
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
|
||||||
|
});
|
||||||
|
|
||||||
|
return consumerTag;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task Cancel(string consumerTag)
|
||||||
|
{
|
||||||
|
if (isClosing || string.IsNullOrEmpty(consumerTag))
|
||||||
|
return;
|
||||||
|
|
||||||
|
// No need for a retryable channel here, if the connection is lost
|
||||||
|
// so is the consumer.
|
||||||
|
await Queue(channel =>
|
||||||
|
{
|
||||||
|
channel.BasicCancel(consumerTag);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private async Task Respond(ulong deliveryTag, ConsumeResult result)
|
private async Task Respond(ulong deliveryTag, ConsumeResult result)
|
||||||
{
|
{
|
||||||
await taskQueue.Value.Add(() =>
|
await Queue(channel =>
|
||||||
{
|
{
|
||||||
// No need for a retryable channel here, if the connection is lost we can't
|
// No need for a retryable channel here, if the connection is lost we can't
|
||||||
// use the deliveryTag anymore.
|
// use the deliveryTag anymore.
|
||||||
@ -214,15 +232,15 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
case ConsumeResult.Success:
|
case ConsumeResult.Success:
|
||||||
case ConsumeResult.ExternalRequeue:
|
case ConsumeResult.ExternalRequeue:
|
||||||
GetChannel().BasicAck(deliveryTag, false);
|
channel.BasicAck(deliveryTag, false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ConsumeResult.Error:
|
case ConsumeResult.Error:
|
||||||
GetChannel().BasicNack(deliveryTag, false, false);
|
channel.BasicNack(deliveryTag, false, false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ConsumeResult.Requeue:
|
case ConsumeResult.Requeue:
|
||||||
GetChannel().BasicNack(deliveryTag, false, true);
|
channel.BasicNack(deliveryTag, false, true);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -13,6 +13,7 @@ namespace Tapeti.Connection
|
|||||||
private readonly Func<ITapetiClient> clientFactory;
|
private readonly Func<ITapetiClient> clientFactory;
|
||||||
private readonly ITapetiConfig config;
|
private readonly ITapetiConfig config;
|
||||||
private bool consuming;
|
private bool consuming;
|
||||||
|
private readonly List<string> consumerTags = new List<string>();
|
||||||
|
|
||||||
private CancellationTokenSource initializeCancellationTokenSource;
|
private CancellationTokenSource initializeCancellationTokenSource;
|
||||||
|
|
||||||
@ -50,6 +51,8 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
initializeCancellationTokenSource?.Cancel();
|
initializeCancellationTokenSource?.Cancel();
|
||||||
initializeCancellationTokenSource = null;
|
initializeCancellationTokenSource = null;
|
||||||
|
|
||||||
|
consumerTags.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -65,6 +68,8 @@ namespace Tapeti.Connection
|
|||||||
initializeCancellationTokenSource?.Cancel();
|
initializeCancellationTokenSource?.Cancel();
|
||||||
initializeCancellationTokenSource = new CancellationTokenSource();
|
initializeCancellationTokenSource = new CancellationTokenSource();
|
||||||
|
|
||||||
|
consumerTags.Clear();
|
||||||
|
|
||||||
cancellationToken = initializeCancellationTokenSource.Token;
|
cancellationToken = initializeCancellationTokenSource.Token;
|
||||||
|
|
||||||
// ReSharper disable once MethodSupportsCancellation
|
// ReSharper disable once MethodSupportsCancellation
|
||||||
@ -91,6 +96,21 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task Stop()
|
||||||
|
{
|
||||||
|
if (!consuming)
|
||||||
|
return;
|
||||||
|
|
||||||
|
initializeCancellationTokenSource?.Cancel();
|
||||||
|
initializeCancellationTokenSource = null;
|
||||||
|
|
||||||
|
await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag)));
|
||||||
|
|
||||||
|
consumerTags.Clear();
|
||||||
|
consuming = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private async Task ApplyBindings(CancellationToken cancellationToken)
|
private async Task ApplyBindings(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
@ -115,13 +135,13 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
|
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
|
||||||
|
|
||||||
await Task.WhenAll(queues.Select(async group =>
|
consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
|
||||||
{
|
{
|
||||||
var queueName = group.Key;
|
var queueName = group.Key;
|
||||||
var consumer = new TapetiConsumer(config, queueName, group);
|
var consumer = new TapetiConsumer(config, queueName, group);
|
||||||
|
|
||||||
await clientFactory().Consume(cancellationToken, queueName, consumer);
|
return await clientFactory().Consume(cancellationToken, queueName, consumer);
|
||||||
}));
|
})));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,5 +13,10 @@ namespace Tapeti
|
|||||||
/// Starts consuming from the subscribed queues if not already started.
|
/// Starts consuming from the subscribed queues if not already started.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
Task Resume();
|
Task Resume();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stops consuming from the subscribed queues.
|
||||||
|
/// </summary>
|
||||||
|
Task Stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user