From 458af0640d0bf2ae30332092e4669cdce396dd5e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sun, 12 Feb 2017 15:18:12 +0100 Subject: [PATCH] Fixed #9: Allow subscribing to queues before consuming --- Tapeti/Config/IConfig.cs | 6 ++++++ Tapeti/Connection/TapetiSubscriber.cs | 19 ++++++++++++++++--- Tapeti/Connection/TapetiWorker.cs | 20 ++++++++++---------- Tapeti/ISubscriber.cs | 5 ++++- Tapeti/TapetiConfig.cs | 10 ++++++++-- Tapeti/TapetiConnection.cs | 10 +++++++--- 6 files changed, 51 insertions(+), 19 deletions(-) diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index e9b2ba9..daad63e 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -24,6 +24,12 @@ namespace Tapeti.Config } + public interface IDynamicQueue : IQueue + { + void SetName(string name); + } + + public interface IBinding { Type Controller { get; } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index f6303cb..0202e1b 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -9,17 +9,30 @@ namespace Tapeti.Connection public class TapetiSubscriber : ISubscriber { private readonly Func workerFactory; + private readonly List queues; + private bool consuming; - public TapetiSubscriber(Func workerFactory) + public TapetiSubscriber(Func workerFactory, IEnumerable queues) { this.workerFactory = workerFactory; + this.queues = queues.ToList(); } - public async Task BindQueues(IEnumerable queues) + public Task BindQueues() + { + return Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); + } + + + public Task Resume() { - await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); + if (consuming) + return Task.CompletedTask; + + consuming = true; + return Task.WhenAll(queues.Select(queue => workerFactory().Consume(queue.Name, queue.Bindings)).ToList()); } } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 9c6bf04..c7affe9 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -48,6 +48,9 @@ namespace Tapeti.Connection public Task Consume(string queueName, IEnumerable bindings) { + if (string.IsNullOrEmpty(queueName)) + throw new ArgumentNullException(nameof(queueName)); + return taskQueue.Value.Add(async () => { (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware)); @@ -55,32 +58,29 @@ namespace Tapeti.Connection } - public async Task Subscribe(IQueue queue) + public Task Subscribe(IQueue queue) { - var queueName = await taskQueue.Value.Add(async () => + return taskQueue.Value.Add(async () => { var channel = await GetChannel(); if (queue.Dynamic) { var dynamicQueue = channel.QueueDeclare(); + (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName); foreach (var binding in queue.Bindings) { var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - channel.QueueBind(dynamicQueue.QueueName, exchangeStrategy.GetExchange(binding.MessageClass), routingKey); + var exchange = exchangeStrategy.GetExchange(binding.MessageClass); + channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey); (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); } - - return dynamicQueue.QueueName; } - - channel.QueueDeclarePassive(queue.Name); - return queue.Name; + else + channel.QueueDeclarePassive(queue.Name); }).Unwrap(); - - await Consume(queueName, queue.Bindings); } diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index fae4328..06a76da 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -1,6 +1,9 @@ -namespace Tapeti +using System.Threading.Tasks; + +namespace Tapeti { public interface ISubscriber { + Task Resume(); } } diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 8f06369..b90299e 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -342,10 +342,10 @@ namespace Tapeti } - protected class Queue : IQueue + protected class Queue : IDynamicQueue { public bool Dynamic { get; } - public string Name { get; } + public string Name { get; set; } public IEnumerable Bindings { get; } @@ -355,6 +355,12 @@ namespace Tapeti Name = queue.Name; Bindings = bindings; } + + + public void SetName(string name) + { + Name = name; + } } diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index e59e1e2..0b0d0ae 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Connection; @@ -25,10 +26,13 @@ namespace Tapeti } - public async Task Subscribe() + public async Task Subscribe(bool startConsuming = true) { - var subscriber = new TapetiSubscriber(() => worker.Value); - await subscriber.BindQueues(config.Queues); + var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); + await subscriber.BindQueues(); + + if (startConsuming) + await subscriber.Resume(); return subscriber; }