diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs
index 3d730c9..5fe9525 100644
--- a/Tapeti.Annotations/DynamicQueueAttribute.cs
+++ b/Tapeti.Annotations/DynamicQueueAttribute.cs
@@ -9,5 +9,18 @@ namespace Tapeti.Annotations
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class DynamicQueueAttribute : Attribute
{
+ public string Prefix { get; set; }
+
+
+ ///
+ /// If prefix is specified, Tapeti will compose the queue name using the
+ /// prefix and a unique ID. If not specified, an empty queue name will be passed
+ /// to RabbitMQ thus letting it create a unique queue name.
+ ///
+ ///
+ public DynamicQueueAttribute(string prefix = null)
+ {
+ Prefix = prefix;
+ }
}
}
diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs
index e539bf7..ccd6b91 100644
--- a/Tapeti/Connection/TapetiWorker.cs
+++ b/Tapeti/Connection/TapetiWorker.cs
@@ -70,7 +70,7 @@ namespace Tapeti.Connection
if (queue.Dynamic)
{
- var dynamicQueue = channel.QueueDeclare();
+ var dynamicQueue = channel.QueueDeclare(queue.Name);
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
foreach (var binding in queue.Bindings)
diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs
index 117c4c9..7923635 100644
--- a/Tapeti/TapetiConfig.cs
+++ b/Tapeti/TapetiConfig.cs
@@ -22,7 +22,7 @@ namespace Tapeti
public class TapetiConfig
{
private readonly Dictionary> staticRegistrations = new Dictionary>();
- private readonly Dictionary> dynamicRegistrations = new Dictionary>();
+ private readonly Dictionary>> dynamicRegistrations = new Dictionary>>();
private readonly List bindingMiddleware = new List();
private readonly List messageMiddleware = new List();
@@ -49,20 +49,47 @@ namespace Tapeti
var queues = new List();
queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value)));
- // Group all bindings with the same index into queues, this will
- // ensure each message type is unique on their queue
- var dynamicBindings = new List>();
- foreach (var bindings in dynamicRegistrations.Values)
+
+ // We want to ensure each queue only has unique messages classes. This means we can requeue
+ // without the side-effect of calling other handlers for the same message class again as well.
+ //
+ // Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping
+ // and how the bindingIndex is relevant:
+ //
+ // dynamicRegistrations:
+ // Key (prefix)
+ // ""
+ // Key (message class) Value (list of bindings)
+ // A binding1, binding2, binding3
+ // B binding4
+ // "prefix"
+ // A binding5, binding6
+ //
+ // By combining all bindings with the same index, per prefix, the following queues will be registered:
+ //
+ // Prefix Bindings
+ // "" binding1 (message A), binding4 (message B)
+ // "" binding2 (message A)
+ // "" binding3 (message A)
+ // "prefix" binding5 (message A)
+ // "prefix" binding6 (message A)
+ //
+ foreach (var prefixGroup in dynamicRegistrations)
{
- while (dynamicBindings.Count < bindings.Count)
- dynamicBindings.Add(new List());
+ var dynamicBindings = new List>();
- for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++)
- dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
+ foreach (var bindings in prefixGroup.Value.Values)
+ {
+ while (dynamicBindings.Count < bindings.Count)
+ dynamicBindings.Add(new List());
+
+ for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++)
+ dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
+ }
+
+ queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
}
- queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
-
var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config);
@@ -317,10 +344,21 @@ namespace Tapeti
protected void AddDynamicRegistration(IBindingContext context, Binding binding)
{
- if (dynamicRegistrations.ContainsKey(context.MessageClass))
- dynamicRegistrations[context.MessageClass].Add(binding);
- else
- dynamicRegistrations.Add(context.MessageClass, new List { binding });
+ var prefix = binding.QueueInfo.Name ?? "";
+
+ if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations))
+ {
+ prefixRegistrations = new Dictionary>();
+ dynamicRegistrations.Add(prefix, prefixRegistrations);
+ }
+
+ if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings))
+ {
+ bindings = new List();
+ prefixRegistrations.Add(context.MessageClass, bindings);
+ }
+
+ bindings.Add(binding);
}
@@ -333,7 +371,7 @@ namespace Tapeti
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}");
if (dynamicQueueAttribute != null)
- return new QueueInfo { Dynamic = true };
+ return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix };
if (durableQueueAttribute != null)
return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name };
@@ -342,6 +380,15 @@ namespace Tapeti
}
+ protected string GetDynamicQueueName(string prefix)
+ {
+ if (String.IsNullOrEmpty(prefix))
+ return "";
+
+ return prefix + "." + Guid.NewGuid().ToString("N");
+ }
+
+
protected class QueueInfo
{
public bool? Dynamic { get; set; }
diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs
index fe78c4b..a2df352 100644
--- a/Test/MarcoController.cs
+++ b/Test/MarcoController.cs
@@ -134,6 +134,7 @@ namespace Test
+ [DynamicQueue("custom.prefix")]
public void Polo(PoloMessage message)
{
Console.WriteLine(">> Polo");