diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index f43f0af..f610d8b 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -52,16 +52,19 @@ namespace Tapeti.Flow.Default } - public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next) + public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next) { await next(); if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) return; + if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) + // Do not call when the controller method was filtered, if the same message has two methods + return; + if (flowContext?.FlowStateLock != null) { - // TODO do not call when the controller method was filtered, if the same message has two methods if (!flowContext.IsStoredOrDeleted()) // The exception strategy can set the consume result to Success. Instead, check if the yield point // was handled. The flow provider ensures we only end up here in case of an exception. diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs index c089b82..9cbd234 100644 --- a/Tapeti/Config/IControllerCleanupMiddleware.cs +++ b/Tapeti/Config/IControllerCleanupMiddleware.cs @@ -14,6 +14,6 @@ namespace Tapeti.Config /// /// /// Always call to allow the next in the chain to clean up - Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next); + Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next); } } diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs index 3eec194..16b650d 100644 --- a/Tapeti/Config/IControllerMessageContext.cs +++ b/Tapeti/Config/IControllerMessageContext.cs @@ -7,7 +7,7 @@ public interface IControllerMessageContext : IMessageContext { /// - /// An instance of the controller referenced by the binding. + /// An instance of the controller referenced by the binding. Note: is null during Cleanup. /// object Controller { get; } diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index dae8a26..4c334d8 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Reflection; @@ -182,10 +182,16 @@ namespace Tapeti.Default /// public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult) { - await MiddlewareHelper.GoAsync( - bindingInfo.CleanupMiddleware, - async (handler, next) => await handler.Cleanup(context, consumeResult, next), - () => Task.CompletedTask); + using (var controllerContext = new ControllerMessageContext(context) + { + Controller = null + }) + { + await MiddlewareHelper.GoAsync( + bindingInfo.CleanupMiddleware, + async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next), + () => Task.CompletedTask); + } } diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index aaaddf6..9573512 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -47,15 +47,8 @@ namespace Tapeti .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Select(m => (MethodInfo)m)) { - var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; - if (methodQueueInfo == null || !methodQueueInfo.IsValid) - throw new TopologyConfigurationException( - $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); - - var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null; - var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter) { Controller = controller, @@ -85,6 +78,10 @@ namespace Tapeti throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); } + var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; + if (methodQueueInfo == null || !methodQueueInfo.IsValid) + throw new TopologyConfigurationException( + $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo {