Added support for consuming incompatible durable queues without breaking

This commit is contained in:
Mark van Renswoude 2021-09-02 13:58:01 +02:00
parent c2a6b4b577
commit 5a90c1e0a5
5 changed files with 91 additions and 8 deletions

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Tapeti.Config;
using ISerilogLogger = Serilog.ILogger;
@ -93,6 +94,14 @@ namespace Tapeti.Serilog
seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName);
}
/// <inheritdoc />
public void QueueExistsWarning(string queueName, Dictionary<string, string> arguments)
{
seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({arguments}) and will not be redeclared, queue will be consumed as-is",
queueName,
arguments);
}
/// <inheritdoc />
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
{

View File

@ -54,9 +54,7 @@ namespace Tapeti.Config
/// <summary>
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying.
/// Defaults to true.
/// </summary>
bool VerifyDurableQueues { get; }
}

View File

@ -267,9 +267,28 @@ namespace Tapeti.Connection
}
private async Task<bool> GetDurableQueueDeclareRequired(string queueName)
{
var existingQueue = await GetQueueInfo(queueName);
if (existingQueue == null)
return true;
if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive)
throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)");
if (existingQueue.Arguments.Count <= 0)
return true;
(logger as IBindingLogger)?.QueueExistsWarning(queueName, existingQueue.Arguments);
return false;
}
/// <inheritdoc />
public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable<QueueBinding> bindings)
{
var declareRequired = await GetDurableQueueDeclareRequired(queueName);
var existingBindings = (await GetQueueBindings(queueName)).ToList();
var currentBindings = bindings.ToList();
var bindingLogger = logger as IBindingLogger;
@ -279,9 +298,11 @@ namespace Tapeti.Connection
if (cancellationToken.IsCancellationRequested)
return;
bindingLogger?.QueueDeclare(queueName, true, false);
channel.QueueDeclare(queueName, true, false, false);
if (declareRequired)
{
bindingLogger?.QueueDeclare(queueName, true, false);
channel.QueueDeclare(queueName, true, false, false);
}
foreach (var binding in currentBindings.Except(existingBindings))
{
@ -301,6 +322,9 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName)
{
if (!await GetDurableQueueDeclareRequired(queueName))
return;
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
{
if (cancellationToken.IsCancellationRequested)
@ -491,6 +515,24 @@ namespace Tapeti.Connection
private class ManagementQueueInfo
{
[JsonProperty("name")]
public string Name { get; set; }
[JsonProperty("vhost")]
public string VHost { get; set; }
[JsonProperty("durable")]
public bool Durable { get; set; }
[JsonProperty("auto_delete")]
public bool AutoDelete { get; set; }
[JsonProperty("exclusive")]
public bool Exclusive { get; set; }
[JsonProperty("arguments")]
public Dictionary<string, string> Arguments { get; set; }
[JsonProperty("messages")]
public uint Messages { get; set; }
}
@ -539,7 +581,7 @@ namespace Tapeti.Connection
public string PropertiesKey { get; set; }
}
private async Task<IEnumerable<QueueBinding>> GetQueueBindings(string queueName)
{
var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost);
@ -577,7 +619,13 @@ namespace Tapeti.Connection
private async Task<T> WithRetryableManagementAPI<T>(string path, Func<HttpResponseMessage, Task<T>> handleResponse)
{
var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/{path}");
// Workaround for: https://github.com/dotnet/runtime/issues/23581#issuecomment-354391321
// "localhost" can cause a 1 second delay *per call*. Not an issue in production scenarios, but annoying while debugging.
var hostName = connectionParams.HostName;
if (hostName.Equals("localhost", StringComparison.InvariantCultureIgnoreCase))
hostName = "127.0.0.1";
var requestUri = new Uri($"http://{hostName}:{connectionParams.ManagementPort}/api/{path}");
using var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
var retryDelayIndex = 0;

View File

@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using Tapeti.Config;
namespace Tapeti.Default
@ -60,6 +62,21 @@ namespace Tapeti.Default
: $"[Tapeti] Verifying durable queue {queueName}");
}
/// <inheritdoc />
public void QueueExistsWarning(string queueName, Dictionary<string, string> arguments)
{
var argumentsText = new StringBuilder();
foreach (var pair in arguments)
{
if (argumentsText.Length > 0)
argumentsText.Append(", ");
argumentsText.Append($"{pair.Key} = {pair.Value}");
}
Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is");
}
/// <inheritdoc />
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
{

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
@ -130,6 +131,16 @@ namespace Tapeti
/// <param name="passive">Indicates whether the queue was declared as passive (to verify durable queues)</param>
void QueueDeclare(string queueName, bool durable, bool passive);
/// <summary>
/// Called when a durable queue would be declared but already exists with incompatible x-arguments. The existing
/// queue will be consumed without declaring to prevent errors during startup. This is used for compatibility with existing queues
/// not declared by Tapeti.
/// If the queue already exists but should be compatible QueueDeclare will be called instead.
/// </summary>
/// <param name="queueName">The name of the queue that is declared</param>
/// <param name="arguments">The x-arguments of the existing queue</param>
void QueueExistsWarning(string queueName, Dictionary<string, string> arguments);
/// <summary>
/// Called before a binding is added to a queue.
/// </summary>