From 247da184fa9ea6ec47dafe3e4744b04b30fdd676 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 31 Aug 2018 19:17:56 +0200 Subject: [PATCH] Added prefix option for dynamic queues --- Tapeti.Annotations/DynamicQueueAttribute.cs | 13 ++++ Tapeti/Connection/TapetiWorker.cs | 2 +- Tapeti/TapetiConfig.cs | 79 ++++++++++++++++----- Test/MarcoController.cs | 1 + 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs index 3d730c9..5fe9525 100644 --- a/Tapeti.Annotations/DynamicQueueAttribute.cs +++ b/Tapeti.Annotations/DynamicQueueAttribute.cs @@ -9,5 +9,18 @@ namespace Tapeti.Annotations [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] public class DynamicQueueAttribute : Attribute { + public string Prefix { get; set; } + + + /// + /// If prefix is specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. + /// + /// + public DynamicQueueAttribute(string prefix = null) + { + Prefix = prefix; + } } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index e539bf7..ccd6b91 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -70,7 +70,7 @@ namespace Tapeti.Connection if (queue.Dynamic) { - var dynamicQueue = channel.QueueDeclare(); + var dynamicQueue = channel.QueueDeclare(queue.Name); (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName); foreach (var binding in queue.Bindings) diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 117c4c9..7923635 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -22,7 +22,7 @@ namespace Tapeti public class TapetiConfig { private readonly Dictionary> staticRegistrations = new Dictionary>(); - private readonly Dictionary> dynamicRegistrations = new Dictionary>(); + private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); @@ -49,20 +49,47 @@ namespace Tapeti var queues = new List(); queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value))); - // Group all bindings with the same index into queues, this will - // ensure each message type is unique on their queue - var dynamicBindings = new List>(); - foreach (var bindings in dynamicRegistrations.Values) + + // We want to ensure each queue only has unique messages classes. This means we can requeue + // without the side-effect of calling other handlers for the same message class again as well. + // + // Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping + // and how the bindingIndex is relevant: + // + // dynamicRegistrations: + // Key (prefix) + // "" + // Key (message class) Value (list of bindings) + // A binding1, binding2, binding3 + // B binding4 + // "prefix" + // A binding5, binding6 + // + // By combining all bindings with the same index, per prefix, the following queues will be registered: + // + // Prefix Bindings + // "" binding1 (message A), binding4 (message B) + // "" binding2 (message A) + // "" binding3 (message A) + // "prefix" binding5 (message A) + // "prefix" binding6 (message A) + // + foreach (var prefixGroup in dynamicRegistrations) { - while (dynamicBindings.Count < bindings.Count) - dynamicBindings.Add(new List()); + var dynamicBindings = new List>(); - for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) - dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); + foreach (var bindings in prefixGroup.Value.Values) + { + while (dynamicBindings.Count < bindings.Count) + dynamicBindings.Add(new List()); + + for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) + dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); + } + + queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } - queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); @@ -317,10 +344,21 @@ namespace Tapeti protected void AddDynamicRegistration(IBindingContext context, Binding binding) { - if (dynamicRegistrations.ContainsKey(context.MessageClass)) - dynamicRegistrations[context.MessageClass].Add(binding); - else - dynamicRegistrations.Add(context.MessageClass, new List { binding }); + var prefix = binding.QueueInfo.Name ?? ""; + + if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) + { + prefixRegistrations = new Dictionary>(); + dynamicRegistrations.Add(prefix, prefixRegistrations); + } + + if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + { + bindings = new List(); + prefixRegistrations.Add(context.MessageClass, bindings); + } + + bindings.Add(binding); } @@ -333,7 +371,7 @@ namespace Tapeti throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}"); if (dynamicQueueAttribute != null) - return new QueueInfo { Dynamic = true }; + return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix }; if (durableQueueAttribute != null) return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name }; @@ -342,6 +380,15 @@ namespace Tapeti } + protected string GetDynamicQueueName(string prefix) + { + if (String.IsNullOrEmpty(prefix)) + return ""; + + return prefix + "." + Guid.NewGuid().ToString("N"); + } + + protected class QueueInfo { public bool? Dynamic { get; set; } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index fe78c4b..a2df352 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -134,6 +134,7 @@ namespace Test + [DynamicQueue("custom.prefix")] public void Polo(PoloMessage message) { Console.WriteLine(">> Polo");