From 2b56d0130ec39e47ed6f743e463e9007f040811f Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Fri, 13 Oct 2017 13:49:47 +0200 Subject: [PATCH] RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow Flow ruimt nu zijn locks goed op en de weggeschreven state ingeval van exceptions tijdens het afhandelen van de message --- Tapeti.Flow/Default/FlowCleanupMiddleware.cs | 29 ++++++++++++++++++++ Tapeti.Flow/FlowMiddleware.cs | 3 +- Tapeti.Flow/Tapeti.Flow.csproj | 1 + Tapeti/Connection/TapetiConsumer.cs | 2 ++ 4 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 Tapeti.Flow/Default/FlowCleanupMiddleware.cs diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs new file mode 100644 index 0000000..e0420c8 --- /dev/null +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + public class FlowCleanupMiddleware : ICleanupMiddleware + { + public async Task Handle(IMessageContext context, ConsumeResponse response) + { + object flowContextObj; + if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) + return; + var flowContext = (FlowContext)flowContextObj; + + if (flowContext.FlowStateLock != null) + { + if (response == ConsumeResponse.Nack) + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + flowContext.FlowStateLock.Dispose(); + } + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index bef12a5..fb13d90 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -24,7 +24,8 @@ namespace Tapeti.Flow public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return new[] { new FlowBindingMiddleware() }; + yield return new FlowBindingMiddleware(); + yield return new FlowCleanupMiddleware(); } } } diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index b085913..5a4b9fd 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -54,6 +54,7 @@ + diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 9c7a118..3beb9ed 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -129,6 +129,8 @@ namespace Tapeti.Connection if (message == null) throw new ArgumentException("Empty message"); + context.Message = message; + var validMessageType = false; foreach (var binding in bindings)