diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 7dd969a..1cc8e92 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -9,7 +9,7 @@ namespace Tapeti.Connection
///
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
///
- internal class TapetiBasicConsumer : DefaultBasicConsumer
+ internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly Func onRespond;
@@ -24,7 +24,7 @@ namespace Tapeti.Connection
///
- public override void HandleBasicDeliver(string consumerTag,
+ public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
@@ -32,18 +32,15 @@ namespace Tapeti.Connection
IBasicProperties properties,
ReadOnlyMemory body)
{
- Task.Run(async () =>
+ try
{
- try
- {
- var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body.ToArray());
- await onRespond(deliveryTag, response);
- }
- catch
- {
- await onRespond(deliveryTag, ConsumeResult.Error);
- }
- });
+ var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body);
+ await onRespond(deliveryTag, response);
+ }
+ catch
+ {
+ await onRespond(deliveryTag, ConsumeResult.Error);
+ }
}
}
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index c41e7b4..d30f512 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -656,7 +656,8 @@ namespace Tapeti.Connection
Password = connectionParams.Password,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
- RequestedHeartbeat = TimeSpan.FromSeconds(30)
+ RequestedHeartbeat = TimeSpan.FromSeconds(30),
+ DispatchConsumersAsync = true
};
if (connectionParams.ClientProperties != null)
diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs
index 19fb870..ddbb4d7 100644
--- a/Tapeti/Connection/TapetiConsumer.cs
+++ b/Tapeti/Connection/TapetiConsumer.cs
@@ -39,12 +39,12 @@ namespace Tapeti.Connection
///
- public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
+ public async Task Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory body)
{
object message = null;
try
{
- message = messageSerializer.Deserialize(body, properties);
+ message = messageSerializer.Deserialize(body.ToArray(), properties);
if (message == null)
throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body));
diff --git a/Tapeti/IConsumer.cs b/Tapeti/IConsumer.cs
index 7204bc5..cd20f30 100644
--- a/Tapeti/IConsumer.cs
+++ b/Tapeti/IConsumer.cs
@@ -1,4 +1,5 @@
-using System.Threading.Tasks;
+using System;
+using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti
@@ -16,6 +17,6 @@ namespace Tapeti
/// Metadata included in the message
/// The raw body of the message
///
- Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body);
+ Task Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory body);
}
}