diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 3ab9bb3..77b77f6 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; @@ -14,6 +15,7 @@ namespace Tapeti.Connection /// internal class TapetiConsumer : IConsumer { + private readonly CancellationToken cancellationToken; private readonly ITapetiConfig config; private readonly string queueName; private readonly List bindings; @@ -23,9 +25,9 @@ namespace Tapeti.Connection private readonly IMessageSerializer messageSerializer; - /// - public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable bindings) + public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config, string queueName, IEnumerable bindings) { + this.cancellationToken = cancellationToken; this.config = config; this.queueName = queueName; this.bindings = bindings.ToList(); @@ -80,11 +82,8 @@ namespace Tapeti.Connection var messageType = message.GetType(); var validMessageType = false; - foreach (var binding in bindings) + foreach (var binding in bindings.Where(binding => binding.Accept(messageType))) { - if (!binding.Accept(messageType)) - continue; - var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); validMessageType = true; @@ -135,6 +134,13 @@ namespace Tapeti.Connection private void HandleException(ExceptionStrategyContext exceptionContext) { + if (cancellationToken.IsCancellationRequested && IsTaskCanceledException(exceptionContext.Exception)) + { + // The service is most likely stopping, and the connection is gone anyways. + exceptionContext.SetConsumeResult(ConsumeResult.Requeue); + return; + } + try { exceptionStrategy.HandleException(exceptionContext); @@ -150,6 +156,21 @@ namespace Tapeti.Connection } + private static bool IsTaskCanceledException(Exception e) + { + switch (e) + { + case AggregateException aggregateException: + return aggregateException.InnerExceptions.Any(IsTaskCanceledException); + + case TaskCanceledException _: + return true; + + default: + return e.InnerException != null && IsTaskCanceledException(e.InnerException); + } + } + private struct MessageContextData { diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 9969ae0..34c08e1 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -138,7 +138,7 @@ namespace Tapeti.Connection consumerTags.AddRange(await Task.WhenAll(queues.Select(async group => { var queueName = group.Key; - var consumer = new TapetiConsumer(config, queueName, group); + var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); return await clientFactory().Consume(cancellationToken, queueName, consumer); })));