From 06e1139840b66b393bed8cb975722e7d653f8e32 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Fri, 13 Oct 2017 13:07:41 +0200 Subject: [PATCH] RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow Mogelijkheid voor het registreren van Cleanup middleware in Tapeti --- Tapeti/Config/ICleanupMiddleware.cs | 13 +++++++++++++ Tapeti/Config/IConfig.cs | 1 + Tapeti/Connection/TapetiConsumer.cs | 22 +++++++++++++++++++--- Tapeti/Connection/TapetiWorker.cs | 2 +- Tapeti/Tapeti.csproj | 1 + Tapeti/TapetiConfig.cs | 16 ++++++++++++++-- 6 files changed, 49 insertions(+), 6 deletions(-) create mode 100644 Tapeti/Config/ICleanupMiddleware.cs diff --git a/Tapeti/Config/ICleanupMiddleware.cs b/Tapeti/Config/ICleanupMiddleware.cs new file mode 100644 index 0000000..e056ad0 --- /dev/null +++ b/Tapeti/Config/ICleanupMiddleware.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface ICleanupMiddleware + { + Task Handle(IMessageContext context, ConsumeResponse response); + } +} diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index a7cd268..46bf575 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -9,6 +9,7 @@ namespace Tapeti.Config { IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } + IReadOnlyList CleanupMiddleware { get; } IReadOnlyList PublishMiddleware { get; } IEnumerable Queues { get; } diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 4bfb1a7..9c7a118 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -16,18 +16,20 @@ namespace Tapeti.Connection private readonly string queueName; private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; + private readonly IReadOnlyList cleanupMiddleware; private readonly List bindings; private readonly ILogger logger; private readonly IExceptionStrategy exceptionStrategy; - public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) + public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware) { this.worker = worker; this.queueName = queueName; this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; + this.cleanupMiddleware = cleanupMiddleware; this.bindings = bindings.ToList(); logger = dependencyResolver.Resolve(); @@ -74,8 +76,7 @@ namespace Tapeti.Connection } try { - // Cleanup handlers - //response = exceptionStrategy.HandleException(null, exception.SourceException); + await RunCleanup(context, response); } catch (Exception eCleanup) { @@ -107,6 +108,21 @@ namespace Tapeti.Connection }); } + private async Task RunCleanup(MessageContext context, ConsumeResponse response) + { + foreach(var handler in cleanupMiddleware) + { + try + { + await handler.Handle(context, response); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); + } + } + } + private async Task DispatchMesage(MessageContext context, byte[] body) { var message = dependencyResolver.Resolve().Deserialize(body, context.Properties); diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 8b9f7d6..e539bf7 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -57,7 +57,7 @@ namespace Tapeti.Connection return taskQueue.Value.Add(async () => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); }).Unwrap(); } diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 493548a..f41e903 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -53,6 +53,7 @@ + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 75ed930..b8ed17b 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -26,6 +26,7 @@ namespace Tapeti private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); + private readonly List cleanupMiddleware = new List(); private readonly List publishMiddleware = new List(); private readonly IDependencyResolver dependencyResolver; @@ -62,7 +63,7 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(dependencyResolver, messageMiddleware, publishMiddleware, queues); + var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -83,6 +84,13 @@ namespace Tapeti } + public TapetiConfig Use(ICleanupMiddleware handler) + { + cleanupMiddleware.Add(handler); + return this; + } + + public TapetiConfig Use(IPublishMiddleware handler) { publishMiddleware.Add(handler); @@ -108,6 +116,8 @@ namespace Tapeti Use((IBindingMiddleware)middleware); else if (middleware is IMessageMiddleware) Use((IMessageMiddleware)middleware); + else if (middleware is ICleanupMiddleware) + Use((ICleanupMiddleware)middleware); else if (middleware is IPublishMiddleware) Use((IPublishMiddleware)middleware); else @@ -345,16 +355,18 @@ namespace Tapeti { public IDependencyResolver DependencyResolver { get; } public IReadOnlyList MessageMiddleware { get; } + public IReadOnlyList CleanupMiddleware { get; } public IReadOnlyList PublishMiddleware { get; } public IEnumerable Queues { get; } private readonly Dictionary bindingMethodLookup; - public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) + public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) { DependencyResolver = dependencyResolver; MessageMiddleware = messageMiddleware; + CleanupMiddleware = cleanupMiddleware; PublishMiddleware = publishMiddleware; Queues = queues.ToList();