diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index 1c2118a..a05d11c 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Tapeti.Config; using ISerilogLogger = Serilog.ILogger; @@ -93,6 +94,14 @@ namespace Tapeti.Serilog seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName); } + /// + public void QueueExistsWarning(string queueName, Dictionary arguments) + { + seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({arguments}) and will not be redeclared, queue will be consumed as-is", + queueName, + arguments); + } + /// public void QueueBind(string queueName, bool durable, string exchange, string routingKey) { diff --git a/Tapeti/Config/ITapetiConfig.cs b/Tapeti/Config/ITapetiConfig.cs index 78f4022..519eeb2 100644 --- a/Tapeti/Config/ITapetiConfig.cs +++ b/Tapeti/Config/ITapetiConfig.cs @@ -54,9 +54,7 @@ namespace Tapeti.Config /// /// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled. - /// Defaults to true. Disable if you have queues with additional properties like a deadletter - /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error - /// while verifying. + /// Defaults to true. /// bool VerifyDurableQueues { get; } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index eb8f282..15ddffb 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -267,9 +267,28 @@ namespace Tapeti.Connection } + private async Task GetDurableQueueDeclareRequired(string queueName) + { + var existingQueue = await GetQueueInfo(queueName); + if (existingQueue == null) + return true; + + if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive) + throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)"); + + if (existingQueue.Arguments.Count <= 0) + return true; + + (logger as IBindingLogger)?.QueueExistsWarning(queueName, existingQueue.Arguments); + return false; + } + + /// public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable bindings) { + var declareRequired = await GetDurableQueueDeclareRequired(queueName); + var existingBindings = (await GetQueueBindings(queueName)).ToList(); var currentBindings = bindings.ToList(); var bindingLogger = logger as IBindingLogger; @@ -279,9 +298,11 @@ namespace Tapeti.Connection if (cancellationToken.IsCancellationRequested) return; - bindingLogger?.QueueDeclare(queueName, true, false); - channel.QueueDeclare(queueName, true, false, false); - + if (declareRequired) + { + bindingLogger?.QueueDeclare(queueName, true, false); + channel.QueueDeclare(queueName, true, false, false); + } foreach (var binding in currentBindings.Except(existingBindings)) { @@ -301,6 +322,9 @@ namespace Tapeti.Connection /// public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName) { + if (!await GetDurableQueueDeclareRequired(queueName)) + return; + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) @@ -491,6 +515,24 @@ namespace Tapeti.Connection private class ManagementQueueInfo { + [JsonProperty("name")] + public string Name { get; set; } + + [JsonProperty("vhost")] + public string VHost { get; set; } + + [JsonProperty("durable")] + public bool Durable { get; set; } + + [JsonProperty("auto_delete")] + public bool AutoDelete { get; set; } + + [JsonProperty("exclusive")] + public bool Exclusive { get; set; } + + [JsonProperty("arguments")] + public Dictionary Arguments { get; set; } + [JsonProperty("messages")] public uint Messages { get; set; } } @@ -539,7 +581,7 @@ namespace Tapeti.Connection public string PropertiesKey { get; set; } } - + private async Task> GetQueueBindings(string queueName) { var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost); @@ -577,7 +619,13 @@ namespace Tapeti.Connection private async Task WithRetryableManagementAPI(string path, Func> handleResponse) { - var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/{path}"); + // Workaround for: https://github.com/dotnet/runtime/issues/23581#issuecomment-354391321 + // "localhost" can cause a 1 second delay *per call*. Not an issue in production scenarios, but annoying while debugging. + var hostName = connectionParams.HostName; + if (hostName.Equals("localhost", StringComparison.InvariantCultureIgnoreCase)) + hostName = "127.0.0.1"; + + var requestUri = new Uri($"http://{hostName}:{connectionParams.ManagementPort}/api/{path}"); using var request = new HttpRequestMessage(HttpMethod.Get, requestUri); var retryDelayIndex = 0; diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index 024bbac..41a8c08 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Text; using Tapeti.Config; namespace Tapeti.Default @@ -60,6 +62,21 @@ namespace Tapeti.Default : $"[Tapeti] Verifying durable queue {queueName}"); } + /// + public void QueueExistsWarning(string queueName, Dictionary arguments) + { + var argumentsText = new StringBuilder(); + foreach (var pair in arguments) + { + if (argumentsText.Length > 0) + argumentsText.Append(", "); + + argumentsText.Append($"{pair.Key} = {pair.Value}"); + } + + Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is"); + } + /// public void QueueBind(string queueName, bool durable, string exchange, string routingKey) { diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index c24a798..b9bd22f 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Tapeti.Config; // ReSharper disable UnusedMember.Global @@ -130,6 +131,16 @@ namespace Tapeti /// Indicates whether the queue was declared as passive (to verify durable queues) void QueueDeclare(string queueName, bool durable, bool passive); + /// + /// Called when a durable queue would be declared but already exists with incompatible x-arguments. The existing + /// queue will be consumed without declaring to prevent errors during startup. This is used for compatibility with existing queues + /// not declared by Tapeti. + /// If the queue already exists but should be compatible QueueDeclare will be called instead. + /// + /// The name of the queue that is declared + /// The x-arguments of the existing queue + void QueueExistsWarning(string queueName, Dictionary arguments); + /// /// Called before a binding is added to a queue. ///