diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs new file mode 100644 index 0000000..e0420c8 --- /dev/null +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + public class FlowCleanupMiddleware : ICleanupMiddleware + { + public async Task Handle(IMessageContext context, ConsumeResponse response) + { + object flowContextObj; + if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) + return; + var flowContext = (FlowContext)flowContextObj; + + if (flowContext.FlowStateLock != null) + { + if (response == ConsumeResponse.Nack) + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + flowContext.FlowStateLock.Dispose(); + } + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index bef12a5..fb13d90 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -24,7 +24,8 @@ namespace Tapeti.Flow public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return new[] { new FlowBindingMiddleware() }; + yield return new FlowBindingMiddleware(); + yield return new FlowCleanupMiddleware(); } } } diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index b085913..5a4b9fd 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -54,6 +54,7 @@ + diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 9c7a118..3beb9ed 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -129,6 +129,8 @@ namespace Tapeti.Connection if (message == null) throw new ArgumentException("Empty message"); + context.Message = message; + var validMessageType = false; foreach (var binding in bindings)