Provide a way to disable queue verification

- Backwards compatibility with incompatible queue definitions
This commit is contained in:
Mark van Renswoude 2020-03-17 15:21:39 +01:00
parent c1cd2823ca
commit 145850c3c2
4 changed files with 82 additions and 4 deletions

View File

@ -50,6 +50,15 @@ namespace Tapeti.Config
/// already be present when the connection is made.
/// </summary>
bool DeclareDurableQueues { get; }
/// <summary>
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
bool VerifyDurableQueues { get; }
}

View File

@ -92,6 +92,24 @@ namespace Tapeti.Config
/// feature to work, since AMQP does not provide a way to query existing bindings.
/// </remarks>
ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled);
/// <summary>
/// Disables verifying durable queues at startup. Enabled by default.
/// Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
ITapetiConfigBuilder DisableVerifyDurableQueues();
/// <summary>
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// </summary>
ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled);
}

View File

@ -97,9 +97,14 @@ namespace Tapeti.Connection
var routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
var exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
var bindingTarget = config.Features.DeclareDurableQueues
? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
: new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
CustomBindingTarget bindingTarget;
if (config.Features.DeclareDurableQueues)
bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else if (config.Features.VerifyDurableQueues)
bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget)));
await bindingTarget.Apply();
@ -358,5 +363,29 @@ namespace Tapeti.Connection
}
}
}
private class NoVerifyBindingTarget : CustomBindingTarget
{
public NoVerifyBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
{
}
public override Task BindDurable(Type messageClass, string queueName)
{
return Task.CompletedTask;
}
public override Task BindDurableDirect(string queueName)
{
return Task.CompletedTask;
}
public override Task BindDurableObsolete(string queueName)
{
return Task.CompletedTask;
}
}
}
}

View File

@ -167,6 +167,22 @@ namespace Tapeti
}
/// <inheritdoc />
public ITapetiConfigBuilder DisableVerifyDurableQueues()
{
GetConfig().SetVerifyDurableQueues(false);
return this;
}
/// <inheritdoc />
public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled)
{
GetConfig().SetVerifyDurableQueues(enabled);
return this;
}
/// <summary>
/// Registers the default implementation of various Tapeti interfaces into the IoC container.
/// </summary>
@ -256,13 +272,19 @@ namespace Tapeti
{
features.DeclareDurableQueues = enabled;
}
public void SetVerifyDurableQueues(bool enabled)
{
features.VerifyDurableQueues = enabled;
}
}
internal class ConfigFeatures : ITapetiConfigFeatues
{
public bool PublisherConfirms { get; internal set; } = true;
public bool DeclareDurableQueues { get; internal set; } = true;
public bool DeclareDurableQueues { get; internal set; }
public bool VerifyDurableQueues { get; internal set; } = true;
}