diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs index 811be59..16cf61b 100644 --- a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -9,7 +9,7 @@ namespace Tapeti.Flow.Default { public class FlowCleanupMiddleware : ICleanupMiddleware { - public async Task Handle(IMessageContext context, ConsumeResponse response) + public async Task Handle(IMessageContext context, HandlingResult handlingResult) { object flowContextObj; if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) @@ -18,7 +18,8 @@ namespace Tapeti.Flow.Default if (flowContext?.FlowStateLock != null) { - if (response == ConsumeResponse.Nack) + if (handlingResult.ConsumeResponse == ConsumeResponse.Nack + || handlingResult.MessageAction == MessageAction.ErrorLog) { await flowContext.FlowStateLock.DeleteFlowState(); } diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index d376f65..306f034 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -55,25 +55,28 @@ namespace Tapeti.Flow.Default var flowHandler = config.DependencyResolver.Resolve(); - ConsumeResponse response = ConsumeResponse.Nack; + HandlingResultBuilder handlingResult = new HandlingResultBuilder + { + ConsumeResponse = ConsumeResponse.Nack, + }; try { await flowHandler.Execute(context, yieldPoint); - response = ConsumeResponse.Ack; + handlingResult.ConsumeResponse = ConsumeResponse.Ack; } finally { - await RunCleanup(context, response); + await RunCleanup(context, handlingResult.ToHandlingResult()); } } - private async Task RunCleanup(MessageContext context, ConsumeResponse response) + private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) { foreach (var handler in config.CleanupMiddleware) { try { - await handler.Handle(context, response); + await handler.Handle(context, handlingResult); } catch (Exception eCleanup) { diff --git a/Tapeti/Config/ICleanupMiddleware.cs b/Tapeti/Config/ICleanupMiddleware.cs index e056ad0..290236e 100644 --- a/Tapeti/Config/ICleanupMiddleware.cs +++ b/Tapeti/Config/ICleanupMiddleware.cs @@ -8,6 +8,6 @@ namespace Tapeti.Config { public interface ICleanupMiddleware { - Task Handle(IMessageContext context, ConsumeResponse response); + Task Handle(IMessageContext context, HandlingResult handlingResult); } } diff --git a/Tapeti/Config/IExceptionStrategyContext.cs b/Tapeti/Config/IExceptionStrategyContext.cs new file mode 100644 index 0000000..2a99af9 --- /dev/null +++ b/Tapeti/Config/IExceptionStrategyContext.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IExceptionStrategyContext + { + IMessageContext MessageContext { get; } + + Exception Exception { get; } + + HandlingResultBuilder HandlingResult { get; set; } + } +} diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 3beb9ed..03355f5 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -44,7 +44,7 @@ namespace Tapeti.Connection { ExceptionDispatchInfo exception = null; MessageContext context = null; - ConsumeResponse response = ConsumeResponse.Nack; + HandlingResult handlingResult = null; try { try @@ -59,7 +59,11 @@ namespace Tapeti.Connection await DispatchMesage(context, body); - response = ConsumeResponse.Ack; + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Ack, + MessageAction = MessageAction.None + }; } catch (Exception eDispatch) { @@ -67,7 +71,11 @@ namespace Tapeti.Connection logger.HandlerException(eDispatch); try { - response = exceptionStrategy.HandleException(null, exception.SourceException); + var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException); + + exceptionStrategy.HandleException(exceptionStrategyContext); + + handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult(); } catch (Exception eStrategy) { @@ -76,7 +84,15 @@ namespace Tapeti.Connection } try { - await RunCleanup(context, response); + if (handlingResult == null) + { + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Nack, + MessageAction = MessageAction.None + }; + } + await RunCleanup(context, handlingResult); } catch (Exception eCleanup) { @@ -87,7 +103,15 @@ namespace Tapeti.Connection { try { - await worker.Respond(deliveryTag, response); + if (handlingResult == null) + { + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Nack, + MessageAction = MessageAction.None + }; + } + await worker.Respond(deliveryTag, handlingResult.ConsumeResponse); } catch (Exception eRespond) { @@ -108,13 +132,13 @@ namespace Tapeti.Connection }); } - private async Task RunCleanup(MessageContext context, ConsumeResponse response) + private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) { foreach(var handler in cleanupMiddleware) { try { - await handler.Handle(context, response); + await handler.Handle(context, handlingResult); } catch (Exception eCleanup) { diff --git a/Tapeti/Default/ExceptionStrategyContext.cs b/Tapeti/Default/ExceptionStrategyContext.cs new file mode 100644 index 0000000..fc24ab3 --- /dev/null +++ b/Tapeti/Default/ExceptionStrategyContext.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class ExceptionStrategyContext : IExceptionStrategyContext + { + internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception) + { + MessageContext = messageContext; + Exception = exception; + } + + public IMessageContext MessageContext { get; } + + public Exception Exception { get; } + + private HandlingResultBuilder handlingResult; + public HandlingResultBuilder HandlingResult + { + get + { + if (handlingResult == null) + { + handlingResult = new HandlingResultBuilder(); + } + return handlingResult; + } + + set + { + handlingResult = value; + } + } + } +} diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs new file mode 100644 index 0000000..48babe3 --- /dev/null +++ b/Tapeti/Default/NackExceptionStrategy.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class NackExceptionStrategy : IExceptionStrategy + { + public void HandleException(IExceptionStrategyContext context) + { + context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack; + } + } +} diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs index 6a20ca7..afa3143 100644 --- a/Tapeti/Default/RequeueExceptionStrategy.cs +++ b/Tapeti/Default/RequeueExceptionStrategy.cs @@ -5,10 +5,9 @@ namespace Tapeti.Default { public class RequeueExceptionStrategy : IExceptionStrategy { - public ConsumeResponse HandleException(IMessageContext context, Exception exception) + public void HandleException(IExceptionStrategyContext context) { - // TODO log exception - return ConsumeResponse.Requeue; + context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue; } } } diff --git a/Tapeti/HandlingResult.cs b/Tapeti/HandlingResult.cs new file mode 100644 index 0000000..e1bb575 --- /dev/null +++ b/Tapeti/HandlingResult.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti +{ + public class HandlingResult + { + public HandlingResult() + { + ConsumeResponse = ConsumeResponse.Nack; + MessageAction = MessageAction.None; + } + + /// + /// Determines which response will be given to the message bus from where the message originates. + /// + public ConsumeResponse ConsumeResponse { get; internal set; } + + /// + /// Registers which action the Exception strategy has taken or will take to handle the error condition + /// on the message. This is important to know for cleanup handlers registered by middleware. + /// + public MessageAction MessageAction { get; internal set; } + + } + + public class HandlingResultBuilder + { + private static readonly HandlingResult Default = new HandlingResult(); + + private HandlingResult data = Default; + + public ConsumeResponse ConsumeResponse { + get + { + return data.ConsumeResponse; + } + set + { + GetWritableData().ConsumeResponse = value; + } + } + + public MessageAction MessageAction + { + get + { + return data.MessageAction; + } + set + { + GetWritableData().MessageAction = value; + } + } + + public HandlingResult ToHandlingResult() + { + if (data == Default) + { + return new HandlingResult(); + } + var result = GetWritableData(); + data = Default; + return result; + } + + private HandlingResult GetWritableData() + { + if (data == Default) + { + data = new HandlingResult(); + } + return data; + } + } +} diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs index 7b46af6..7525324 100644 --- a/Tapeti/IExceptionStrategy.cs +++ b/Tapeti/IExceptionStrategy.cs @@ -8,9 +8,9 @@ namespace Tapeti /// /// Called when an exception occurs while handling a message. /// - /// The message context if available. May be null! - /// The exception instance - /// The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message. - ConsumeResponse HandleException(IMessageContext context, Exception exception); + /// The exception strategy context containing the necessary data including the message context and the thrown exception. + /// Also the response to the message can be set. + /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. + void HandleException(IExceptionStrategyContext context); } } diff --git a/Tapeti/MessageFutureAction.cs b/Tapeti/MessageFutureAction.cs new file mode 100644 index 0000000..7cbd319 --- /dev/null +++ b/Tapeti/MessageFutureAction.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti +{ + public enum MessageAction + { + None = 1, + ErrorLog = 2, + Retry = 3, + } +} diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index f41e903..5ce8b98 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -53,6 +53,7 @@ + @@ -64,6 +65,9 @@ + + + @@ -84,6 +88,7 @@ + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index b8ed17b..117c4c9 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -143,7 +143,7 @@ namespace Tapeti container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); - container.RegisterDefault(); + container.RegisterDefault(); } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index 4e631b2..fe78c4b 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -57,7 +57,6 @@ namespace Test return flowProvider.YieldWithRequestSync (new PoloConfirmationRequestMessage(), HandlePoloConfirmationResponseEnd); - }