From 145850c3c2bb5e229194e908b3ac2cb2868bd973 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Tue, 17 Mar 2020 15:21:39 +0100 Subject: [PATCH] Provide a way to disable queue verification - Backwards compatibility with incompatible queue definitions --- Tapeti/Config/ITapetiConfig.cs | 9 +++++++ Tapeti/Config/ITapetiConfigBuilder.cs | 18 ++++++++++++++ Tapeti/Connection/TapetiSubscriber.cs | 35 ++++++++++++++++++++++++--- Tapeti/TapetiConfig.cs | 24 +++++++++++++++++- 4 files changed, 82 insertions(+), 4 deletions(-) diff --git a/Tapeti/Config/ITapetiConfig.cs b/Tapeti/Config/ITapetiConfig.cs index 515ea0e..78f4022 100644 --- a/Tapeti/Config/ITapetiConfig.cs +++ b/Tapeti/Config/ITapetiConfig.cs @@ -50,6 +50,15 @@ namespace Tapeti.Config /// already be present when the connection is made. /// bool DeclareDurableQueues { get; } + + + /// + /// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled. + /// Defaults to true. Disable if you have queues with additional properties like a deadletter + /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error + /// while verifying. + /// + bool VerifyDurableQueues { get; } } diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs index 781648e..87699df 100644 --- a/Tapeti/Config/ITapetiConfigBuilder.cs +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -92,6 +92,24 @@ namespace Tapeti.Config /// feature to work, since AMQP does not provide a way to query existing bindings. /// ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); + + + /// + /// Disables verifying durable queues at startup. Enabled by default. + /// Disable if you have queues with additional properties like a deadletter + /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error + /// while verifying. + /// + ITapetiConfigBuilder DisableVerifyDurableQueues(); + + + /// + /// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled. + /// Defaults to true. Disable if you have queues with additional properties like a deadletter + /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error + /// while verifying. + /// + ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled); } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index c57a424..69be2d0 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -97,9 +97,14 @@ namespace Tapeti.Connection var routingKeyStrategy = config.DependencyResolver.Resolve(); var exchangeStrategy = config.DependencyResolver.Resolve(); - var bindingTarget = config.Features.DeclareDurableQueues - ? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) - : new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); + CustomBindingTarget bindingTarget; + + if (config.Features.DeclareDurableQueues) + bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); + else if (config.Features.VerifyDurableQueues) + bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); + else + bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget))); await bindingTarget.Apply(); @@ -358,5 +363,29 @@ namespace Tapeti.Connection } } } + + + private class NoVerifyBindingTarget : CustomBindingTarget + { + public NoVerifyBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) + { + } + + + public override Task BindDurable(Type messageClass, string queueName) + { + return Task.CompletedTask; + } + + public override Task BindDurableDirect(string queueName) + { + return Task.CompletedTask; + } + + public override Task BindDurableObsolete(string queueName) + { + return Task.CompletedTask; + } + } } } diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 6da6fae..2746fbc 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -167,6 +167,22 @@ namespace Tapeti } + /// + public ITapetiConfigBuilder DisableVerifyDurableQueues() + { + GetConfig().SetVerifyDurableQueues(false); + return this; + } + + + /// + public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled) + { + GetConfig().SetVerifyDurableQueues(enabled); + return this; + } + + /// /// Registers the default implementation of various Tapeti interfaces into the IoC container. /// @@ -256,13 +272,19 @@ namespace Tapeti { features.DeclareDurableQueues = enabled; } + + public void SetVerifyDurableQueues(bool enabled) + { + features.VerifyDurableQueues = enabled; + } } internal class ConfigFeatures : ITapetiConfigFeatues { public bool PublisherConfirms { get; internal set; } = true; - public bool DeclareDurableQueues { get; internal set; } = true; + public bool DeclareDurableQueues { get; internal set; } + public bool VerifyDurableQueues { get; internal set; } = true; }