From 70e04992dc7e21ddafce2fd30f1384d26f5c3a94 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Fri, 13 Oct 2017 10:49:59 +0200 Subject: [PATCH] RDB-136 Flow tabel wordt niet meer opgeruimd Bij een exceptie in Flow Consummer HandleBasicDeliver aangepast zodat het makkelijker is om een dispatch ronde toe tevoegen voor cleanup handlers --- Tapeti/Connection/TapetiConsumer.cs | 109 ++++++++++++++++++---------- 1 file changed, 71 insertions(+), 38 deletions(-) diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index a6e5209..4bfb1a7 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -41,60 +41,93 @@ namespace Tapeti.Connection Task.Run(async () => { ExceptionDispatchInfo exception = null; + MessageContext context = null; + ConsumeResponse response = ConsumeResponse.Nack; try { - var message = dependencyResolver.Resolve().Deserialize(body, properties); - if (message == null) - throw new ArgumentException("Empty message"); - - var validMessageType = false; - - using (var context = new MessageContext + try { - DependencyResolver = dependencyResolver, - Queue = queueName, - RoutingKey = routingKey, - Message = message, - Properties = properties - }) + context = new MessageContext + { + DependencyResolver = dependencyResolver, + Queue = queueName, + RoutingKey = routingKey, + Properties = properties + }; + + await DispatchMesage(context, body); + + response = ConsumeResponse.Ack; + } + catch (Exception eDispatch) { + exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); + logger.HandlerException(eDispatch); try { - foreach (var binding in bindings) - { - if (binding.Accept(context, message)) - { - await InvokeUsingBinding(context, binding, message); - - validMessageType = true; - } - } - - if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); - - await worker.Respond(deliveryTag, ConsumeResponse.Ack); + response = exceptionStrategy.HandleException(null, exception.SourceException); } - catch (Exception e) + catch (Exception eStrategy) { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - await worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException)); + logger.HandlerException(eStrategy); } } + try + { + // Cleanup handlers + //response = exceptionStrategy.HandleException(null, exception.SourceException); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); + } } - catch (Exception e) + finally { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - await worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); - } - - if (exception != null) - { - logger.HandlerException(exception.SourceException); + try + { + await worker.Respond(deliveryTag, response); + } + catch (Exception eRespond) + { + logger.HandlerException(eRespond); + } + try + { + if (context != null) + { + context.Dispose(); + } + } + catch (Exception eDispose) + { + logger.HandlerException(eDispose); + } } }); } + private async Task DispatchMesage(MessageContext context, byte[] body) + { + var message = dependencyResolver.Resolve().Deserialize(body, context.Properties); + if (message == null) + throw new ArgumentException("Empty message"); + + var validMessageType = false; + + foreach (var binding in bindings) + { + if (binding.Accept(context, message)) + { + await InvokeUsingBinding(context, binding, message); + + validMessageType = true; + } + } + + if (!validMessageType) + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + } private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message) {