1
0
mirror of synced 2024-11-16 14:53:50 +00:00

Added prefix option for dynamic queues

This commit is contained in:
Mark van Renswoude 2018-08-31 19:17:56 +02:00
parent 2f0b9ee847
commit 247da184fa
4 changed files with 78 additions and 17 deletions

View File

@ -9,5 +9,18 @@ namespace Tapeti.Annotations
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class DynamicQueueAttribute : Attribute public class DynamicQueueAttribute : Attribute
{ {
public string Prefix { get; set; }
/// <summary>
/// If prefix is specified, Tapeti will compose the queue name using the
/// prefix and a unique ID. If not specified, an empty queue name will be passed
/// to RabbitMQ thus letting it create a unique queue name.
/// </summary>
/// <param name="prefix"></param>
public DynamicQueueAttribute(string prefix = null)
{
Prefix = prefix;
}
} }
} }

View File

@ -70,7 +70,7 @@ namespace Tapeti.Connection
if (queue.Dynamic) if (queue.Dynamic)
{ {
var dynamicQueue = channel.QueueDeclare(); var dynamicQueue = channel.QueueDeclare(queue.Name);
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName); (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
foreach (var binding in queue.Bindings) foreach (var binding in queue.Bindings)

View File

@ -22,7 +22,7 @@ namespace Tapeti
public class TapetiConfig public class TapetiConfig
{ {
private readonly Dictionary<string, List<Binding>> staticRegistrations = new Dictionary<string, List<Binding>>(); private readonly Dictionary<string, List<Binding>> staticRegistrations = new Dictionary<string, List<Binding>>();
private readonly Dictionary<Type, List<Binding>> dynamicRegistrations = new Dictionary<Type, List<Binding>>(); private readonly Dictionary<string, Dictionary<Type, List<Binding>>> dynamicRegistrations = new Dictionary<string, Dictionary<Type, List<Binding>>>();
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>(); private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>(); private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
@ -49,10 +49,36 @@ namespace Tapeti
var queues = new List<IQueue>(); var queues = new List<IQueue>();
queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value))); queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value)));
// Group all bindings with the same index into queues, this will
// ensure each message type is unique on their queue // We want to ensure each queue only has unique messages classes. This means we can requeue
// without the side-effect of calling other handlers for the same message class again as well.
//
// Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping
// and how the bindingIndex is relevant:
//
// dynamicRegistrations:
// Key (prefix)
// ""
// Key (message class) Value (list of bindings)
// A binding1, binding2, binding3
// B binding4
// "prefix"
// A binding5, binding6
//
// By combining all bindings with the same index, per prefix, the following queues will be registered:
//
// Prefix Bindings
// "" binding1 (message A), binding4 (message B)
// "" binding2 (message A)
// "" binding3 (message A)
// "prefix" binding5 (message A)
// "prefix" binding6 (message A)
//
foreach (var prefixGroup in dynamicRegistrations)
{
var dynamicBindings = new List<List<Binding>>(); var dynamicBindings = new List<List<Binding>>();
foreach (var bindings in dynamicRegistrations.Values)
foreach (var bindings in prefixGroup.Value.Values)
{ {
while (dynamicBindings.Count < bindings.Count) while (dynamicBindings.Count < bindings.Count)
dynamicBindings.Add(new List<Binding>()); dynamicBindings.Add(new List<Binding>());
@ -61,7 +87,8 @@ namespace Tapeti
dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
} }
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
}
var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
@ -317,10 +344,21 @@ namespace Tapeti
protected void AddDynamicRegistration(IBindingContext context, Binding binding) protected void AddDynamicRegistration(IBindingContext context, Binding binding)
{ {
if (dynamicRegistrations.ContainsKey(context.MessageClass)) var prefix = binding.QueueInfo.Name ?? "";
dynamicRegistrations[context.MessageClass].Add(binding);
else if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary<Type, List<Binding>> prefixRegistrations))
dynamicRegistrations.Add(context.MessageClass, new List<Binding> { binding }); {
prefixRegistrations = new Dictionary<Type, List<Binding>>();
dynamicRegistrations.Add(prefix, prefixRegistrations);
}
if (!prefixRegistrations.TryGetValue(context.MessageClass, out List<Binding> bindings))
{
bindings = new List<Binding>();
prefixRegistrations.Add(context.MessageClass, bindings);
}
bindings.Add(binding);
} }
@ -333,7 +371,7 @@ namespace Tapeti
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}"); throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}");
if (dynamicQueueAttribute != null) if (dynamicQueueAttribute != null)
return new QueueInfo { Dynamic = true }; return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix };
if (durableQueueAttribute != null) if (durableQueueAttribute != null)
return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name }; return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name };
@ -342,6 +380,15 @@ namespace Tapeti
} }
protected string GetDynamicQueueName(string prefix)
{
if (String.IsNullOrEmpty(prefix))
return "";
return prefix + "." + Guid.NewGuid().ToString("N");
}
protected class QueueInfo protected class QueueInfo
{ {
public bool? Dynamic { get; set; } public bool? Dynamic { get; set; }

View File

@ -134,6 +134,7 @@ namespace Test
[DynamicQueue("custom.prefix")]
public void Polo(PoloMessage message) public void Polo(PoloMessage message)
{ {
Console.WriteLine(">> Polo"); Console.WriteLine(">> Polo");