1
0
mirror of synced 2024-07-03 09:00:36 +00:00

RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow

Refactoring om de actie van de Exceptionstrategie door te geven aan de cleanup stack
This commit is contained in:
Menno van Lavieren 2017-10-17 13:29:16 +02:00
parent ef0268d9ac
commit d549faf7b3
14 changed files with 223 additions and 24 deletions

View File

@ -9,7 +9,7 @@ namespace Tapeti.Flow.Default
{ {
public class FlowCleanupMiddleware : ICleanupMiddleware public class FlowCleanupMiddleware : ICleanupMiddleware
{ {
public async Task Handle(IMessageContext context, ConsumeResponse response) public async Task Handle(IMessageContext context, HandlingResult handlingResult)
{ {
object flowContextObj; object flowContextObj;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj))
@ -18,7 +18,8 @@ namespace Tapeti.Flow.Default
if (flowContext?.FlowStateLock != null) if (flowContext?.FlowStateLock != null)
{ {
if (response == ConsumeResponse.Nack) if (handlingResult.ConsumeResponse == ConsumeResponse.Nack
|| handlingResult.MessageAction == MessageAction.ErrorLog)
{ {
await flowContext.FlowStateLock.DeleteFlowState(); await flowContext.FlowStateLock.DeleteFlowState();
} }

View File

@ -55,25 +55,28 @@ namespace Tapeti.Flow.Default
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
ConsumeResponse response = ConsumeResponse.Nack; HandlingResultBuilder handlingResult = new HandlingResultBuilder
{
ConsumeResponse = ConsumeResponse.Nack,
};
try try
{ {
await flowHandler.Execute(context, yieldPoint); await flowHandler.Execute(context, yieldPoint);
response = ConsumeResponse.Ack; handlingResult.ConsumeResponse = ConsumeResponse.Ack;
} }
finally finally
{ {
await RunCleanup(context, response); await RunCleanup(context, handlingResult.ToHandlingResult());
} }
} }
private async Task RunCleanup(MessageContext context, ConsumeResponse response) private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{ {
foreach (var handler in config.CleanupMiddleware) foreach (var handler in config.CleanupMiddleware)
{ {
try try
{ {
await handler.Handle(context, response); await handler.Handle(context, handlingResult);
} }
catch (Exception eCleanup) catch (Exception eCleanup)
{ {

View File

@ -8,6 +8,6 @@ namespace Tapeti.Config
{ {
public interface ICleanupMiddleware public interface ICleanupMiddleware
{ {
Task Handle(IMessageContext context, ConsumeResponse response); Task Handle(IMessageContext context, HandlingResult handlingResult);
} }
} }

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IExceptionStrategyContext
{
IMessageContext MessageContext { get; }
Exception Exception { get; }
HandlingResultBuilder HandlingResult { get; set; }
}
}

View File

@ -44,7 +44,7 @@ namespace Tapeti.Connection
{ {
ExceptionDispatchInfo exception = null; ExceptionDispatchInfo exception = null;
MessageContext context = null; MessageContext context = null;
ConsumeResponse response = ConsumeResponse.Nack; HandlingResult handlingResult = null;
try try
{ {
try try
@ -59,7 +59,11 @@ namespace Tapeti.Connection
await DispatchMesage(context, body); await DispatchMesage(context, body);
response = ConsumeResponse.Ack; handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Ack,
MessageAction = MessageAction.None
};
} }
catch (Exception eDispatch) catch (Exception eDispatch)
{ {
@ -67,7 +71,11 @@ namespace Tapeti.Connection
logger.HandlerException(eDispatch); logger.HandlerException(eDispatch);
try try
{ {
response = exceptionStrategy.HandleException(null, exception.SourceException); var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException);
exceptionStrategy.HandleException(exceptionStrategyContext);
handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult();
} }
catch (Exception eStrategy) catch (Exception eStrategy)
{ {
@ -76,7 +84,15 @@ namespace Tapeti.Connection
} }
try try
{ {
await RunCleanup(context, response); if (handlingResult == null)
{
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await RunCleanup(context, handlingResult);
} }
catch (Exception eCleanup) catch (Exception eCleanup)
{ {
@ -87,7 +103,15 @@ namespace Tapeti.Connection
{ {
try try
{ {
await worker.Respond(deliveryTag, response); if (handlingResult == null)
{
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await worker.Respond(deliveryTag, handlingResult.ConsumeResponse);
} }
catch (Exception eRespond) catch (Exception eRespond)
{ {
@ -108,13 +132,13 @@ namespace Tapeti.Connection
}); });
} }
private async Task RunCleanup(MessageContext context, ConsumeResponse response) private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{ {
foreach(var handler in cleanupMiddleware) foreach(var handler in cleanupMiddleware)
{ {
try try
{ {
await handler.Handle(context, response); await handler.Handle(context, handlingResult);
} }
catch (Exception eCleanup) catch (Exception eCleanup)
{ {

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class ExceptionStrategyContext : IExceptionStrategyContext
{
internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception)
{
MessageContext = messageContext;
Exception = exception;
}
public IMessageContext MessageContext { get; }
public Exception Exception { get; }
private HandlingResultBuilder handlingResult;
public HandlingResultBuilder HandlingResult
{
get
{
if (handlingResult == null)
{
handlingResult = new HandlingResultBuilder();
}
return handlingResult;
}
set
{
handlingResult = value;
}
}
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class NackExceptionStrategy : IExceptionStrategy
{
public void HandleException(IExceptionStrategyContext context)
{
context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack;
}
}
}

View File

@ -5,10 +5,9 @@ namespace Tapeti.Default
{ {
public class RequeueExceptionStrategy : IExceptionStrategy public class RequeueExceptionStrategy : IExceptionStrategy
{ {
public ConsumeResponse HandleException(IMessageContext context, Exception exception) public void HandleException(IExceptionStrategyContext context)
{ {
// TODO log exception context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue;
return ConsumeResponse.Requeue;
} }
} }
} }

79
Tapeti/HandlingResult.cs Normal file
View File

@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public class HandlingResult
{
public HandlingResult()
{
ConsumeResponse = ConsumeResponse.Nack;
MessageAction = MessageAction.None;
}
/// <summary>
/// Determines which response will be given to the message bus from where the message originates.
/// </summary>
public ConsumeResponse ConsumeResponse { get; internal set; }
/// <summary>
/// Registers which action the Exception strategy has taken or will take to handle the error condition
/// on the message. This is important to know for cleanup handlers registered by middleware.
/// </summary>
public MessageAction MessageAction { get; internal set; }
}
public class HandlingResultBuilder
{
private static readonly HandlingResult Default = new HandlingResult();
private HandlingResult data = Default;
public ConsumeResponse ConsumeResponse {
get
{
return data.ConsumeResponse;
}
set
{
GetWritableData().ConsumeResponse = value;
}
}
public MessageAction MessageAction
{
get
{
return data.MessageAction;
}
set
{
GetWritableData().MessageAction = value;
}
}
public HandlingResult ToHandlingResult()
{
if (data == Default)
{
return new HandlingResult();
}
var result = GetWritableData();
data = Default;
return result;
}
private HandlingResult GetWritableData()
{
if (data == Default)
{
data = new HandlingResult();
}
return data;
}
}
}

View File

@ -8,9 +8,9 @@ namespace Tapeti
/// <summary> /// <summary>
/// Called when an exception occurs while handling a message. /// Called when an exception occurs while handling a message.
/// </summary> /// </summary>
/// <param name="context">The message context if available. May be null!</param> /// <param name="context">The exception strategy context containing the necessary data including the message context and the thrown exception.
/// <param name="exception">The exception instance</param> /// Also the response to the message can be set.
/// <returns>The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message.</returns> /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. </param>
ConsumeResponse HandleException(IMessageContext context, Exception exception); void HandleException(IExceptionStrategyContext context);
} }
} }

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public enum MessageAction
{
None = 1,
ErrorLog = 2,
Retry = 3,
}
}

View File

@ -53,6 +53,7 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Config\IExceptionStrategyContext.cs" />
<Compile Include="Config\ICleanupMiddleware.cs" /> <Compile Include="Config\ICleanupMiddleware.cs" />
<Compile Include="Config\IPublishContext.cs" /> <Compile Include="Config\IPublishContext.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" /> <Compile Include="Config\IMessageFilterMiddleware.cs" />
@ -64,6 +65,9 @@
<Compile Include="Connection\TapetiWorker.cs" /> <Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Default\ConsoleLogger.cs" /> <Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" /> <Compile Include="Default\DevNullLogger.cs" />
<Compile Include="Default\ExceptionStrategyContext.cs" />
<Compile Include="Default\NackExceptionStrategy.cs" />
<Compile Include="HandlingResult.cs" />
<Compile Include="Default\JsonMessageSerializer.cs" /> <Compile Include="Default\JsonMessageSerializer.cs" />
<Compile Include="Default\MessageContext.cs" /> <Compile Include="Default\MessageContext.cs" />
<Compile Include="Default\PublishResultBinding.cs" /> <Compile Include="Default\PublishResultBinding.cs" />
@ -84,6 +88,7 @@
<Compile Include="Config\IConfig.cs" /> <Compile Include="Config\IConfig.cs" />
<Compile Include="MessageController.cs" /> <Compile Include="MessageController.cs" />
<Compile Include="Config\IBindingMiddleware.cs" /> <Compile Include="Config\IBindingMiddleware.cs" />
<Compile Include="MessageFutureAction.cs" />
<Compile Include="TapetiAppSettingsConnectionParams.cs" /> <Compile Include="TapetiAppSettingsConnectionParams.cs" />
<Compile Include="TapetiConnectionParams.cs" /> <Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiConfig.cs" /> <Compile Include="TapetiConfig.cs" />

View File

@ -143,7 +143,7 @@ namespace Tapeti
container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>(); container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>();
container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>(); container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>();
container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>(); container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>();
container.RegisterDefault<IExceptionStrategy, RequeueExceptionStrategy>(); container.RegisterDefault<IExceptionStrategy, NackExceptionStrategy>();
} }

View File

@ -57,7 +57,6 @@ namespace Test
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage> return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(), (new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponseEnd); HandlePoloConfirmationResponseEnd);
} }