1
0
mirror of synced 2025-01-22 16:13:07 +01:00

RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow

Refactoring om de actie van de Exceptionstrategie door te geven aan de cleanup stack
This commit is contained in:
Menno van Lavieren 2017-10-17 13:29:16 +02:00
parent 3ce847c21f
commit ee86e2c739
14 changed files with 223 additions and 24 deletions

View File

@ -9,7 +9,7 @@ namespace Tapeti.Flow.Default
{
public class FlowCleanupMiddleware : ICleanupMiddleware
{
public async Task Handle(IMessageContext context, ConsumeResponse response)
public async Task Handle(IMessageContext context, HandlingResult handlingResult)
{
object flowContextObj;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj))
@ -18,7 +18,8 @@ namespace Tapeti.Flow.Default
if (flowContext?.FlowStateLock != null)
{
if (response == ConsumeResponse.Nack)
if (handlingResult.ConsumeResponse == ConsumeResponse.Nack
|| handlingResult.MessageAction == MessageAction.ErrorLog)
{
await flowContext.FlowStateLock.DeleteFlowState();
}

View File

@ -55,25 +55,28 @@ namespace Tapeti.Flow.Default
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
ConsumeResponse response = ConsumeResponse.Nack;
HandlingResultBuilder handlingResult = new HandlingResultBuilder
{
ConsumeResponse = ConsumeResponse.Nack,
};
try
{
await flowHandler.Execute(context, yieldPoint);
response = ConsumeResponse.Ack;
handlingResult.ConsumeResponse = ConsumeResponse.Ack;
}
finally
{
await RunCleanup(context, response);
await RunCleanup(context, handlingResult.ToHandlingResult());
}
}
private async Task RunCleanup(MessageContext context, ConsumeResponse response)
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach (var handler in config.CleanupMiddleware)
{
try
{
await handler.Handle(context, response);
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{

View File

@ -8,6 +8,6 @@ namespace Tapeti.Config
{
public interface ICleanupMiddleware
{
Task Handle(IMessageContext context, ConsumeResponse response);
Task Handle(IMessageContext context, HandlingResult handlingResult);
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IExceptionStrategyContext
{
IMessageContext MessageContext { get; }
Exception Exception { get; }
HandlingResultBuilder HandlingResult { get; set; }
}
}

View File

@ -44,7 +44,7 @@ namespace Tapeti.Connection
{
ExceptionDispatchInfo exception = null;
MessageContext context = null;
ConsumeResponse response = ConsumeResponse.Nack;
HandlingResult handlingResult = null;
try
{
try
@ -59,7 +59,11 @@ namespace Tapeti.Connection
await DispatchMesage(context, body);
response = ConsumeResponse.Ack;
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Ack,
MessageAction = MessageAction.None
};
}
catch (Exception eDispatch)
{
@ -67,7 +71,11 @@ namespace Tapeti.Connection
logger.HandlerException(eDispatch);
try
{
response = exceptionStrategy.HandleException(null, exception.SourceException);
var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException);
exceptionStrategy.HandleException(exceptionStrategyContext);
handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult();
}
catch (Exception eStrategy)
{
@ -76,7 +84,15 @@ namespace Tapeti.Connection
}
try
{
await RunCleanup(context, response);
if (handlingResult == null)
{
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await RunCleanup(context, handlingResult);
}
catch (Exception eCleanup)
{
@ -87,7 +103,15 @@ namespace Tapeti.Connection
{
try
{
await worker.Respond(deliveryTag, response);
if (handlingResult == null)
{
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await worker.Respond(deliveryTag, handlingResult.ConsumeResponse);
}
catch (Exception eRespond)
{
@ -108,13 +132,13 @@ namespace Tapeti.Connection
});
}
private async Task RunCleanup(MessageContext context, ConsumeResponse response)
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach(var handler in cleanupMiddleware)
{
try
{
await handler.Handle(context, response);
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class ExceptionStrategyContext : IExceptionStrategyContext
{
internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception)
{
MessageContext = messageContext;
Exception = exception;
}
public IMessageContext MessageContext { get; }
public Exception Exception { get; }
private HandlingResultBuilder handlingResult;
public HandlingResultBuilder HandlingResult
{
get
{
if (handlingResult == null)
{
handlingResult = new HandlingResultBuilder();
}
return handlingResult;
}
set
{
handlingResult = value;
}
}
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class NackExceptionStrategy : IExceptionStrategy
{
public void HandleException(IExceptionStrategyContext context)
{
context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack;
}
}
}

View File

@ -5,10 +5,9 @@ namespace Tapeti.Default
{
public class RequeueExceptionStrategy : IExceptionStrategy
{
public ConsumeResponse HandleException(IMessageContext context, Exception exception)
public void HandleException(IExceptionStrategyContext context)
{
// TODO log exception
return ConsumeResponse.Requeue;
context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue;
}
}
}

79
Tapeti/HandlingResult.cs Normal file
View File

@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public class HandlingResult
{
public HandlingResult()
{
ConsumeResponse = ConsumeResponse.Nack;
MessageAction = MessageAction.None;
}
/// <summary>
/// Determines which response will be given to the message bus from where the message originates.
/// </summary>
public ConsumeResponse ConsumeResponse { get; internal set; }
/// <summary>
/// Registers which action the Exception strategy has taken or will take to handle the error condition
/// on the message. This is important to know for cleanup handlers registered by middleware.
/// </summary>
public MessageAction MessageAction { get; internal set; }
}
public class HandlingResultBuilder
{
private static readonly HandlingResult Default = new HandlingResult();
private HandlingResult data = Default;
public ConsumeResponse ConsumeResponse {
get
{
return data.ConsumeResponse;
}
set
{
GetWritableData().ConsumeResponse = value;
}
}
public MessageAction MessageAction
{
get
{
return data.MessageAction;
}
set
{
GetWritableData().MessageAction = value;
}
}
public HandlingResult ToHandlingResult()
{
if (data == Default)
{
return new HandlingResult();
}
var result = GetWritableData();
data = Default;
return result;
}
private HandlingResult GetWritableData()
{
if (data == Default)
{
data = new HandlingResult();
}
return data;
}
}
}

View File

@ -8,9 +8,9 @@ namespace Tapeti
/// <summary>
/// Called when an exception occurs while handling a message.
/// </summary>
/// <param name="context">The message context if available. May be null!</param>
/// <param name="exception">The exception instance</param>
/// <returns>The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message.</returns>
ConsumeResponse HandleException(IMessageContext context, Exception exception);
/// <param name="context">The exception strategy context containing the necessary data including the message context and the thrown exception.
/// Also the response to the message can be set.
/// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. </param>
void HandleException(IExceptionStrategyContext context);
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public enum MessageAction
{
None = 1,
ErrorLog = 2,
Retry = 3,
}
}

View File

@ -53,6 +53,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Config\IExceptionStrategyContext.cs" />
<Compile Include="Config\ICleanupMiddleware.cs" />
<Compile Include="Config\IPublishContext.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" />
@ -64,6 +65,9 @@
<Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" />
<Compile Include="Default\ExceptionStrategyContext.cs" />
<Compile Include="Default\NackExceptionStrategy.cs" />
<Compile Include="HandlingResult.cs" />
<Compile Include="Default\JsonMessageSerializer.cs" />
<Compile Include="Default\MessageContext.cs" />
<Compile Include="Default\PublishResultBinding.cs" />
@ -84,6 +88,7 @@
<Compile Include="Config\IConfig.cs" />
<Compile Include="MessageController.cs" />
<Compile Include="Config\IBindingMiddleware.cs" />
<Compile Include="MessageFutureAction.cs" />
<Compile Include="TapetiAppSettingsConnectionParams.cs" />
<Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiConfig.cs" />

View File

@ -143,7 +143,7 @@ namespace Tapeti
container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>();
container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>();
container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>();
container.RegisterDefault<IExceptionStrategy, RequeueExceptionStrategy>();
container.RegisterDefault<IExceptionStrategy, NackExceptionStrategy>();
}

View File

@ -57,7 +57,6 @@ namespace Test
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponseEnd);
}