diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs
index f55bb45..13967e7 100644
--- a/Tapeti.Transient/TransientGenericBinding.cs
+++ b/Tapeti.Transient/TransientGenericBinding.cs
@@ -31,7 +31,7 @@ namespace Tapeti.Transient
///
- public async Task Apply(IBindingTarget target)
+ public async ValueTask Apply(IBindingTarget target)
{
QueueName = await target.BindDynamicDirect(dynamicQueuePrefix);
router.TransientResponseQueueName = QueueName;
@@ -46,17 +46,17 @@ namespace Tapeti.Transient
///
- public Task Invoke(IMessageContext context)
+ public ValueTask Invoke(IMessageContext context)
{
router.HandleMessage(context);
- return Task.CompletedTask;
+ return default;
}
///
- public Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
+ public ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
- return Task.CompletedTask;
+ return default;
}
}
}
\ No newline at end of file
diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs
index 8cbb45f..dcaa387 100644
--- a/Tapeti/Config/IBinding.cs
+++ b/Tapeti/Config/IBinding.cs
@@ -41,7 +41,7 @@ namespace Tapeti.Config
/// Called after a connection is established to set up the binding.
///
///
- Task Apply(IBindingTarget target);
+ ValueTask Apply(IBindingTarget target);
///
@@ -55,7 +55,7 @@ namespace Tapeti.Config
/// Invokes the handler for the message as specified by the context.
///
///
- Task Invoke(IMessageContext context);
+ ValueTask Invoke(IMessageContext context);
///
@@ -64,7 +64,7 @@ namespace Tapeti.Config
///
///
///
- Task Cleanup(IMessageContext context, ConsumeResult consumeResult);
+ ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult);
}
@@ -80,7 +80,7 @@ namespace Tapeti.Config
///
/// The message class to be bound to the queue
/// The name of the durable queue
- Task BindDurable(Type messageClass, string queueName);
+ ValueTask BindDurable(Type messageClass, string queueName);
///
/// Binds the messageClass to a dynamic auto-delete queue.
@@ -92,14 +92,14 @@ namespace Tapeti.Config
/// The message class to be bound to the queue
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
- Task BindDynamic(Type messageClass, string queuePrefix = null);
+ ValueTask BindDynamic(Type messageClass, string queuePrefix = null);
///
/// Declares a durable queue but does not add a binding for a messageClass' routing key.
/// Used for direct-to-queue messages.
///
/// The name of the durable queue
- Task BindDurableDirect(string queueName);
+ ValueTask BindDurableDirect(string queueName);
///
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@@ -108,7 +108,7 @@ namespace Tapeti.Config
/// The message class which will be handled on the queue. It is not actually bound to the queue.
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
- Task BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
+ ValueTask BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
///
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@@ -116,13 +116,13 @@ namespace Tapeti.Config
///
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
- Task BindDynamicDirect(string queuePrefix = null);
+ ValueTask BindDynamicDirect(string queuePrefix = null);
///
/// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete
/// bindings and is empty, it will be removed.
///
/// The name of the durable queue
- Task BindDurableObsolete(string queueName);
+ ValueTask BindDurableObsolete(string queueName);
}
}
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 35cbf75..a46d1c8 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -119,7 +119,7 @@ namespace Tapeti.Connection
}
- private async Task ApplyBindings(CancellationToken cancellationToken)
+ private async ValueTask ApplyBindings(CancellationToken cancellationToken)
{
var routingKeyStrategy = config.DependencyResolver.Resolve();
var exchangeStrategy = config.DependencyResolver.Resolve();
@@ -133,7 +133,9 @@ namespace Tapeti.Connection
else
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
- await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
+ foreach (var binding in config.Bindings)
+ await binding.Apply(bindingTarget);
+
await bindingTarget.Apply();
}
@@ -183,12 +185,12 @@ namespace Tapeti.Connection
}
- public abstract Task BindDurable(Type messageClass, string queueName);
- public abstract Task BindDurableDirect(string queueName);
- public abstract Task BindDurableObsolete(string queueName);
+ public abstract ValueTask BindDurable(Type messageClass, string queueName);
+ public abstract ValueTask BindDurableDirect(string queueName);
+ public abstract ValueTask BindDurableObsolete(string queueName);
- public async Task BindDynamic(Type messageClass, string queuePrefix = null)
+ public async ValueTask BindDynamic(Type messageClass, string queuePrefix = null)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix);
if (!result.IsNewMessageClass)
@@ -203,14 +205,14 @@ namespace Tapeti.Connection
}
- public async Task BindDynamicDirect(Type messageClass, string queuePrefix = null)
+ public async ValueTask BindDynamicDirect(Type messageClass, string queuePrefix = null)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix);
return result.QueueName;
}
- public async Task BindDynamicDirect(string queuePrefix = null)
+ public async ValueTask BindDynamicDirect(string queuePrefix = null)
{
// If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
@@ -285,7 +287,7 @@ namespace Tapeti.Connection
}
- public override Task BindDurable(Type messageClass, string queueName)
+ public override ValueTask BindDurable(Type messageClass, string queueName)
{
// Collect the message classes per queue so we can determine afterwards
// if any of the bindings currently set on the durable queue are no
@@ -300,23 +302,23 @@ namespace Tapeti.Connection
else if (!messageClasses.Contains(messageClass))
messageClasses.Add(messageClass);
- return Task.CompletedTask;
+ return default;
}
- public override Task BindDurableDirect(string queueName)
+ public override ValueTask BindDurableDirect(string queueName)
{
if (!durableQueues.ContainsKey(queueName))
durableQueues.Add(queueName, new List());
- return Task.CompletedTask;
+ return default;
}
- public override Task BindDurableObsolete(string queueName)
+ public override ValueTask BindDurableObsolete(string queueName)
{
obsoleteDurableQueues.Add(queueName);
- return Task.CompletedTask;
+ return default;
}
@@ -357,7 +359,7 @@ namespace Tapeti.Connection
private class PassiveDurableQueuesBindingTarget : CustomBindingTarget
{
- private readonly List durableQueues = new();
+ private readonly HashSet durableQueues = new();
public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
@@ -365,29 +367,28 @@ namespace Tapeti.Connection
}
- public override async Task BindDurable(Type messageClass, string queueName)
+ public override async ValueTask BindDurable(Type messageClass, string queueName)
{
await VerifyDurableQueue(queueName);
}
- public override async Task BindDurableDirect(string queueName)
+ public override async ValueTask BindDurableDirect(string queueName)
{
await VerifyDurableQueue(queueName);
}
- public override Task BindDurableObsolete(string queueName)
+ public override ValueTask BindDurableObsolete(string queueName)
{
- return Task.CompletedTask;
+ return default;
}
private async Task VerifyDurableQueue(string queueName)
{
- if (!durableQueues.Contains(queueName))
- {
- await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
- durableQueues.Add(queueName);
- }
+ if (!durableQueues.Add(queueName))
+ return;
+
+ await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
}
}
@@ -399,19 +400,19 @@ namespace Tapeti.Connection
}
- public override Task BindDurable(Type messageClass, string queueName)
+ public override ValueTask BindDurable(Type messageClass, string queueName)
{
- return Task.CompletedTask;
+ return default;
}
- public override Task BindDurableDirect(string queueName)
+ public override ValueTask BindDurableDirect(string queueName)
{
- return Task.CompletedTask;
+ return default;
}
- public override Task BindDurableObsolete(string queueName)
+ public override ValueTask BindDurableObsolete(string queueName)
{
- return Task.CompletedTask;
+ return default;
}
}
}
diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs
index 3884cf8..52aa0eb 100644
--- a/Tapeti/Default/ControllerMethodBinding.cs
+++ b/Tapeti/Default/ControllerMethodBinding.cs
@@ -109,7 +109,7 @@ namespace Tapeti.Default
///
- public async Task Apply(IBindingTarget target)
+ public async ValueTask Apply(IBindingTarget target)
{
if (!bindingInfo.IsObsolete)
{
@@ -157,7 +157,7 @@ namespace Tapeti.Default
///
- public async Task Invoke(IMessageContext context)
+ public async ValueTask Invoke(IMessageContext context)
{
var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType);
context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
@@ -174,7 +174,7 @@ namespace Tapeti.Default
///
- public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
+ public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware,