Fixed #38 Durable queues are verified more than once

Caused by non thread-safe code when applying bindings. Replaced Task.WhenAll with a simple loop, as the performance benefits are negligable and it's only at startup anyways.
Also changed binding methods to use ValueTask.
This commit is contained in:
Mark van Renswoude 2022-02-09 12:42:05 +01:00
parent 2a32d6352b
commit 3aee6f1c53
4 changed files with 48 additions and 47 deletions

View File

@ -31,7 +31,7 @@ namespace Tapeti.Transient
/// <inheritdoc />
public async Task Apply(IBindingTarget target)
public async ValueTask Apply(IBindingTarget target)
{
QueueName = await target.BindDynamicDirect(dynamicQueuePrefix);
router.TransientResponseQueueName = QueueName;
@ -46,17 +46,17 @@ namespace Tapeti.Transient
/// <inheritdoc />
public Task Invoke(IMessageContext context)
public ValueTask Invoke(IMessageContext context)
{
router.HandleMessage(context);
return Task.CompletedTask;
return default;
}
/// <inheritdoc />
public Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
public ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
return Task.CompletedTask;
return default;
}
}
}

View File

@ -41,7 +41,7 @@ namespace Tapeti.Config
/// Called after a connection is established to set up the binding.
/// </summary>
/// <param name="target"></param>
Task Apply(IBindingTarget target);
ValueTask Apply(IBindingTarget target);
/// <summary>
@ -55,7 +55,7 @@ namespace Tapeti.Config
/// Invokes the handler for the message as specified by the context.
/// </summary>
/// <param name="context"></param>
Task Invoke(IMessageContext context);
ValueTask Invoke(IMessageContext context);
/// <summary>
@ -64,7 +64,7 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="consumeResult"></param>
/// <returns></returns>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult);
ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult);
}
@ -80,7 +80,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="messageClass">The message class to be bound to the queue</param>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurable(Type messageClass, string queueName);
ValueTask BindDurable(Type messageClass, string queueName);
/// <summary>
/// Binds the messageClass to a dynamic auto-delete queue.
@ -92,14 +92,14 @@ namespace Tapeti.Config
/// <param name="messageClass">The message class to be bound to the queue</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamic(Type messageClass, string queuePrefix = null);
ValueTask<string> BindDynamic(Type messageClass, string queuePrefix = null);
/// <summary>
/// Declares a durable queue but does not add a binding for a messageClass' routing key.
/// Used for direct-to-queue messages.
/// </summary>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurableDirect(string queueName);
ValueTask BindDurableDirect(string queueName);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -108,7 +108,7 @@ namespace Tapeti.Config
/// <param name="messageClass">The message class which will be handled on the queue. It is not actually bound to the queue.</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
ValueTask<string> BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -116,13 +116,13 @@ namespace Tapeti.Config
/// </summary>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamicDirect(string queuePrefix = null);
ValueTask<string> BindDynamicDirect(string queuePrefix = null);
/// <summary>
/// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete
/// bindings and is empty, it will be removed.
/// </summary>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurableObsolete(string queueName);
ValueTask BindDurableObsolete(string queueName);
}
}

View File

@ -119,7 +119,7 @@ namespace Tapeti.Connection
}
private async Task ApplyBindings(CancellationToken cancellationToken)
private async ValueTask ApplyBindings(CancellationToken cancellationToken)
{
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
@ -133,7 +133,9 @@ namespace Tapeti.Connection
else
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
foreach (var binding in config.Bindings)
await binding.Apply(bindingTarget);
await bindingTarget.Apply();
}
@ -183,12 +185,12 @@ namespace Tapeti.Connection
}
public abstract Task BindDurable(Type messageClass, string queueName);
public abstract Task BindDurableDirect(string queueName);
public abstract Task BindDurableObsolete(string queueName);
public abstract ValueTask BindDurable(Type messageClass, string queueName);
public abstract ValueTask BindDurableDirect(string queueName);
public abstract ValueTask BindDurableObsolete(string queueName);
public async Task<string> BindDynamic(Type messageClass, string queuePrefix = null)
public async ValueTask<string> BindDynamic(Type messageClass, string queuePrefix = null)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix);
if (!result.IsNewMessageClass)
@ -203,14 +205,14 @@ namespace Tapeti.Connection
}
public async Task<string> BindDynamicDirect(Type messageClass, string queuePrefix = null)
public async ValueTask<string> BindDynamicDirect(Type messageClass, string queuePrefix = null)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix);
return result.QueueName;
}
public async Task<string> BindDynamicDirect(string queuePrefix = null)
public async ValueTask<string> BindDynamicDirect(string queuePrefix = null)
{
// If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
@ -285,7 +287,7 @@ namespace Tapeti.Connection
}
public override Task BindDurable(Type messageClass, string queueName)
public override ValueTask BindDurable(Type messageClass, string queueName)
{
// Collect the message classes per queue so we can determine afterwards
// if any of the bindings currently set on the durable queue are no
@ -300,23 +302,23 @@ namespace Tapeti.Connection
else if (!messageClasses.Contains(messageClass))
messageClasses.Add(messageClass);
return Task.CompletedTask;
return default;
}
public override Task BindDurableDirect(string queueName)
public override ValueTask BindDurableDirect(string queueName)
{
if (!durableQueues.ContainsKey(queueName))
durableQueues.Add(queueName, new List<Type>());
return Task.CompletedTask;
return default;
}
public override Task BindDurableObsolete(string queueName)
public override ValueTask BindDurableObsolete(string queueName)
{
obsoleteDurableQueues.Add(queueName);
return Task.CompletedTask;
return default;
}
@ -357,7 +359,7 @@ namespace Tapeti.Connection
private class PassiveDurableQueuesBindingTarget : CustomBindingTarget
{
private readonly List<string> durableQueues = new();
private readonly HashSet<string> durableQueues = new();
public PassiveDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
@ -365,29 +367,28 @@ namespace Tapeti.Connection
}
public override async Task BindDurable(Type messageClass, string queueName)
public override async ValueTask BindDurable(Type messageClass, string queueName)
{
await VerifyDurableQueue(queueName);
}
public override async Task BindDurableDirect(string queueName)
public override async ValueTask BindDurableDirect(string queueName)
{
await VerifyDurableQueue(queueName);
}
public override Task BindDurableObsolete(string queueName)
public override ValueTask BindDurableObsolete(string queueName)
{
return Task.CompletedTask;
return default;
}
private async Task VerifyDurableQueue(string queueName)
{
if (!durableQueues.Contains(queueName))
{
await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
durableQueues.Add(queueName);
}
if (!durableQueues.Add(queueName))
return;
await ClientFactory().DurableQueueVerify(CancellationToken, queueName);
}
}
@ -399,19 +400,19 @@ namespace Tapeti.Connection
}
public override Task BindDurable(Type messageClass, string queueName)
public override ValueTask BindDurable(Type messageClass, string queueName)
{
return Task.CompletedTask;
return default;
}
public override Task BindDurableDirect(string queueName)
public override ValueTask BindDurableDirect(string queueName)
{
return Task.CompletedTask;
return default;
}
public override Task BindDurableObsolete(string queueName)
public override ValueTask BindDurableObsolete(string queueName)
{
return Task.CompletedTask;
return default;
}
}
}

View File

@ -109,7 +109,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public async Task Apply(IBindingTarget target)
public async ValueTask Apply(IBindingTarget target)
{
if (!bindingInfo.IsObsolete)
{
@ -157,7 +157,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public async Task Invoke(IMessageContext context)
public async ValueTask Invoke(IMessageContext context)
{
var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType);
context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
@ -174,7 +174,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware,