1
0
mirror of synced 2025-01-23 00:23:06 +01:00

Fixed #9: Allow subscribing to queues before consuming

This commit is contained in:
Mark van Renswoude 2017-02-12 15:18:12 +01:00
parent cc7b0cd042
commit 458af0640d
6 changed files with 51 additions and 19 deletions

View File

@ -24,6 +24,12 @@ namespace Tapeti.Config
}
public interface IDynamicQueue : IQueue
{
void SetName(string name);
}
public interface IBinding
{
Type Controller { get; }

View File

@ -9,17 +9,30 @@ namespace Tapeti.Connection
public class TapetiSubscriber : ISubscriber
{
private readonly Func<TapetiWorker> workerFactory;
private readonly List<IQueue> queues;
private bool consuming;
public TapetiSubscriber(Func<TapetiWorker> workerFactory)
public TapetiSubscriber(Func<TapetiWorker> workerFactory, IEnumerable<IQueue> queues)
{
this.workerFactory = workerFactory;
this.queues = queues.ToList();
}
public async Task BindQueues(IEnumerable<IQueue> queues)
public Task BindQueues()
{
await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList());
return Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList());
}
public Task Resume()
{
if (consuming)
return Task.CompletedTask;
consuming = true;
return Task.WhenAll(queues.Select(queue => workerFactory().Consume(queue.Name, queue.Bindings)).ToList());
}
}
}

View File

@ -48,6 +48,9 @@ namespace Tapeti.Connection
public Task Consume(string queueName, IEnumerable<IBinding> bindings)
{
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
return taskQueue.Value.Add(async () =>
{
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware));
@ -55,32 +58,29 @@ namespace Tapeti.Connection
}
public async Task Subscribe(IQueue queue)
public Task Subscribe(IQueue queue)
{
var queueName = await taskQueue.Value.Add(async () =>
return taskQueue.Value.Add(async () =>
{
var channel = await GetChannel();
if (queue.Dynamic)
{
var dynamicQueue = channel.QueueDeclare();
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
foreach (var binding in queue.Bindings)
{
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
channel.QueueBind(dynamicQueue.QueueName, exchangeStrategy.GetExchange(binding.MessageClass), routingKey);
var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey);
(binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName);
}
return dynamicQueue.QueueName;
}
channel.QueueDeclarePassive(queue.Name);
return queue.Name;
else
channel.QueueDeclarePassive(queue.Name);
}).Unwrap();
await Consume(queueName, queue.Bindings);
}

View File

@ -1,6 +1,9 @@
namespace Tapeti
using System.Threading.Tasks;
namespace Tapeti
{
public interface ISubscriber
{
Task Resume();
}
}

View File

@ -342,10 +342,10 @@ namespace Tapeti
}
protected class Queue : IQueue
protected class Queue : IDynamicQueue
{
public bool Dynamic { get; }
public string Name { get; }
public string Name { get; set; }
public IEnumerable<IBinding> Bindings { get; }
@ -355,6 +355,12 @@ namespace Tapeti
Name = queue.Name;
Bindings = bindings;
}
public void SetName(string name)
{
Name = name;
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Connection;
@ -25,10 +26,13 @@ namespace Tapeti
}
public async Task<ISubscriber> Subscribe()
public async Task<ISubscriber> Subscribe(bool startConsuming = true)
{
var subscriber = new TapetiSubscriber(() => worker.Value);
await subscriber.BindQueues(config.Queues);
var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
await subscriber.BindQueues();
if (startConsuming)
await subscriber.Resume();
return subscriber;
}