1
0
mirror of synced 2025-01-23 00:23:06 +01:00

RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow

Consummer HandleBasicDeliver aangepast zodat het makkelijker is om een dispatch ronde toe tevoegen voor cleanup handlers
This commit is contained in:
Menno van Lavieren 2017-10-13 10:49:59 +02:00
parent 7245424c15
commit 3c44074289

View File

@ -41,60 +41,93 @@ namespace Tapeti.Connection
Task.Run(async () =>
{
ExceptionDispatchInfo exception = null;
MessageContext context = null;
ConsumeResponse response = ConsumeResponse.Nack;
try
{
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties);
if (message == null)
throw new ArgumentException("Empty message");
var validMessageType = false;
using (var context = new MessageContext
try
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
context = new MessageContext
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Properties = properties
};
await DispatchMesage(context, body);
response = ConsumeResponse.Ack;
}
catch (Exception eDispatch)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
logger.HandlerException(eDispatch);
try
{
foreach (var binding in bindings)
{
if (binding.Accept(context, message))
{
await InvokeUsingBinding(context, binding, message);
validMessageType = true;
}
}
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
await worker.Respond(deliveryTag, ConsumeResponse.Ack);
response = exceptionStrategy.HandleException(null, exception.SourceException);
}
catch (Exception e)
catch (Exception eStrategy)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
await worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException));
logger.HandlerException(eStrategy);
}
}
try
{
// Cleanup handlers
//response = exceptionStrategy.HandleException(null, exception.SourceException);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
catch (Exception e)
finally
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
await worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException));
}
if (exception != null)
{
logger.HandlerException(exception.SourceException);
try
{
await worker.Respond(deliveryTag, response);
}
catch (Exception eRespond)
{
logger.HandlerException(eRespond);
}
try
{
if (context != null)
{
context.Dispose();
}
}
catch (Exception eDispose)
{
logger.HandlerException(eDispose);
}
}
});
}
private async Task DispatchMesage(MessageContext context, byte[] body)
{
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, context.Properties);
if (message == null)
throw new ArgumentException("Empty message");
var validMessageType = false;
foreach (var binding in bindings)
{
if (binding.Accept(context, message))
{
await InvokeUsingBinding(context, binding, message);
validMessageType = true;
}
}
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
}
private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message)
{