From f4bef38a9ff7b0e6b8074a0a6c511a1104a6e25c Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 15 Jan 2021 11:29:31 +0100 Subject: [PATCH 1/3] Ignore TaskCanceledExceptions during shutdown --- Tapeti/Connection/TapetiConsumer.cs | 33 ++++++++++++++++++++++----- Tapeti/Connection/TapetiSubscriber.cs | 2 +- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 3ab9bb3..77b77f6 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; @@ -14,6 +15,7 @@ namespace Tapeti.Connection /// internal class TapetiConsumer : IConsumer { + private readonly CancellationToken cancellationToken; private readonly ITapetiConfig config; private readonly string queueName; private readonly List bindings; @@ -23,9 +25,9 @@ namespace Tapeti.Connection private readonly IMessageSerializer messageSerializer; - /// - public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable bindings) + public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config, string queueName, IEnumerable bindings) { + this.cancellationToken = cancellationToken; this.config = config; this.queueName = queueName; this.bindings = bindings.ToList(); @@ -80,11 +82,8 @@ namespace Tapeti.Connection var messageType = message.GetType(); var validMessageType = false; - foreach (var binding in bindings) + foreach (var binding in bindings.Where(binding => binding.Accept(messageType))) { - if (!binding.Accept(messageType)) - continue; - var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); validMessageType = true; @@ -135,6 +134,13 @@ namespace Tapeti.Connection private void HandleException(ExceptionStrategyContext exceptionContext) { + if (cancellationToken.IsCancellationRequested && IsTaskCanceledException(exceptionContext.Exception)) + { + // The service is most likely stopping, and the connection is gone anyways. + exceptionContext.SetConsumeResult(ConsumeResult.Requeue); + return; + } + try { exceptionStrategy.HandleException(exceptionContext); @@ -150,6 +156,21 @@ namespace Tapeti.Connection } + private static bool IsTaskCanceledException(Exception e) + { + switch (e) + { + case AggregateException aggregateException: + return aggregateException.InnerExceptions.Any(IsTaskCanceledException); + + case TaskCanceledException _: + return true; + + default: + return e.InnerException != null && IsTaskCanceledException(e.InnerException); + } + } + private struct MessageContextData { diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 9969ae0..34c08e1 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -138,7 +138,7 @@ namespace Tapeti.Connection consumerTags.AddRange(await Task.WhenAll(queues.Select(async group => { var queueName = group.Key; - var consumer = new TapetiConsumer(config, queueName, group); + var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); return await clientFactory().Consume(cancellationToken, queueName, consumer); }))); From 2431929ca3cb25961de022b2d43bfde324d826cc Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 18 Jan 2021 14:12:55 +0100 Subject: [PATCH 2/3] Added Purge verb to Tapeti.Cmd --- Tapeti.Cmd/Program.cs | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index f1bd5fe..6862f78 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -120,6 +120,17 @@ namespace Tapeti.Cmd } + [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] + public class PurgeOptions : CommonOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + } + + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] public class ExampleOptions { @@ -129,11 +140,12 @@ namespace Tapeti.Cmd public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) + return Parser.Default.ParseArguments(args) .MapResult( (ExportOptions o) => ExecuteVerb(o, RunExport), (ImportOptions o) => ExecuteVerb(o, RunImport), (ShovelOptions o) => ExecuteVerb(o, RunShovel), + (PurgeOptions o) => ExecuteVerb(o, RunPurge), (ExampleOptions o) => ExecuteVerb(o, RunExample), errs => { @@ -371,6 +383,30 @@ namespace Tapeti.Cmd } + private static void RunPurge(PurgeOptions options) + { + if (!options.Confirm) + { + Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + uint messageCount; + + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) + { + messageCount = channel.QueuePurge(options.QueueName); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); + + } + + private static void RunExample(ExampleOptions options) { using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) From 0b7c84a119d140702b72376a58749d23b22a9e40 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 18 Jan 2021 14:17:07 +0100 Subject: [PATCH 3/3] Ignore OperationCanceledException as well when shutting down --- Tapeti/Connection/TapetiConsumer.cs | 9 +++++---- Tapeti/Default/ControllerBindingContext.cs | 1 - Tapeti/Default/ControllerMethodBinding.cs | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 77b77f6..19fb870 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -134,7 +134,7 @@ namespace Tapeti.Connection private void HandleException(ExceptionStrategyContext exceptionContext) { - if (cancellationToken.IsCancellationRequested && IsTaskCanceledException(exceptionContext.Exception)) + if (cancellationToken.IsCancellationRequested && IgnoreExceptionDuringShutdown(exceptionContext.Exception)) { // The service is most likely stopping, and the connection is gone anyways. exceptionContext.SetConsumeResult(ConsumeResult.Requeue); @@ -156,18 +156,19 @@ namespace Tapeti.Connection } - private static bool IsTaskCanceledException(Exception e) + private static bool IgnoreExceptionDuringShutdown(Exception e) { switch (e) { case AggregateException aggregateException: - return aggregateException.InnerExceptions.Any(IsTaskCanceledException); + return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown); case TaskCanceledException _: + case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested return true; default: - return e.InnerException != null && IsTaskCanceledException(e.InnerException); + return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException); } } diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs index f86dc36..8eb0150 100644 --- a/Tapeti/Default/ControllerBindingContext.cs +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -155,7 +155,6 @@ namespace Tapeti.Default public bool HasHandler => Handler != null; - /// public ControllerBindingResult(ParameterInfo info) { Info = info; diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 4c334d8..1d52cb4 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -99,7 +99,6 @@ namespace Tapeti.Default public MethodInfo Method => bindingInfo.Method; - /// public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo) { this.dependencyResolver = dependencyResolver;