Merge pull request #3 from MvRens/feature/DynamicQueueStrategy
Added prefix option for dynamic queues
This commit is contained in:
commit
455a7a204e
@ -9,5 +9,18 @@ namespace Tapeti.Annotations
|
||||
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
|
||||
public class DynamicQueueAttribute : Attribute
|
||||
{
|
||||
public string Prefix { get; set; }
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// If prefix is specified, Tapeti will compose the queue name using the
|
||||
/// prefix and a unique ID. If not specified, an empty queue name will be passed
|
||||
/// to RabbitMQ thus letting it create a unique queue name.
|
||||
/// </summary>
|
||||
/// <param name="prefix"></param>
|
||||
public DynamicQueueAttribute(string prefix = null)
|
||||
{
|
||||
Prefix = prefix;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ namespace Tapeti.Connection
|
||||
|
||||
if (queue.Dynamic)
|
||||
{
|
||||
var dynamicQueue = channel.QueueDeclare();
|
||||
var dynamicQueue = channel.QueueDeclare(queue.Name);
|
||||
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
|
||||
|
||||
foreach (var binding in queue.Bindings)
|
||||
|
@ -22,7 +22,7 @@ namespace Tapeti
|
||||
public class TapetiConfig
|
||||
{
|
||||
private readonly Dictionary<string, List<Binding>> staticRegistrations = new Dictionary<string, List<Binding>>();
|
||||
private readonly Dictionary<Type, List<Binding>> dynamicRegistrations = new Dictionary<Type, List<Binding>>();
|
||||
private readonly Dictionary<string, Dictionary<Type, List<Binding>>> dynamicRegistrations = new Dictionary<string, Dictionary<Type, List<Binding>>>();
|
||||
|
||||
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
|
||||
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
|
||||
@ -49,10 +49,36 @@ namespace Tapeti
|
||||
var queues = new List<IQueue>();
|
||||
queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value)));
|
||||
|
||||
// Group all bindings with the same index into queues, this will
|
||||
// ensure each message type is unique on their queue
|
||||
|
||||
// We want to ensure each queue only has unique messages classes. This means we can requeue
|
||||
// without the side-effect of calling other handlers for the same message class again as well.
|
||||
//
|
||||
// Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping
|
||||
// and how the bindingIndex is relevant:
|
||||
//
|
||||
// dynamicRegistrations:
|
||||
// Key (prefix)
|
||||
// ""
|
||||
// Key (message class) Value (list of bindings)
|
||||
// A binding1, binding2, binding3
|
||||
// B binding4
|
||||
// "prefix"
|
||||
// A binding5, binding6
|
||||
//
|
||||
// By combining all bindings with the same index, per prefix, the following queues will be registered:
|
||||
//
|
||||
// Prefix Bindings
|
||||
// "" binding1 (message A), binding4 (message B)
|
||||
// "" binding2 (message A)
|
||||
// "" binding3 (message A)
|
||||
// "prefix" binding5 (message A)
|
||||
// "prefix" binding6 (message A)
|
||||
//
|
||||
foreach (var prefixGroup in dynamicRegistrations)
|
||||
{
|
||||
var dynamicBindings = new List<List<Binding>>();
|
||||
foreach (var bindings in dynamicRegistrations.Values)
|
||||
|
||||
foreach (var bindings in prefixGroup.Value.Values)
|
||||
{
|
||||
while (dynamicBindings.Count < bindings.Count)
|
||||
dynamicBindings.Add(new List<Binding>());
|
||||
@ -61,7 +87,8 @@ namespace Tapeti
|
||||
dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
|
||||
}
|
||||
|
||||
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
|
||||
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
|
||||
}
|
||||
|
||||
var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
|
||||
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
|
||||
@ -317,10 +344,21 @@ namespace Tapeti
|
||||
|
||||
protected void AddDynamicRegistration(IBindingContext context, Binding binding)
|
||||
{
|
||||
if (dynamicRegistrations.ContainsKey(context.MessageClass))
|
||||
dynamicRegistrations[context.MessageClass].Add(binding);
|
||||
else
|
||||
dynamicRegistrations.Add(context.MessageClass, new List<Binding> { binding });
|
||||
var prefix = binding.QueueInfo.Name ?? "";
|
||||
|
||||
if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary<Type, List<Binding>> prefixRegistrations))
|
||||
{
|
||||
prefixRegistrations = new Dictionary<Type, List<Binding>>();
|
||||
dynamicRegistrations.Add(prefix, prefixRegistrations);
|
||||
}
|
||||
|
||||
if (!prefixRegistrations.TryGetValue(context.MessageClass, out List<Binding> bindings))
|
||||
{
|
||||
bindings = new List<Binding>();
|
||||
prefixRegistrations.Add(context.MessageClass, bindings);
|
||||
}
|
||||
|
||||
bindings.Add(binding);
|
||||
}
|
||||
|
||||
|
||||
@ -333,7 +371,7 @@ namespace Tapeti
|
||||
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}");
|
||||
|
||||
if (dynamicQueueAttribute != null)
|
||||
return new QueueInfo { Dynamic = true };
|
||||
return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix };
|
||||
|
||||
if (durableQueueAttribute != null)
|
||||
return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name };
|
||||
@ -342,6 +380,15 @@ namespace Tapeti
|
||||
}
|
||||
|
||||
|
||||
protected string GetDynamicQueueName(string prefix)
|
||||
{
|
||||
if (String.IsNullOrEmpty(prefix))
|
||||
return "";
|
||||
|
||||
return prefix + "." + Guid.NewGuid().ToString("N");
|
||||
}
|
||||
|
||||
|
||||
protected class QueueInfo
|
||||
{
|
||||
public bool? Dynamic { get; set; }
|
||||
|
@ -134,6 +134,7 @@ namespace Test
|
||||
|
||||
|
||||
|
||||
[DynamicQueue("custom.prefix")]
|
||||
public void Polo(PoloMessage message)
|
||||
{
|
||||
Console.WriteLine(">> Polo");
|
||||
|
Loading…
Reference in New Issue
Block a user