1
0
mirror of synced 2024-11-21 17:03:50 +00:00

Tapeti Consumer HandleBasicDeliver schedules a Task asynchronously to handle the messages

This commit is contained in:
Menno van Lavieren 2017-08-30 17:47:43 +02:00
parent d71927915c
commit 9f30c1ec74
5 changed files with 69 additions and 45 deletions

View File

@ -17,6 +17,8 @@ namespace Tapeti.Connection
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware; private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly List<IBinding> bindings; private readonly List<IBinding> bindings;
private readonly ILogger logger;
private readonly IExceptionStrategy exceptionStrategy; private readonly IExceptionStrategy exceptionStrategy;
@ -28,6 +30,7 @@ namespace Tapeti.Connection
this.messageMiddleware = messageMiddleware; this.messageMiddleware = messageMiddleware;
this.bindings = bindings.ToList(); this.bindings = bindings.ToList();
logger = dependencyResolver.Resolve<ILogger>();
exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>(); exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>();
} }
@ -35,59 +38,65 @@ namespace Tapeti.Connection
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, byte[] body) IBasicProperties properties, byte[] body)
{ {
ExceptionDispatchInfo exception = null; Task.Run(async () =>
try
{ {
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties); ExceptionDispatchInfo exception = null;
if (message == null) try
throw new ArgumentException("Empty message");
var validMessageType = false;
using (var context = new MessageContext
{ {
DependencyResolver = dependencyResolver, var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties);
Queue = queueName, if (message == null)
RoutingKey = routingKey, throw new ArgumentException("Empty message");
Message = message,
Properties = properties var validMessageType = false;
})
{ using (var context = new MessageContext
try
{ {
foreach (var binding in bindings) DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
{
try
{ {
if (binding.Accept(context, message)) foreach (var binding in bindings)
{ {
InvokeUsingBinding(context, binding, message); if (binding.Accept(context, message))
{
await InvokeUsingBinding(context, binding, message);
validMessageType = true; validMessageType = true;
}
} }
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
await worker.Respond(deliveryTag, ConsumeResponse.Ack);
}
catch (Exception e)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
await worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException));
} }
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
worker.Respond(deliveryTag, ConsumeResponse.Ack);
}
catch (Exception e)
{
exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException));
} }
} }
} catch (Exception e)
catch (Exception e) {
{ exception = ExceptionDispatchInfo.Capture(UnwrapException(e));
exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); await worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException));
worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); }
}
exception?.Throw(); if (exception != null)
{
logger.HandlerException(exception.SourceException);
}
});
} }
private void InvokeUsingBinding(MessageContext context, IBinding binding, object message) private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message)
{ {
context.Binding = binding; context.Binding = binding;
@ -136,9 +145,7 @@ namespace Tapeti.Connection
await binding.Invoke(c, message); await binding.Invoke(c, message);
}); });
firstCaller.Call(context) return firstCaller.Call(context);
.Wait();
} }
private static Exception UnwrapException(Exception exception) private static Exception UnwrapException(Exception exception)

View File

@ -1,4 +1,6 @@
namespace Tapeti.Default using System;
namespace Tapeti.Default
{ {
public class ConsoleLogger : ILogger public class ConsoleLogger : ILogger
{ {
@ -16,5 +18,10 @@
{ {
throw new System.NotImplementedException(); throw new System.NotImplementedException();
} }
public void HandlerException(Exception e)
{
Console.WriteLine(e.ToString());
}
} }
} }

View File

@ -1,4 +1,6 @@
namespace Tapeti.Default using System;
namespace Tapeti.Default
{ {
public class DevNullLogger : ILogger public class DevNullLogger : ILogger
{ {
@ -13,5 +15,9 @@
public void ConnectSuccess(TapetiConnectionParams connectionParams) public void ConnectSuccess(TapetiConnectionParams connectionParams)
{ {
} }
public void HandlerException(Exception e)
{
}
} }
} }

View File

@ -1,4 +1,6 @@
namespace Tapeti using System;
namespace Tapeti
{ {
// This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog)
// instead of only string-based logging without control over the output. // instead of only string-based logging without control over the output.
@ -7,5 +9,6 @@
void Connect(TapetiConnectionParams connectionParams); void Connect(TapetiConnectionParams connectionParams);
void ConnectFailed(TapetiConnectionParams connectionParams); void ConnectFailed(TapetiConnectionParams connectionParams);
void ConnectSuccess(TapetiConnectionParams connectionParams); void ConnectSuccess(TapetiConnectionParams connectionParams);
void HandlerException(Exception e);
} }
} }

View File

@ -20,6 +20,7 @@ namespace Test
var container = new Container(); var container = new Container();
container.Register<MarcoEmitter>(); container.Register<MarcoEmitter>();
container.Register<Visualizer>(); container.Register<Visualizer>();
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
//container.Register<IFlowRepository>(() => new EF(serviceID)); //container.Register<IFlowRepository>(() => new EF(serviceID));