diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index d47a498..a6e5209 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -17,6 +17,8 @@ namespace Tapeti.Connection private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly List bindings; + + private readonly ILogger logger; private readonly IExceptionStrategy exceptionStrategy; @@ -28,6 +30,7 @@ namespace Tapeti.Connection this.messageMiddleware = messageMiddleware; this.bindings = bindings.ToList(); + logger = dependencyResolver.Resolve(); exceptionStrategy = dependencyResolver.Resolve(); } @@ -35,59 +38,65 @@ namespace Tapeti.Connection public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { - ExceptionDispatchInfo exception = null; - try + Task.Run(async () => { - var message = dependencyResolver.Resolve().Deserialize(body, properties); - if (message == null) - throw new ArgumentException("Empty message"); - - var validMessageType = false; - - using (var context = new MessageContext + ExceptionDispatchInfo exception = null; + try { - DependencyResolver = dependencyResolver, - Queue = queueName, - RoutingKey = routingKey, - Message = message, - Properties = properties - }) - { - try + var message = dependencyResolver.Resolve().Deserialize(body, properties); + if (message == null) + throw new ArgumentException("Empty message"); + + var validMessageType = false; + + using (var context = new MessageContext { - foreach (var binding in bindings) + DependencyResolver = dependencyResolver, + Queue = queueName, + RoutingKey = routingKey, + Message = message, + Properties = properties + }) + { + try { - if (binding.Accept(context, message)) + foreach (var binding in bindings) { - InvokeUsingBinding(context, binding, message); + if (binding.Accept(context, message)) + { + await InvokeUsingBinding(context, binding, message); - validMessageType = true; + validMessageType = true; + } } + + if (!validMessageType) + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + + await worker.Respond(deliveryTag, ConsumeResponse.Ack); + } + catch (Exception e) + { + exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); + await worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException)); } - - if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); - - worker.Respond(deliveryTag, ConsumeResponse.Ack); - } - catch (Exception e) - { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException)); } } - } - catch (Exception e) - { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); - } + catch (Exception e) + { + exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); + await worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); + } - exception?.Throw(); + if (exception != null) + { + logger.HandlerException(exception.SourceException); + } + }); } - private void InvokeUsingBinding(MessageContext context, IBinding binding, object message) + private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message) { context.Binding = binding; @@ -136,9 +145,7 @@ namespace Tapeti.Connection await binding.Invoke(c, message); }); - firstCaller.Call(context) - .Wait(); - + return firstCaller.Call(context); } private static Exception UnwrapException(Exception exception) diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index 2cb7caf..98484fe 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti.Default +using System; + +namespace Tapeti.Default { public class ConsoleLogger : ILogger { @@ -16,5 +18,10 @@ { throw new System.NotImplementedException(); } + + public void HandlerException(Exception e) + { + Console.WriteLine(e.ToString()); + } } } diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs index 24919fc..af4ce57 100644 --- a/Tapeti/Default/DevNullLogger.cs +++ b/Tapeti/Default/DevNullLogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti.Default +using System; + +namespace Tapeti.Default { public class DevNullLogger : ILogger { @@ -13,5 +15,9 @@ public void ConnectSuccess(TapetiConnectionParams connectionParams) { } + + public void HandlerException(Exception e) + { + } } } diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index 014f217..1dc244d 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti +using System; + +namespace Tapeti { // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) // instead of only string-based logging without control over the output. @@ -7,5 +9,6 @@ void Connect(TapetiConnectionParams connectionParams); void ConnectFailed(TapetiConnectionParams connectionParams); void ConnectSuccess(TapetiConnectionParams connectionParams); + void HandlerException(Exception e); } } diff --git a/Test/Program.cs b/Test/Program.cs index 6eecc55..0e97ee9 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -20,6 +20,7 @@ namespace Test var container = new Container(); container.Register(); container.Register(); + container.Register(); //container.Register(() => new EF(serviceID));