diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index f1bd5fe..6862f78 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -120,6 +120,17 @@ namespace Tapeti.Cmd } + [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] + public class PurgeOptions : CommonOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + } + + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] public class ExampleOptions { @@ -129,11 +140,12 @@ namespace Tapeti.Cmd public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) + return Parser.Default.ParseArguments(args) .MapResult( (ExportOptions o) => ExecuteVerb(o, RunExport), (ImportOptions o) => ExecuteVerb(o, RunImport), (ShovelOptions o) => ExecuteVerb(o, RunShovel), + (PurgeOptions o) => ExecuteVerb(o, RunPurge), (ExampleOptions o) => ExecuteVerb(o, RunExample), errs => { @@ -371,6 +383,30 @@ namespace Tapeti.Cmd } + private static void RunPurge(PurgeOptions options) + { + if (!options.Confirm) + { + Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + uint messageCount; + + using (var connection = GetConnection(options)) + using (var channel = connection.CreateModel()) + { + messageCount = channel.QueuePurge(options.QueueName); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); + + } + + private static void RunExample(ExampleOptions options) { using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 3ab9bb3..19fb870 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; @@ -14,6 +15,7 @@ namespace Tapeti.Connection /// internal class TapetiConsumer : IConsumer { + private readonly CancellationToken cancellationToken; private readonly ITapetiConfig config; private readonly string queueName; private readonly List bindings; @@ -23,9 +25,9 @@ namespace Tapeti.Connection private readonly IMessageSerializer messageSerializer; - /// - public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable bindings) + public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config, string queueName, IEnumerable bindings) { + this.cancellationToken = cancellationToken; this.config = config; this.queueName = queueName; this.bindings = bindings.ToList(); @@ -80,11 +82,8 @@ namespace Tapeti.Connection var messageType = message.GetType(); var validMessageType = false; - foreach (var binding in bindings) + foreach (var binding in bindings.Where(binding => binding.Accept(messageType))) { - if (!binding.Accept(messageType)) - continue; - var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); validMessageType = true; @@ -135,6 +134,13 @@ namespace Tapeti.Connection private void HandleException(ExceptionStrategyContext exceptionContext) { + if (cancellationToken.IsCancellationRequested && IgnoreExceptionDuringShutdown(exceptionContext.Exception)) + { + // The service is most likely stopping, and the connection is gone anyways. + exceptionContext.SetConsumeResult(ConsumeResult.Requeue); + return; + } + try { exceptionStrategy.HandleException(exceptionContext); @@ -150,6 +156,22 @@ namespace Tapeti.Connection } + private static bool IgnoreExceptionDuringShutdown(Exception e) + { + switch (e) + { + case AggregateException aggregateException: + return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown); + + case TaskCanceledException _: + case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested + return true; + + default: + return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException); + } + } + private struct MessageContextData { diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 9969ae0..34c08e1 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -138,7 +138,7 @@ namespace Tapeti.Connection consumerTags.AddRange(await Task.WhenAll(queues.Select(async group => { var queueName = group.Key; - var consumer = new TapetiConsumer(config, queueName, group); + var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); return await clientFactory().Consume(cancellationToken, queueName, consumer); }))); diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs index f86dc36..8eb0150 100644 --- a/Tapeti/Default/ControllerBindingContext.cs +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -155,7 +155,6 @@ namespace Tapeti.Default public bool HasHandler => Handler != null; - /// public ControllerBindingResult(ParameterInfo info) { Info = info; diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 4c334d8..1d52cb4 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -99,7 +99,6 @@ namespace Tapeti.Default public MethodInfo Method => bindingInfo.Method; - /// public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo) { this.dependencyResolver = dependencyResolver;