1
0
mirror of synced 2024-06-26 14:27:38 +00:00

RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow

Consummer HandleBasicDeliver aangepast zodat het makkelijker is om een dispatch ronde toe tevoegen voor cleanup handlers
This commit is contained in:
Menno van Lavieren 2017-10-13 10:49:59 +02:00
parent 1e4cac2afa
commit 70e04992dc

View File

@ -41,25 +41,80 @@ namespace Tapeti.Connection
Task.Run(async () =>
{
ExceptionDispatchInfo exception = null;
MessageContext context = null;
ConsumeResponse response = ConsumeResponse.Nack;
try
{
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties);
try
{
context = new MessageContext
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Properties = properties
};
await DispatchMesage(context, body);
response = ConsumeResponse.Ack;
}
catch (Exception eDispatch)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
logger.HandlerException(eDispatch);
try
{
response = exceptionStrategy.HandleException(null, exception.SourceException);
}
catch (Exception eStrategy)
{
logger.HandlerException(eStrategy);
}
}
try
{
// Cleanup handlers
//response = exceptionStrategy.HandleException(null, exception.SourceException);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
finally
{
try
{
await worker.Respond(deliveryTag, response);
}
catch (Exception eRespond)
{
logger.HandlerException(eRespond);
}
try
{
if (context != null)
{
context.Dispose();
}
}
catch (Exception eDispose)
{
logger.HandlerException(eDispose);
}
}
});
}
private async Task DispatchMesage(MessageContext context, byte[] body)
{
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, context.Properties);
if (message == null)
throw new ArgumentException("Empty message");
var validMessageType = false;
using (var context = new MessageContext
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
{
try
{
foreach (var binding in bindings)
{
if (binding.Accept(context, message))
@ -72,29 +127,7 @@ namespace Tapeti.Connection
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
await worker.Respond(deliveryTag, ConsumeResponse.Ack);
}
catch (Exception e)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
await worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException));
}
}
}
catch (Exception e)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
await worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException));
}
if (exception != null)
{
logger.HandlerException(exception.SourceException);
}
});
}
private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message)
{