diff --git a/Examples/ExampleLib/ExampleConsoleApp.cs b/Examples/ExampleLib/ExampleConsoleApp.cs
index e6ff049..f7f4d98 100644
--- a/Examples/ExampleLib/ExampleConsoleApp.cs
+++ b/Examples/ExampleLib/ExampleConsoleApp.cs
@@ -61,6 +61,11 @@ namespace ExampleLib
public async Task WaitAsync()
{
await doneSignal.Task;
+
+ // This is a hack, because the signal is often given in a message handler before the message can be
+ // acknowledged, causing it to be put back on the queue because the connection is closed.
+ // This short delay allows consumers to finish. This is not an issue in a proper service application.
+ await Task.Delay(500);
}
diff --git a/Tapeti.Annotations/Tapeti.Annotations.nuspec b/Tapeti.Annotations/Tapeti.Annotations.nuspec
index 68e8268..87e40c7 100644
--- a/Tapeti.Annotations/Tapeti.Annotations.nuspec
+++ b/Tapeti.Annotations/Tapeti.Annotations.nuspec
@@ -6,7 +6,7 @@
Tapeti Annotations
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Annotations.png
false
diff --git a/Tapeti.Autofac/Tapeti.Autofac.nuspec b/Tapeti.Autofac/Tapeti.Autofac.nuspec
index 023ea84..d788f49 100644
--- a/Tapeti.Autofac/Tapeti.Autofac.nuspec
+++ b/Tapeti.Autofac/Tapeti.Autofac.nuspec
@@ -6,7 +6,7 @@
Tapeti Autofac
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
false
diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec
index 6486a33..ac652ed 100644
--- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec
+++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec
@@ -6,7 +6,7 @@
Tapeti Castle Windsor
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
false
diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
index db70921..d1e1aa1 100644
--- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
+++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
@@ -6,7 +6,7 @@
Tapeti DataAnnotations Extensions
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png
false
diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec
index ddf90b8..02b18c4 100644
--- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec
+++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec
@@ -6,7 +6,7 @@
Tapeti DataAnnotations
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png
false
diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec
index 81a6fe0..15b5488 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec
@@ -6,7 +6,7 @@
Tapeti Flow SQL
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.SQL.png
false
diff --git a/Tapeti.Flow/Tapeti.Flow.nuspec b/Tapeti.Flow/Tapeti.Flow.nuspec
index 27e8251..87504d0 100644
--- a/Tapeti.Flow/Tapeti.Flow.nuspec
+++ b/Tapeti.Flow/Tapeti.Flow.nuspec
@@ -6,7 +6,7 @@
Tapeti Flow
Menno van Lavieren, Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png
false
diff --git a/Tapeti.Ninject/Tapeti.Ninject.nuspec b/Tapeti.Ninject/Tapeti.Ninject.nuspec
index 1a4051f..c0709e1 100644
--- a/Tapeti.Ninject/Tapeti.Ninject.nuspec
+++ b/Tapeti.Ninject/Tapeti.Ninject.nuspec
@@ -6,7 +6,7 @@
Tapeti Ninject
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
false
diff --git a/Tapeti.Serilog/Tapeti.Serilog.nuspec b/Tapeti.Serilog/Tapeti.Serilog.nuspec
index 26fd39c..451ec08 100644
--- a/Tapeti.Serilog/Tapeti.Serilog.nuspec
+++ b/Tapeti.Serilog/Tapeti.Serilog.nuspec
@@ -6,7 +6,7 @@
Tapeti Serilog
Hans Mulder
Hans Mulder
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Serilog.png
false
diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs
index ae168f1..3c55330 100644
--- a/Tapeti.Serilog/TapetiSeriLogger.cs
+++ b/Tapeti.Serilog/TapetiSeriLogger.cs
@@ -71,5 +71,14 @@ namespace Tapeti.Serilog
contextLogger.Error(exception, "Tapeti: exception in message handler");
}
+
+ ///
+ public void QueueObsolete(string queueName, bool deleted, uint messageCount)
+ {
+ if (deleted)
+ seriLogger.Information("Tapeti: obsolete queue {queue} has been deleted", queueName);
+ else
+ seriLogger.Information("Tapeti: obsolete queue {queue} has been unbound but not yet deleted, {messageCount} messages remaining", queueName, messageCount);
+ }
}
}
diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec
index 8bfbe3d..7005734 100644
--- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec
+++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec
@@ -6,7 +6,7 @@
Tapeti SimpleInjector
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
false
diff --git a/Tapeti.Transient/Tapeti.Transient.nuspec b/Tapeti.Transient/Tapeti.Transient.nuspec
index ad29af0..41e1fa2 100644
--- a/Tapeti.Transient/Tapeti.Transient.nuspec
+++ b/Tapeti.Transient/Tapeti.Transient.nuspec
@@ -6,7 +6,7 @@
Tapeti Transient
Menno van Lavieren, Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png
false
diff --git a/Tapeti.UnityContainer/Tapeti.UnityContainer.nuspec b/Tapeti.UnityContainer/Tapeti.UnityContainer.nuspec
index 0d40763..356278a 100644
--- a/Tapeti.UnityContainer/Tapeti.UnityContainer.nuspec
+++ b/Tapeti.UnityContainer/Tapeti.UnityContainer.nuspec
@@ -6,7 +6,7 @@
Tapeti UnityContainer
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
false
diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs
index 98b5cb6..8cbb45f 100644
--- a/Tapeti/Config/IBinding.cs
+++ b/Tapeti/Config/IBinding.cs
@@ -117,5 +117,12 @@ namespace Tapeti.Config
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
Task BindDynamicDirect(string queuePrefix = null);
+
+ ///
+ /// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete
+ /// bindings and is empty, it will be removed.
+ ///
+ /// The name of the durable queue
+ Task BindDurableObsolete(string queueName);
}
}
diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs
index ce83888..3a28053 100644
--- a/Tapeti/Connection/ITapetiClient.cs
+++ b/Tapeti/Connection/ITapetiClient.cs
@@ -91,6 +91,13 @@ namespace Tapeti.Connection
/// The name of the queue to verify
Task DurableQueueVerify(string queueName);
+ ///
+ /// Deletes a durable queue.
+ ///
+ /// The name of the queue to delete
+ /// If true, the queue will only be deleted if it is empty otherwise all bindings will be removed. If false, the queue is deleted even if there are queued messages.
+ Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true);
+
///
/// Creates a dynamic queue.
///
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index ec71b06..ba1c00e 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -50,6 +50,7 @@ namespace Tapeti.Connection
private ulong lastDeliveryTag;
private DateTime connectedDateTime;
private readonly HttpClient managementClient;
+ private readonly HashSet deletedQueues = new HashSet();
// These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread
private readonly object confirmLock = new object();
@@ -185,16 +186,16 @@ namespace Tapeti.Connection
///
public async Task Consume(string queueName, IConsumer consumer)
{
+ if (deletedQueues.Contains(queueName))
+ return;
+
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
- await taskQueue.Value.Add(() =>
- {
- WithRetryableChannel(channel =>
- {
- var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
- channel.BasicConsume(queueName, false, basicConsumer);
- });
+ await QueueWithRetryableChannel(channel =>
+ {
+ var basicConsumer = new TapetiBasicConsumer(consumer, Respond);
+ channel.BasicConsume(queueName, false, basicConsumer);
});
}
@@ -223,7 +224,6 @@ namespace Tapeti.Connection
default:
throw new ArgumentOutOfRangeException(nameof(result), result, null);
}
-
});
}
@@ -255,32 +255,106 @@ namespace Tapeti.Connection
///
public async Task DurableQueueVerify(string queueName)
{
- await taskQueue.Value.Add(() =>
- {
- WithRetryableChannel(channel =>
- {
- channel.QueueDeclarePassive(queueName);
- });
+ await QueueWithRetryableChannel(channel =>
+ {
+ channel.QueueDeclarePassive(queueName);
});
}
+
+ ///
+ public async Task DurableQueueDelete(string queueName, bool onlyIfEmpty = true)
+ {
+ if (!onlyIfEmpty)
+ {
+ uint deletedMessages = 0;
+
+ await QueueWithRetryableChannel(channel =>
+ {
+ deletedMessages = channel.QueueDelete(queueName);
+ });
+
+ deletedQueues.Add(queueName);
+ logger.QueueObsolete(queueName, true, deletedMessages);
+ return;
+ }
+
+
+ await taskQueue.Value.Add(async () =>
+ {
+ bool retry;
+ do
+ {
+ retry = false;
+
+ // Get queue information from the Management API, since the AMQP operations will
+ // throw an error if the queue does not exist or still contains messages and resets
+ // the connection. The resulting reconnect will cause subscribers to reset.
+ var queueInfo = await GetQueueInfo(queueName);
+ if (queueInfo == null)
+ {
+ deletedQueues.Add(queueName);
+ return;
+ }
+
+ if (queueInfo.Messages == 0)
+ {
+ // Still pass onlyIfEmpty to prevent concurrency issues if a message arrived between
+ // the call to the Management API and deleting the queue. Because the QueueWithRetryableChannel
+ // includes the GetQueueInfo, the next time around it should have Messages > 0
+ try
+ {
+ WithRetryableChannel(channel =>
+ {
+ channel.QueueDelete(queueName, false, true);
+ });
+
+ deletedQueues.Add(queueName);
+ logger.QueueObsolete(queueName, true, 0);
+ }
+ catch (OperationInterruptedException e)
+ {
+ if (e.ShutdownReason.ReplyCode == RabbitMQ.Client.Framing.Constants.PreconditionFailed)
+ retry = true;
+ else
+ throw;
+ }
+ }
+ else
+ {
+ // Remove all bindings instead
+ var existingBindings = (await GetQueueBindings(queueName)).ToList();
+
+ if (existingBindings.Count > 0)
+ {
+ WithRetryableChannel(channel =>
+ {
+ foreach (var binding in existingBindings)
+ channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
+ });
+ }
+
+ logger.QueueObsolete(queueName, false, queueInfo.Messages);
+ }
+ } while (retry);
+ });
+ }
+
+
///
public async Task DynamicQueueDeclare(string queuePrefix = null)
{
string queueName = null;
- await taskQueue.Value.Add(() =>
+ await QueueWithRetryableChannel(channel =>
{
- WithRetryableChannel(channel =>
+ if (!string.IsNullOrEmpty(queuePrefix))
{
- if (!string.IsNullOrEmpty(queuePrefix))
- {
- queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
- channel.QueueDeclare(queueName);
- }
- else
- queueName = channel.QueueDeclare().QueueName;
- });
+ queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
+ channel.QueueDeclare(queueName);
+ }
+ else
+ queueName = channel.QueueDeclare().QueueName;
});
return queueName;
@@ -289,13 +363,10 @@ namespace Tapeti.Connection
///
public async Task DynamicQueueBind(string queueName, QueueBinding binding)
{
- await taskQueue.Value.Add(() =>
+ await QueueWithRetryableChannel(channel =>
{
- WithRetryableChannel(channel =>
- {
- DeclareExchange(channel, binding.Exchange);
- channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
- });
+ DeclareExchange(channel, binding.Exchange);
+ channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
});
}
@@ -335,18 +406,31 @@ namespace Tapeti.Connection
HttpStatusCode.ServiceUnavailable
};
- private static readonly TimeSpan[] ExponentialBackoff =
+
+ private class ManagementQueueInfo
{
- TimeSpan.FromSeconds(1),
- TimeSpan.FromSeconds(2),
- TimeSpan.FromSeconds(3),
- TimeSpan.FromSeconds(5),
- TimeSpan.FromSeconds(8),
- TimeSpan.FromSeconds(13),
- TimeSpan.FromSeconds(21),
- TimeSpan.FromSeconds(34),
- TimeSpan.FromSeconds(55)
- };
+ [JsonProperty("messages")]
+ public uint Messages { get; set; }
+ }
+
+
+
+ private async Task GetQueueInfo(string queueName)
+ {
+ var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost);
+ var queuePath = Uri.EscapeDataString(queueName);
+
+ return await WithRetryableManagementAPI($"queues/{virtualHostPath}/{queuePath}", async response =>
+ {
+ if (response.StatusCode == HttpStatusCode.NotFound)
+ return null;
+
+ response.EnsureSuccessStatusCode();
+
+ var content = await response.Content.ReadAsStringAsync();
+ return JsonConvert.DeserializeObject(content);
+ });
+ }
private class ManagementBinding
@@ -378,10 +462,42 @@ namespace Tapeti.Connection
{
var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost);
var queuePath = Uri.EscapeDataString(queueName);
- var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/queues/{virtualHostPath}/{queuePath}/bindings");
+
+ return await WithRetryableManagementAPI($"queues/{virtualHostPath}/{queuePath}/bindings", async response =>
+ {
+ response.EnsureSuccessStatusCode();
+
+ var content = await response.Content.ReadAsStringAsync();
+ var bindings = JsonConvert.DeserializeObject>(content);
+
+ // Filter out the binding to an empty source, which is always present for direct-to-queue routing
+ return bindings
+ .Where(binding => !string.IsNullOrEmpty(binding.Source))
+ .Select(binding => new QueueBinding(binding.Source, binding.RoutingKey));
+ });
+ }
+
+
+ private static readonly TimeSpan[] ExponentialBackoff =
+ {
+ TimeSpan.FromSeconds(1),
+ TimeSpan.FromSeconds(2),
+ TimeSpan.FromSeconds(3),
+ TimeSpan.FromSeconds(5),
+ TimeSpan.FromSeconds(8),
+ TimeSpan.FromSeconds(13),
+ TimeSpan.FromSeconds(21),
+ TimeSpan.FromSeconds(34),
+ TimeSpan.FromSeconds(55)
+ };
+
+
+ private async Task WithRetryableManagementAPI(string path, Func> handleResponse)
+ {
+ var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/{path}");
using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri))
- {
+ {
var retryDelayIndex = 0;
while (true)
@@ -389,15 +505,7 @@ namespace Tapeti.Connection
try
{
var response = await managementClient.SendAsync(request);
- response.EnsureSuccessStatusCode();
-
- var content = await response.Content.ReadAsStringAsync();
- var bindings = JsonConvert.DeserializeObject>(content);
-
- // Filter out the binding to an empty source, which is always present for direct-to-queue routing
- return bindings
- .Where(binding => !string.IsNullOrEmpty(binding.Source))
- .Select(binding => new QueueBinding(binding.Source, binding.RoutingKey));
+ return await handleResponse(response);
}
catch (TimeoutException)
{
@@ -435,6 +543,15 @@ namespace Tapeti.Connection
}
+ private async Task QueueWithRetryableChannel(Action operation)
+ {
+ await taskQueue.Value.Add(() =>
+ {
+ WithRetryableChannel(operation);
+ });
+ }
+
+
///
/// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index a6184b1..f0d3ec0 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -91,6 +91,7 @@ namespace Tapeti.Connection
public abstract Task BindDurable(Type messageClass, string queueName);
public abstract Task BindDurableDirect(string queueName);
+ public abstract Task BindDurableObsolete(string queueName);
public async Task BindDynamic(Type messageClass, string queuePrefix = null)
@@ -182,6 +183,7 @@ namespace Tapeti.Connection
private class DeclareDurableQueuesBindingTarget : CustomBindingTarget
{
private readonly Dictionary> durableQueues = new Dictionary>();
+ private readonly HashSet obsoleteDurableQueues = new HashSet();
public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy)
@@ -217,10 +219,23 @@ namespace Tapeti.Connection
}
+ public override Task BindDurableObsolete(string queueName)
+ {
+ obsoleteDurableQueues.Add(queueName);
+ return Task.CompletedTask;
+ }
+
+
public override async Task Apply()
{
var worker = ClientFactory();
+ await DeclareQueues(worker);
+ await DeleteObsoleteQueues(worker);
+ }
+
+ private async Task DeclareQueues(ITapetiClient worker)
+ {
await Task.WhenAll(durableQueues.Select(async queue =>
{
var bindings = queue.Value.Select(messageClass =>
@@ -234,6 +249,15 @@ namespace Tapeti.Connection
await worker.DurableQueueDeclare(queue.Key, bindings);
}));
}
+
+
+ private async Task DeleteObsoleteQueues(ITapetiClient worker)
+ {
+ await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue =>
+ {
+ await worker.DurableQueueDelete(queue);
+ }));
+ }
}
@@ -257,6 +281,11 @@ namespace Tapeti.Connection
await VerifyDurableQueue(queueName);
}
+ public override Task BindDurableObsolete(string queueName)
+ {
+ return Task.CompletedTask;
+ }
+
private async Task VerifyDurableQueue(string queueName)
{
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index d42f46c..abfc92d 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -45,5 +45,13 @@ namespace Tapeti.Default
Console.WriteLine();
Console.WriteLine(exception);
}
+
+ ///
+ public void QueueObsolete(string queueName, bool deleted, uint messageCount)
+ {
+ Console.WriteLine(deleted
+ ? $"[Tapeti] Obsolete queue was deleted: {queueName}"
+ : $"[Tapeti] Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining");
+ }
}
}
diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs
index 4136894..a0e5bfa 100644
--- a/Tapeti/Default/ControllerMethodBinding.cs
+++ b/Tapeti/Default/ControllerMethodBinding.cs
@@ -46,6 +46,12 @@ namespace Tapeti.Default
///
public BindingTargetMode BindingTargetMode;
+ ///
+ /// Indicates if the method or controller is marked with the Obsolete attribute, indicating it should
+ /// only handle messages already in the queue and not bind to the routing key for new messages.
+ ///
+ public bool IsObsolete;
+
///
/// Value factories for the method parameters.
///
@@ -106,32 +112,40 @@ namespace Tapeti.Default
///
public async Task Apply(IBindingTarget target)
{
- switch (bindingInfo.BindingTargetMode)
+ if (!bindingInfo.IsObsolete)
{
- case BindingTargetMode.Default:
- if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
- QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
- else
- {
- await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
- QueueName = bindingInfo.QueueInfo.Name;
- }
+ switch (bindingInfo.BindingTargetMode)
+ {
+ case BindingTargetMode.Default:
+ if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
+ QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ else
+ {
+ await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ QueueName = bindingInfo.QueueInfo.Name;
+ }
- break;
+ break;
- case BindingTargetMode.Direct:
- if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
- QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
- else
- {
- await target.BindDurableDirect(bindingInfo.QueueInfo.Name);
- QueueName = bindingInfo.QueueInfo.Name;
- }
+ case BindingTargetMode.Direct:
+ if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
+ QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ else
+ {
+ await target.BindDurableDirect(bindingInfo.QueueInfo.Name);
+ QueueName = bindingInfo.QueueInfo.Name;
+ }
- break;
+ break;
- default:
- throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode");
+ default:
+ throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode");
+ }
+ }
+ else if (bindingInfo.QueueInfo.QueueType == QueueType.Durable)
+ {
+ await target.BindDurableObsolete(bindingInfo.QueueInfo.Name);
+ QueueName = bindingInfo.QueueInfo.Name;
}
}
diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs
index 5e247ea..aae6a27 100644
--- a/Tapeti/Default/DevNullLogger.cs
+++ b/Tapeti/Default/DevNullLogger.cs
@@ -28,5 +28,10 @@ namespace Tapeti.Default
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
}
+
+ ///
+ public void QueueObsolete(string queueName, bool deleted, uint messageCount)
+ {
+ }
}
}
diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs
index cbba1c7..c02e123 100644
--- a/Tapeti/ILogger.cs
+++ b/Tapeti/ILogger.cs
@@ -42,5 +42,13 @@ namespace Tapeti
///
/// Indicates the action taken by the exception handler
void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult);
+
+ ///
+ /// Called when a queue is determined to be obsolete.
+ ///
+ ///
+ /// True if the queue was empty and has been deleted, false if there are still messages to process
+ /// If deleted, the number of messages purged, otherwise the number of messages still in the queue
+ void QueueObsolete(string queueName, bool deleted, uint messageCount);
}
}
diff --git a/Tapeti/Tapeti.nuspec b/Tapeti/Tapeti.nuspec
index 72e9360..c484c35 100644
--- a/Tapeti/Tapeti.nuspec
+++ b/Tapeti/Tapeti.nuspec
@@ -6,7 +6,7 @@
Tapeti
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.png
false
diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs
index 916488d..dcd8127 100644
--- a/Tapeti/TapetiConfigControllers.cs
+++ b/Tapeti/TapetiConfigControllers.cs
@@ -40,6 +40,9 @@ namespace Tapeti
var controllerQueueInfo = GetQueueInfo(controller);
(builderAccess.DependencyResolver as IDependencyContainer)?.RegisterController(controller);
+ var controllerIsObsolete = controller.GetCustomAttribute() != null;
+
+
foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance)
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m))
@@ -50,6 +53,9 @@ namespace Tapeti
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
+ var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null;
+
+
var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
{
Controller = controller,
@@ -83,6 +89,7 @@ namespace Tapeti
QueueInfo = methodQueueInfo,
MessageClass = context.MessageClass,
BindingTargetMode = context.BindingTargetMode,
+ IsObsolete = methodIsObsolete,
ParameterFactories = context.GetParameterHandlers(),
ResultHandler = context.GetResultHandler(),
diff --git a/docs/indepth.rst b/docs/indepth.rst
index 131f273..c4e4fe6 100644
--- a/docs/indepth.rst
+++ b/docs/indepth.rst
@@ -45,9 +45,41 @@ To enable the automatic creation of durable queues, call EnableDeclareDurableQue
.Build();
-The queue will be bound to all message classes for which you have defined a message handler. If the queue already existed and contains bindings which are no longer valid, those bindings will be removed. Note however that if there are still messages of that type in the queue they will be consumed and cause an exception.
+The queue will be bound to all message classes for which you have defined a message handler. If the queue already existed and contains bindings which are no longer valid, those bindings will be removed. Note however that if there are still messages of that type in the queue they will be consumed and cause an exception. To keep the queue backwards compatible, see the next section on migrating durable queues.
-At the time of writing there is no special support for obsolete queues. Once a durable queue is no longer referenced in the service it will remain in RabbitMQ along with any messages in it, without a consumer. This allows you to inspect the contents, perform any migrating steps necessary and delete the queue manually.
+
+Migrating durable queues
+------------------------
+.. note:: This section assumes you are using EnableDeclareDurableQueues.
+
+As your service evolves so can your message handlers. Perhaps a message no longer needs to handled, or you want to split them into another queue.
+
+If you remove a message handler the binding will also be removed from the queue, but there may still be messages of that type in the queue. Since these have nowhere to go, they will cause an error and be lost.
+
+Instead of removing the message handler you can mark it with the standard .NET ``[Obsolete]`` attribute:
+
+::
+
+ [MessageController]
+ [DurableQueue("monitoring")]
+ public class ObsoleteMonitoringController
+ {
+ [Obsolete]
+ public void HandleEscapeMessage(RabbitEscapedMessage message)
+ {
+ // Handle the message like before, perform the necessary migration,
+ // or simply ignore it if you no longer need it.
+ }
+ }
+
+Messages will still be consumed from the queue as long as it exists, but the routing key binding will removed so no new messages of that type will be delivered.
+
+The ``[Obsolete]`` attribute can also be applied to the entire controller to mark all message handlers it contains as obsolete.
+
+
+If all message handlers bound to a durable queue are marked as obsolete, including other controllers bound to the same durable queue, the queue is a candidate for removal. During startup, if the queue is empty it will be deleted. This action is logged to the registered ILogger.
+
+If there are still messages in the queue it's pending removal will be logged but the consumers will run as normal to empty the queue. The queue will then remain until it is checked again when the application is restarted.
Request - response