Merge branch 'release/2.3.1'

This commit is contained in:
Mark van Renswoude 2021-01-18 14:17:25 +01:00
commit dc9f097f77
5 changed files with 66 additions and 10 deletions

View File

@ -120,6 +120,17 @@ namespace Tapeti.Cmd
}
[Verb("purge", HelpText = "Removes all messages from a queue destructively.")]
public class PurgeOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to purge.")]
public string QueueName { get; set; }
[Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
}
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
public class ExampleOptions
{
@ -129,11 +140,12 @@ namespace Tapeti.Cmd
public static int Main(string[] args)
{
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, ExampleOptions>(args)
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, PurgeOptions, ExampleOptions>(args)
.MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport),
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
(PurgeOptions o) => ExecuteVerb(o, RunPurge),
(ExampleOptions o) => ExecuteVerb(o, RunExample),
errs =>
{
@ -371,6 +383,30 @@ namespace Tapeti.Cmd
}
private static void RunPurge(PurgeOptions options)
{
if (!options.Confirm)
{
Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}
uint messageCount;
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = channel.QueuePurge(options.QueueName);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");
}
private static void RunExample(ExampleOptions options)
{
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Tapeti.Config;
using Tapeti.Default;
using System.Threading.Tasks;
@ -14,6 +15,7 @@ namespace Tapeti.Connection
/// </summary>
internal class TapetiConsumer : IConsumer
{
private readonly CancellationToken cancellationToken;
private readonly ITapetiConfig config;
private readonly string queueName;
private readonly List<IBinding> bindings;
@ -23,9 +25,9 @@ namespace Tapeti.Connection
private readonly IMessageSerializer messageSerializer;
/// <inheritdoc />
public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable<IBinding> bindings)
public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config, string queueName, IEnumerable<IBinding> bindings)
{
this.cancellationToken = cancellationToken;
this.config = config;
this.queueName = queueName;
this.bindings = bindings.ToList();
@ -80,11 +82,8 @@ namespace Tapeti.Connection
var messageType = message.GetType();
var validMessageType = false;
foreach (var binding in bindings)
foreach (var binding in bindings.Where(binding => binding.Accept(messageType)))
{
if (!binding.Accept(messageType))
continue;
var consumeResult = await InvokeUsingBinding(message, messageContextData, binding);
validMessageType = true;
@ -135,6 +134,13 @@ namespace Tapeti.Connection
private void HandleException(ExceptionStrategyContext exceptionContext)
{
if (cancellationToken.IsCancellationRequested && IgnoreExceptionDuringShutdown(exceptionContext.Exception))
{
// The service is most likely stopping, and the connection is gone anyways.
exceptionContext.SetConsumeResult(ConsumeResult.Requeue);
return;
}
try
{
exceptionStrategy.HandleException(exceptionContext);
@ -150,6 +156,22 @@ namespace Tapeti.Connection
}
private static bool IgnoreExceptionDuringShutdown(Exception e)
{
switch (e)
{
case AggregateException aggregateException:
return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown);
case TaskCanceledException _:
case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested
return true;
default:
return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException);
}
}
private struct MessageContextData
{

View File

@ -138,7 +138,7 @@ namespace Tapeti.Connection
consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
var consumer = new TapetiConsumer(cancellationToken, config, queueName, group);
return await clientFactory().Consume(cancellationToken, queueName, consumer);
})));

View File

@ -155,7 +155,6 @@ namespace Tapeti.Default
public bool HasHandler => Handler != null;
/// <inheritdoc />
public ControllerBindingResult(ParameterInfo info)
{
Info = info;

View File

@ -99,7 +99,6 @@ namespace Tapeti.Default
public MethodInfo Method => bindingInfo.Method;
/// <inheritdoc />
public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo)
{
this.dependencyResolver = dependencyResolver;