From 7143ad3c2f80993041720c914842d0320b788c79 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 17 Nov 2022 16:47:07 +0100 Subject: [PATCH] Implemented QueueArgumentsAttribute (untested) --- Examples/ExampleLib/ExampleConsoleApp.cs | 2 +- Tapeti.Flow/Default/FlowProvider.cs | 2 +- Tapeti.Flow/Default/FlowStore.cs | 6 +- Tapeti.Flow/FlowHelpers/LockCollection.cs | 2 +- Tapeti.Flow/FlowHelpers/MethodSerializer.cs | 2 +- Tapeti.Serilog/Default/DiagnosticContext.cs | 2 +- Tapeti.Serilog/TapetiSeriLogger.cs | 5 +- Tapeti.Tests/Tapeti.Tests.csproj | 10 +- Tapeti.Transient/TransientGenericBinding.cs | 2 +- Tapeti.Transient/TransientRouter.cs | 2 +- Tapeti/Config/IBinding.cs | 16 ++- Tapeti/Connection/ITapetiClient.cs | 27 +++-- Tapeti/Connection/TapetiChannel.cs | 2 +- Tapeti/Connection/TapetiClient.cs | 65 ++++++---- Tapeti/Connection/TapetiSubscriber.cs | 111 +++++++++++------- Tapeti/Default/ConsoleLogger.cs | 13 +- Tapeti/Default/ControllerBindingContext.cs | 2 +- Tapeti/Default/ControllerMethodBinding.cs | 14 ++- Tapeti/Default/JsonMessageSerializer.cs | 4 +- Tapeti/Default/MessageContext.cs | 4 +- Tapeti/Default/MessageProperties.cs | 2 +- .../Default/NamespaceMatchExchangeStrategy.cs | 2 +- Tapeti/Default/TypeNameRoutingKeyStrategy.cs | 4 +- Tapeti/Helpers/DictionaryHelper.cs | 30 +++++ Tapeti/ILogger.cs | 5 +- Tapeti/Tapeti.csproj | 4 +- Tapeti/TapetiConfig.cs | 12 +- Tapeti/TapetiConfigControllers.cs | 56 ++++++++- Tapeti/Tasks/SingleThreadTaskQueue.cs | 6 +- 29 files changed, 281 insertions(+), 133 deletions(-) create mode 100644 Tapeti/Helpers/DictionaryHelper.cs diff --git a/Examples/ExampleLib/ExampleConsoleApp.cs b/Examples/ExampleLib/ExampleConsoleApp.cs index cc248be..feabdec 100644 --- a/Examples/ExampleLib/ExampleConsoleApp.cs +++ b/Examples/ExampleLib/ExampleConsoleApp.cs @@ -24,7 +24,7 @@ namespace ExampleLib private readonly IDependencyContainer dependencyResolver; private readonly int expectedDoneCount; private int doneCount; - private readonly TaskCompletionSource doneSignal = new TaskCompletionSource(); + private readonly TaskCompletionSource doneSignal = new(); /// Uses Tapeti's IDependencyContainer interface so you can easily switch an example to your favourite IoC container diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 2fa8a2d..3c8c515 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -304,7 +304,7 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; private readonly FlowProvider flowProvider; - private readonly List requests = new List(); + private readonly List requests = new(); public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 42a99e5..99fe5f0 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -29,9 +29,9 @@ namespace Tapeti.Flow.Default } } - private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); - private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); - private readonly LockCollection locks = new LockCollection(EqualityComparer.Default); + private readonly ConcurrentDictionary flowStates = new(); + private readonly ConcurrentDictionary continuationLookup = new(); + private readonly LockCollection locks = new(EqualityComparer.Default); private HashSet validatedMethods; private readonly IFlowRepository repository; diff --git a/Tapeti.Flow/FlowHelpers/LockCollection.cs b/Tapeti.Flow/FlowHelpers/LockCollection.cs index 309e916..2c0372e 100644 --- a/Tapeti.Flow/FlowHelpers/LockCollection.cs +++ b/Tapeti.Flow/FlowHelpers/LockCollection.cs @@ -60,7 +60,7 @@ namespace Tapeti.Flow.FlowHelpers internal volatile LockItem Next; private readonly Dictionary locks; - private readonly TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly T key; public LockItem(Dictionary locks, T key) diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs index 9f8b09e..229ad65 100644 --- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs +++ b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs @@ -18,7 +18,7 @@ namespace Tapeti.Flow.FlowHelpers } - private static readonly Regex DeserializeRegex = new Regex("^(?.+?)@(?.+?):(?.+?)$"); + private static readonly Regex DeserializeRegex = new("^(?.+?)@(?.+?):(?.+?)$"); /// diff --git a/Tapeti.Serilog/Default/DiagnosticContext.cs b/Tapeti.Serilog/Default/DiagnosticContext.cs index 387ebe1..0f7f291 100644 --- a/Tapeti.Serilog/Default/DiagnosticContext.cs +++ b/Tapeti.Serilog/Default/DiagnosticContext.cs @@ -10,7 +10,7 @@ namespace Tapeti.Serilog.Default public class DiagnosticContext : IDiagnosticContext { private readonly global::Serilog.ILogger logger; - private readonly List properties = new List(); + private readonly List properties = new(); /// diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index 98c7864..f13afbd 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -129,10 +129,11 @@ namespace Tapeti.Serilog } /// - public void QueueExistsWarning(string queueName, Dictionary arguments) + public void QueueExistsWarning(string queueName, IReadOnlyDictionary existingArguments, IReadOnlyDictionary arguments) { - seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({arguments}) and will not be redeclared, queue will be consumed as-is", + seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({existingArguments} vs. {arguments}) and will not be redeclared, queue will be consumed as-is", queueName, + existingArguments, arguments); } diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 2e53986..2588df8 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -8,6 +8,12 @@ 1701;1702 + + + + + + @@ -22,8 +28,4 @@ - - - - diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index 13967e7..a514466 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -33,7 +33,7 @@ namespace Tapeti.Transient /// public async ValueTask Apply(IBindingTarget target) { - QueueName = await target.BindDynamicDirect(dynamicQueuePrefix); + QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null); router.TransientResponseQueueName = QueueName; } diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index 2aeea29..7c0e94d 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -13,7 +13,7 @@ namespace Tapeti.Transient internal class TransientRouter { private readonly int defaultTimeoutMs; - private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> map = new(); /// /// The generated name of the dynamic queue to which responses should be sent. diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs index dcaa387..269fb08 100644 --- a/Tapeti/Config/IBinding.cs +++ b/Tapeti/Config/IBinding.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Tapeti.Config @@ -80,7 +81,8 @@ namespace Tapeti.Config /// /// The message class to be bound to the queue /// The name of the durable queue - ValueTask BindDurable(Type messageClass, string queueName); + /// Optional arguments + ValueTask BindDurable(Type messageClass, string queueName, IReadOnlyDictionary arguments); /// /// Binds the messageClass to a dynamic auto-delete queue. @@ -91,15 +93,17 @@ namespace Tapeti.Config /// /// The message class to be bound to the queue /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamic(Type messageClass, string queuePrefix = null); + ValueTask BindDynamic(Type messageClass, string queuePrefix, IReadOnlyDictionary arguments); /// /// Declares a durable queue but does not add a binding for a messageClass' routing key. /// Used for direct-to-queue messages. /// /// The name of the durable queue - ValueTask BindDurableDirect(string queueName); + /// Optional arguments + ValueTask BindDurableDirect(string queueName, IReadOnlyDictionary arguments); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. @@ -107,16 +111,18 @@ namespace Tapeti.Config /// /// The message class which will be handled on the queue. It is not actually bound to the queue. /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamicDirect(Type messageClass = null, string queuePrefix = null); + ValueTask BindDynamicDirect(Type messageClass, string queuePrefix, IReadOnlyDictionary arguments); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. /// Used for direct-to-queue messages. Guarantees a unique queue. /// /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamicDirect(string queuePrefix = null); + ValueTask BindDynamicDirect(string queuePrefix, IReadOnlyDictionary arguments); /// /// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs index e71c3fc..af1b560 100644 --- a/Tapeti/Connection/ITapetiClient.cs +++ b/Tapeti/Connection/ITapetiClient.cs @@ -74,11 +74,11 @@ namespace Tapeti.Connection /// /// Starts a consumer for the specified queue, using the provided bindings to handle messages. /// - /// Cancelled when the connection is lost /// /// The consumer implementation which will receive the messages from the queue + /// Cancelled when the connection is lost /// The consumer tag as returned by BasicConsume. - Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer); + Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken); /// /// Stops the consumer with the specified tag. @@ -89,40 +89,43 @@ namespace Tapeti.Connection /// /// Creates a durable queue if it does not already exist, and updates the bindings. /// - /// Cancelled when the connection is lost /// The name of the queue to create /// A list of bindings. Any bindings already on the queue which are not in this list will be removed - Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable bindings); + /// Optional arguments + /// Cancelled when the connection is lost + Task DurableQueueDeclare(string queueName, IEnumerable bindings, IReadOnlyDictionary arguments, CancellationToken cancellationToken); /// /// Verifies a durable queue exists. Will raise an exception if it does not. /// - /// Cancelled when the connection is lost /// The name of the queue to verify - Task DurableQueueVerify(CancellationToken cancellationToken, string queueName); + /// Optional arguments + /// Cancelled when the connection is lost + Task DurableQueueVerify(string queueName, IReadOnlyDictionary arguments, CancellationToken cancellationToken); /// /// Deletes a durable queue. /// - /// Cancelled when the connection is lost /// The name of the queue to delete /// If true, the queue will only be deleted if it is empty otherwise all bindings will be removed. If false, the queue is deleted even if there are queued messages. - Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true); + /// Cancelled when the connection is lost + Task DurableQueueDelete(string queueName, bool onlyIfEmpty, CancellationToken cancellationToken); /// /// Creates a dynamic queue. /// - /// Cancelled when the connection is lost /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. - Task DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null); + /// Optional arguments + /// Cancelled when the connection is lost + Task DynamicQueueDeclare(string queuePrefix, IReadOnlyDictionary arguments, CancellationToken cancellationToken); /// /// Add a binding to a dynamic queue. /// - /// Cancelled when the connection is lost /// The name of the dynamic queue previously created using DynamicQueueDeclare /// The binding to add to the dynamic queue - Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding); + /// Cancelled when the connection is lost + Task DynamicQueueBind(string queueName, QueueBinding binding, CancellationToken cancellationToken); /// /// Closes the connection to RabbitMQ gracefully. diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs index 902b5cf..0efe5c3 100644 --- a/Tapeti/Connection/TapetiChannel.cs +++ b/Tapeti/Connection/TapetiChannel.cs @@ -20,7 +20,7 @@ namespace Tapeti.Connection internal class TapetiChannel { private readonly Func modelFactory; - private readonly object taskQueueLock = new object(); + private readonly object taskQueueLock = new(); private SingleThreadTaskQueue taskQueue; private readonly ModelProvider modelProvider; diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 9ff960f..4ff3251 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -13,6 +13,7 @@ using RabbitMQ.Client.Exceptions; using Tapeti.Config; using Tapeti.Default; using Tapeti.Exceptions; +using Tapeti.Helpers; namespace Tapeti.Connection { @@ -50,7 +51,7 @@ namespace Tapeti.Connection private readonly HttpClient managementClient; // These fields must be locked using connectionLock - private readonly object connectionLock = new object(); + private readonly object connectionLock = new(); private long connectionReference; private RabbitMQ.Client.IConnection connection; private IModel consumeChannelModel; @@ -61,12 +62,12 @@ namespace Tapeti.Connection // These fields are for use in a single TapetiChannel's queue only! private ulong lastDeliveryTag; - private readonly HashSet deletedQueues = new HashSet(); + private readonly HashSet deletedQueues = new(); // These fields must be locked using confirmLock, since the callbacks for BasicAck/BasicReturn can run in a different thread - private readonly object confirmLock = new object(); - private readonly Dictionary confirmMessages = new Dictionary(); - private readonly Dictionary returnRoutingKeys = new Dictionary(); + private readonly object confirmLock = new(); + private readonly Dictionary confirmMessages = new(); + private readonly Dictionary returnRoutingKeys = new(); private class ConfirmMessageInfo @@ -198,7 +199,7 @@ namespace Tapeti.Connection /// - public async Task Consume(CancellationToken cancellationToken, string queueName, IConsumer consumer) + public async Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken) { if (deletedQueues.Contains(queueName)) return null; @@ -285,7 +286,7 @@ namespace Tapeti.Connection } - private async Task GetDurableQueueDeclareRequired(string queueName) + private async Task GetDurableQueueDeclareRequired(string queueName, IReadOnlyDictionary arguments) { var existingQueue = await GetQueueInfo(queueName); if (existingQueue == null) @@ -294,18 +295,22 @@ namespace Tapeti.Connection if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive) throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)"); - if (existingQueue.Arguments.Count <= 0) + if (arguments == null && existingQueue.Arguments.Count == 0) return true; - - (logger as IBindingLogger)?.QueueExistsWarning(queueName, existingQueue.Arguments); + + if (existingQueue.Arguments.NullSafeSameValues(arguments)) + return true; + + (logger as IBindingLogger)?.QueueExistsWarning(queueName, existingQueue.Arguments, arguments); return false; } - + + /// - public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable bindings) + public async Task DurableQueueDeclare(string queueName, IEnumerable bindings, IReadOnlyDictionary arguments, CancellationToken cancellationToken) { - var declareRequired = await GetDurableQueueDeclareRequired(queueName); + var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments); var existingBindings = (await GetQueueBindings(queueName)).ToList(); var currentBindings = bindings.ToList(); @@ -319,7 +324,7 @@ namespace Tapeti.Connection if (declareRequired) { bindingLogger?.QueueDeclare(queueName, true, false); - channel.QueueDeclare(queueName, true, false, false); + channel.QueueDeclare(queueName, true, false, false, GetDeclareArguments(arguments)); } foreach (var binding in currentBindings.Except(existingBindings)) @@ -337,10 +342,20 @@ namespace Tapeti.Connection }); } - /// - public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName) + + private static IDictionary GetDeclareArguments(IReadOnlyDictionary arguments) { - if (!await GetDurableQueueDeclareRequired(queueName)) + if (arguments == null || arguments.Count == 0) + return null; + + return arguments.ToDictionary(p => p.Key, p => (object)Encoding.UTF8.GetBytes(p.Value)); + } + + + /// + public async Task DurableQueueVerify(string queueName, IReadOnlyDictionary arguments, CancellationToken cancellationToken) + { + if (!await GetDurableQueueDeclareRequired(queueName, arguments)) return; await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => @@ -355,7 +370,7 @@ namespace Tapeti.Connection /// - public async Task DurableQueueDelete(CancellationToken cancellationToken, string queueName, bool onlyIfEmpty = true) + public async Task DurableQueueDelete(string queueName, bool onlyIfEmpty, CancellationToken cancellationToken) { if (!onlyIfEmpty) { @@ -440,7 +455,7 @@ namespace Tapeti.Connection /// - public async Task DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null) + public async Task DynamicQueueDeclare(string queuePrefix, IReadOnlyDictionary arguments, CancellationToken cancellationToken) { string queueName = null; var bindingLogger = logger as IBindingLogger; @@ -454,11 +469,11 @@ namespace Tapeti.Connection { queueName = queuePrefix + "." + Guid.NewGuid().ToString("N"); bindingLogger?.QueueDeclare(queueName, false, false); - channel.QueueDeclare(queueName); + channel.QueueDeclare(queueName, arguments: GetDeclareArguments(arguments)); } else { - queueName = channel.QueueDeclare().QueueName; + queueName = channel.QueueDeclare(arguments: GetDeclareArguments(arguments)).QueueName; bindingLogger?.QueueDeclare(queueName, false, false); } }); @@ -467,7 +482,7 @@ namespace Tapeti.Connection } /// - public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding) + public async Task DynamicQueueBind(string queueName, QueueBinding binding, CancellationToken cancellationToken) { await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { @@ -523,7 +538,7 @@ namespace Tapeti.Connection } - private static readonly List TransientStatusCodes = new List() + private static readonly List TransientStatusCodes = new() { HttpStatusCode.GatewayTimeout, HttpStatusCode.RequestTimeout, @@ -675,7 +690,7 @@ namespace Tapeti.Connection } - private readonly HashSet declaredExchanges = new HashSet(); + private readonly HashSet declaredExchanges = new(); private void DeclareExchange(IModel channel, string exchange) { @@ -842,7 +857,7 @@ namespace Tapeti.Connection GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { }); }; - capturedPublishChannelModel.ModelShutdown += (sender, args) => + capturedPublishChannelModel.ModelShutdown += (_, _) => { lock (connectionLock) { diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 0c026ab..3b467f3 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Tapeti.Config; +using Tapeti.Helpers; namespace Tapeti.Connection { @@ -13,7 +14,7 @@ namespace Tapeti.Connection private readonly Func clientFactory; private readonly ITapetiConfig config; private bool consuming; - private readonly List consumerTags = new List(); + private readonly List consumerTags = new(); private CancellationTokenSource initializeCancellationTokenSource; @@ -149,7 +150,7 @@ namespace Tapeti.Connection var queueName = group.Key; var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); - return await clientFactory().Consume(cancellationToken, queueName, consumer); + return await clientFactory().Consume(queueName, consumer, cancellationToken); }))).Where(t => t != null)); } @@ -165,9 +166,10 @@ namespace Tapeti.Connection { public string QueueName; public List MessageClasses; + public IReadOnlyDictionary Arguments; } - private readonly Dictionary> dynamicQueues = new Dictionary>(); + private readonly Dictionary> dynamicQueues = new(); protected CustomBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) @@ -185,38 +187,38 @@ namespace Tapeti.Connection } - public abstract ValueTask BindDurable(Type messageClass, string queueName); - public abstract ValueTask BindDurableDirect(string queueName); + public abstract ValueTask BindDurable(Type messageClass, string queueName, IReadOnlyDictionary arguments); + public abstract ValueTask BindDurableDirect(string queueName, IReadOnlyDictionary arguments); public abstract ValueTask BindDurableObsolete(string queueName); - public async ValueTask BindDynamic(Type messageClass, string queuePrefix = null) + public async ValueTask BindDynamic(Type messageClass, string queuePrefix, IReadOnlyDictionary arguments) { - var result = await DeclareDynamicQueue(messageClass, queuePrefix); + var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); if (!result.IsNewMessageClass) return result.QueueName; var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); var exchange = ExchangeStrategy.GetExchange(messageClass); - await ClientFactory().DynamicQueueBind(CancellationToken, result.QueueName, new QueueBinding(exchange, routingKey)); + await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey), CancellationToken); return result.QueueName; } - public async ValueTask BindDynamicDirect(Type messageClass, string queuePrefix = null) + public async ValueTask BindDynamicDirect(Type messageClass, string queuePrefix, IReadOnlyDictionary arguments) { - var result = await DeclareDynamicQueue(messageClass, queuePrefix); + var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); return result.QueueName; } - public async ValueTask BindDynamicDirect(string queuePrefix = null) + public async ValueTask BindDynamicDirect(string queuePrefix, IReadOnlyDictionary arguments) { // If we don't know the routing key, always create a new queue to ensure there is no overlap. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. - return await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix); + return await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); } @@ -226,7 +228,7 @@ namespace Tapeti.Connection public bool IsNewMessageClass; } - private async Task DeclareDynamicQueue(Type messageClass, string queuePrefix) + private async Task DeclareDynamicQueue(Type messageClass, string queuePrefix, IReadOnlyDictionary arguments) { // Group by prefix var key = queuePrefix ?? ""; @@ -241,7 +243,7 @@ namespace Tapeti.Connection foreach (var existingQueueInfo in prefixQueues) { // ReSharper disable once InvertIf - if (!existingQueueInfo.MessageClasses.Contains(messageClass)) + if (!existingQueueInfo.MessageClasses.Contains(messageClass) && existingQueueInfo.Arguments.NullSafeSameValues(arguments)) { // Allow this routing key in the existing dynamic queue var result = new DeclareDynamicQueueResult @@ -258,11 +260,12 @@ namespace Tapeti.Connection } // Declare a new queue - var queueName = await ClientFactory().DynamicQueueDeclare(CancellationToken, queuePrefix); + var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); var queueInfo = new DynamicQueueInfo { QueueName = queueName, - MessageClasses = new List { messageClass } + MessageClasses = new List { messageClass }, + Arguments = arguments }; prefixQueues.Add(queueInfo); @@ -278,8 +281,15 @@ namespace Tapeti.Connection private class DeclareDurableQueuesBindingTarget : CustomBindingTarget { - private readonly Dictionary> durableQueues = new Dictionary>(); - private readonly HashSet obsoleteDurableQueues = new HashSet(); + private struct DurableQueueInfo + { + public List MessageClasses; + public IReadOnlyDictionary Arguments; + } + + + private readonly Dictionary durableQueues = new(); + private readonly HashSet obsoleteDurableQueues = new(); public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) @@ -287,29 +297,50 @@ namespace Tapeti.Connection } - public override ValueTask BindDurable(Type messageClass, string queueName) + public override ValueTask BindDurable(Type messageClass, string queueName, IReadOnlyDictionary arguments) { // Collect the message classes per queue so we can determine afterwards // if any of the bindings currently set on the durable queue are no // longer valid and should be removed. - if (!durableQueues.TryGetValue(queueName, out var messageClasses)) + if (!durableQueues.TryGetValue(queueName, out var durableQueueInfo)) { - durableQueues.Add(queueName, new List + durableQueues.Add(queueName, new DurableQueueInfo { - messageClass + MessageClasses = new List + { + messageClass + }, + Arguments = arguments }); } - else if (!messageClasses.Contains(messageClass)) - messageClasses.Add(messageClass); + else + { + if (!durableQueueInfo.Arguments.NullSafeSameValues(arguments)) + throw new TopologyConfigurationException($"Multiple conflicting QueueArguments attributes specified for queue {queueName}"); + if (!durableQueueInfo.MessageClasses.Contains(messageClass)) + durableQueueInfo.MessageClasses.Add(messageClass); + } + return default; - } + } - public override ValueTask BindDurableDirect(string queueName) + public override ValueTask BindDurableDirect(string queueName, IReadOnlyDictionary arguments) { - if (!durableQueues.ContainsKey(queueName)) - durableQueues.Add(queueName, new List()); + if (!durableQueues.TryGetValue(queueName, out var durableQueueInfo)) + { + durableQueues.Add(queueName, new DurableQueueInfo + { + MessageClasses = new List(), + Arguments = arguments + }); + } + else + { + if (!durableQueueInfo.Arguments.NullSafeSameValues(arguments)) + throw new TopologyConfigurationException($"Multiple conflicting QueueArguments attributes specified for queue {queueName}"); + } return default; } @@ -334,7 +365,7 @@ namespace Tapeti.Connection { await Task.WhenAll(durableQueues.Select(async queue => { - var bindings = queue.Value.Select(messageClass => + var bindings = queue.Value.MessageClasses.Select(messageClass => { var exchange = ExchangeStrategy.GetExchange(messageClass); var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); @@ -342,7 +373,7 @@ namespace Tapeti.Connection return new QueueBinding(exchange, routingKey); }); - await client.DurableQueueDeclare(CancellationToken, queue.Key, bindings); + await client.DurableQueueDeclare(queue.Key, bindings, queue.Value.Arguments, CancellationToken); })); } @@ -351,7 +382,7 @@ namespace Tapeti.Connection { await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue => { - await client.DurableQueueDelete(CancellationToken, queue); + await client.DurableQueueDelete(queue, true, CancellationToken); })); } } @@ -359,7 +390,7 @@ namespace Tapeti.Connection private class PassiveDurableQueuesBindingTarget : CustomBindingTarget { - private readonly HashSet durableQueues = new HashSet(); + private readonly HashSet durableQueues = new(); public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) @@ -367,14 +398,14 @@ namespace Tapeti.Connection } - public override async ValueTask BindDurable(Type messageClass, string queueName) + public override async ValueTask BindDurable(Type messageClass, string queueName, IReadOnlyDictionary arguments) { - await VerifyDurableQueue(queueName); + await VerifyDurableQueue(queueName, arguments); } - public override async ValueTask BindDurableDirect(string queueName) + public override async ValueTask BindDurableDirect(string queueName, IReadOnlyDictionary arguments) { - await VerifyDurableQueue(queueName); + await VerifyDurableQueue(queueName, arguments); } public override ValueTask BindDurableObsolete(string queueName) @@ -383,12 +414,12 @@ namespace Tapeti.Connection } - private async Task VerifyDurableQueue(string queueName) + private async Task VerifyDurableQueue(string queueName, IReadOnlyDictionary arguments) { if (!durableQueues.Add(queueName)) return; - await ClientFactory().DurableQueueVerify(CancellationToken, queueName); + await ClientFactory().DurableQueueVerify(queueName, arguments, CancellationToken); } } @@ -400,12 +431,12 @@ namespace Tapeti.Connection } - public override ValueTask BindDurable(Type messageClass, string queueName) + public override ValueTask BindDurable(Type messageClass, string queueName, IReadOnlyDictionary arguments) { return default; } - public override ValueTask BindDurableDirect(string queueName) + public override ValueTask BindDurableDirect(string queueName, IReadOnlyDictionary arguments) { return default; } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index b1a66ca..8b6e955 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -81,7 +81,13 @@ namespace Tapeti.Default } /// - public void QueueExistsWarning(string queueName, Dictionary arguments) + public void QueueExistsWarning(string queueName, IReadOnlyDictionary existingArguments, IReadOnlyDictionary arguments) + { + Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is"); + } + + + private static string GetArgumentsText(IReadOnlyDictionary arguments) { var argumentsText = new StringBuilder(); foreach (var pair in arguments) @@ -91,10 +97,11 @@ namespace Tapeti.Default argumentsText.Append($"{pair.Key} = {pair.Value}"); } - - Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is"); + + return argumentsText.ToString(); } + /// public void QueueBind(string queueName, bool durable, string exchange, string routingKey) { diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs index 9318c98..57ee673 100644 --- a/Tapeti/Default/ControllerBindingContext.cs +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -9,7 +9,7 @@ namespace Tapeti.Default internal class ControllerBindingContext : IControllerBindingContext { private BindingTargetMode? bindingTargetMode; - private readonly List middleware = new List(); + private readonly List middleware = new(); private readonly List parameters; private readonly ControllerBindingResult result; diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 24267e4..a2b0614 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -117,10 +117,10 @@ namespace Tapeti.Default { case BindingTargetMode.Default: if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) - QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { - await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); QueueName = bindingInfo.QueueInfo.Name; } @@ -128,10 +128,10 @@ namespace Tapeti.Default case BindingTargetMode.Direct: if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) - QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { - await target.BindDurableDirect(bindingInfo.QueueInfo.Name); + await target.BindDurableDirect(bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); QueueName = bindingInfo.QueueInfo.Name; } @@ -316,7 +316,11 @@ namespace Tapeti.Default /// public string Name { get; set; } - + /// + /// Optional arguments (x-arguments) passed when declaring the queue. + /// + public IReadOnlyDictionary QueueArguments { get; set; } + /// /// Determines if the QueueInfo properties contain a valid combination. /// diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index 61b5252..8f92928 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -16,8 +16,8 @@ namespace Tapeti.Default private const string ClassTypeHeader = "classType"; - private readonly ConcurrentDictionary deserializedTypeNames = new ConcurrentDictionary(); - private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); + private readonly ConcurrentDictionary deserializedTypeNames = new(); + private readonly ConcurrentDictionary serializedTypeNames = new(); private readonly JsonSerializerSettings serializerSettings; diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index ca98640..3b72e77 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -8,7 +8,7 @@ namespace Tapeti.Default { internal class MessageContext : IMessageContext { - private readonly Dictionary payloads = new Dictionary(); + private readonly Dictionary payloads = new(); /// @@ -117,7 +117,7 @@ namespace Tapeti.Default // ReSharper disable once InconsistentNaming public class KeyValuePayload : IMessageContextPayload, IDisposable, IAsyncDisposable { - private readonly Dictionary items = new Dictionary(); + private readonly Dictionary items = new(); public KeyValuePayload(string key, object value) diff --git a/Tapeti/Default/MessageProperties.cs b/Tapeti/Default/MessageProperties.cs index 3184da9..8227934 100644 --- a/Tapeti/Default/MessageProperties.cs +++ b/Tapeti/Default/MessageProperties.cs @@ -10,7 +10,7 @@ namespace Tapeti.Default /// public class MessageProperties : IMessageProperties { - private readonly Dictionary headers = new Dictionary(); + private readonly Dictionary headers = new(); /// diff --git a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs index 3c4465e..54d16d0 100644 --- a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs +++ b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs @@ -13,7 +13,7 @@ namespace Tapeti.Default /// public class NamespaceMatchExchangeStrategy : IExchangeStrategy { - private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); + private static readonly Regex NamespaceRegex = new("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); /// diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs index f31af08..35ee8d8 100644 --- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs +++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs @@ -28,9 +28,9 @@ namespace Tapeti.Default (?(?<=[A-Z])[A-Z](?=[a-z])|[A-Z]) )"; - private static readonly Regex SeparatorRegex = new Regex(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled); + private static readonly Regex SeparatorRegex = new(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled); - private static readonly ConcurrentDictionary RoutingKeyCache = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary RoutingKeyCache = new(); /// diff --git a/Tapeti/Helpers/DictionaryHelper.cs b/Tapeti/Helpers/DictionaryHelper.cs new file mode 100644 index 0000000..ea41b7a --- /dev/null +++ b/Tapeti/Helpers/DictionaryHelper.cs @@ -0,0 +1,30 @@ +using System.Collections.Generic; + +namespace Tapeti.Helpers +{ + /// + /// Provides extension methods for dictionaries. + /// + public static class DictionaryHelper + { + /// + /// Checks if two dictionaries are considered compatible. If either is null they are considered empty. + /// + public static bool NullSafeSameValues(this IReadOnlyDictionary arguments1, IReadOnlyDictionary arguments2) + { + if (arguments1 == null || arguments2 == null) + return (arguments1 == null || arguments1.Count == 0) && (arguments2 == null || arguments2.Count == 0); + + if (arguments1.Count != arguments2.Count) + return false; + + foreach (var pair in arguments1) + { + if (!arguments2.TryGetValue(pair.Key, out var value2) || value2 != arguments1[pair.Key]) + return false; + } + + return true; + } + } +} diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index b9bd22f..58e4c48 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -138,8 +138,9 @@ namespace Tapeti /// If the queue already exists but should be compatible QueueDeclare will be called instead. /// /// The name of the queue that is declared - /// The x-arguments of the existing queue - void QueueExistsWarning(string queueName, Dictionary arguments); + /// The x-arguments of the existing queue + /// The x-arguments of the queue that would be declared + void QueueExistsWarning(string queueName, IReadOnlyDictionary existingArguments, IReadOnlyDictionary arguments); /// /// Called before a binding is added to a queue. diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 8d7cdb7..4dae106 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -1,4 +1,4 @@ - + net6.0;net7.0 @@ -42,6 +42,6 @@ - + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index be4d4e1..ec2234c 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -18,7 +18,7 @@ namespace Tapeti public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess { private Config config; - private readonly List bindingMiddleware = new List(); + private readonly List bindingMiddleware = new(); /// @@ -225,9 +225,9 @@ namespace Tapeti /// internal class Config : ITapetiConfig { - private readonly ConfigFeatures features = new ConfigFeatures(); - private readonly ConfigMiddleware middleware = new ConfigMiddleware(); - private readonly ConfigBindings bindings = new ConfigBindings(); + private readonly ConfigFeatures features = new(); + private readonly ConfigMiddleware middleware = new(); + private readonly ConfigBindings bindings = new(); public IDependencyResolver DependencyResolver { get; } public ITapetiConfigFeatues Features => features; @@ -291,8 +291,8 @@ namespace Tapeti internal class ConfigMiddleware : ITapetiConfigMiddleware { - private readonly List messageMiddleware = new List(); - private readonly List publishMiddleware = new List(); + private readonly List messageMiddleware = new(); + private readonly List publishMiddleware = new(); public IReadOnlyList Message => messageMiddleware; diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index 7d11b38..95e7643 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -1,4 +1,5 @@ -using System; +using System; +using System.Collections.Generic; using System.Linq; using System.Reflection; using Tapeti.Annotations; @@ -79,7 +80,7 @@ namespace Tapeti } var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; - if (!(methodQueueInfo is { IsValid: true })) + if (methodQueueInfo is not { IsValid: true }) throw new TopologyConfigurationException( $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); @@ -136,12 +137,59 @@ namespace Tapeti if (dynamicQueueAttribute != null && durableQueueAttribute != null) throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}"); + + var queueArgumentsAttribute = member.GetCustomAttribute(); + if (dynamicQueueAttribute != null) - return new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Dynamic, Name = dynamicQueueAttribute.Prefix }; + return new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Dynamic, Name = dynamicQueueAttribute.Prefix, QueueArguments = GetQueueArguments(queueArgumentsAttribute) }; return durableQueueAttribute != null - ? new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Durable, Name = durableQueueAttribute.Name } + ? new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Durable, Name = durableQueueAttribute.Name, QueueArguments = GetQueueArguments(queueArgumentsAttribute) } : null; } + + + private static IReadOnlyDictionary GetQueueArguments(QueueArgumentsAttribute queueArgumentsAttribute) + { + if (queueArgumentsAttribute == null) + return null; + + #if NETSTANDARD2_1_OR_GREATER + var arguments = new Dictionary(queueArgumentsAttribute.CustomArguments); + #else + var arguments = new Dictionary(); + foreach (var pair in queueArgumentsAttribute.CustomArguments) + arguments.Add(pair.Key, pair.Value); + #endif + + if (queueArgumentsAttribute.MaxLength > 0) + arguments.Add(@"x-max-length", queueArgumentsAttribute.MaxLength.ToString()); + + if (queueArgumentsAttribute.MaxLengthBytes > 0) + arguments.Add(@"x-max-length-bytes", queueArgumentsAttribute.MaxLengthBytes.ToString()); + + if (queueArgumentsAttribute.MessageTTL > 0) + arguments.Add(@"x-message-ttl", queueArgumentsAttribute.MessageTTL.ToString()); + + switch (queueArgumentsAttribute.Overflow) + { + case RabbitMQOverflow.NotSpecified: + break; + case RabbitMQOverflow.DropHead: + arguments.Add(@"x-overflow", @"drop-head"); + break; + case RabbitMQOverflow.RejectPublish: + arguments.Add(@"x-overflow", @"reject-publish"); + break; + case RabbitMQOverflow.RejectPublishDeadletter: + arguments.Add(@"x-overflow", @"reject-publish-dlx"); + break; + default: + throw new ArgumentOutOfRangeException(nameof(queueArgumentsAttribute.Overflow), queueArgumentsAttribute.Overflow, "Unsupported Overflow value"); + } + + + return arguments.Count > 0 ? arguments : null; + } } } diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs index 9272a1e..c08a1d4 100644 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs @@ -12,10 +12,10 @@ namespace Tapeti.Tasks /// public class SingleThreadTaskQueue : IDisposable { - private readonly object previousTaskLock = new object(); + private readonly object previousTaskLock = new(); private Task previousTask = Task.CompletedTask; - private readonly Lazy singleThreadScheduler = new Lazy(); + private readonly Lazy singleThreadScheduler = new(); /// @@ -70,7 +70,7 @@ namespace Tapeti.Tasks public override int MaximumConcurrencyLevel => 1; - private readonly Queue scheduledTasks = new Queue(); + private readonly Queue scheduledTasks = new(); private bool disposed;