diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
index 3cc02af..8a387bf 100644
--- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
+++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
@@ -1,8 +1,8 @@
using System;
using System.Collections.Generic;
using System.Data;
-using System.Data.SqlClient;
using System.Threading.Tasks;
+using Microsoft.Data.SqlClient;
using Newtonsoft.Json;
// Neither of these are available in language version 7 required for .NET Standard 2.0
diff --git a/Tapeti.Flow.SQL/SqlExceptionHelper.cs b/Tapeti.Flow.SQL/SqlExceptionHelper.cs
index dc00d31..693c9b2 100644
--- a/Tapeti.Flow.SQL/SqlExceptionHelper.cs
+++ b/Tapeti.Flow.SQL/SqlExceptionHelper.cs
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
-using System.Data.SqlClient;
using System.Linq;
+using Microsoft.Data.SqlClient;
// ReSharper disable UnusedMember.Global
diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs
index 5db5b60..b38b498 100644
--- a/Tapeti.Flow.SQL/SqlRetryHelper.cs
+++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs
@@ -1,6 +1,6 @@
using System;
-using System.Data.SqlClient;
using System.Threading.Tasks;
+using Microsoft.Data.SqlClient;
namespace Tapeti.Flow.SQL
{
diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
index 0a4ca77..9650958 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
@@ -33,7 +33,7 @@
-
+
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 24badec..42e3181 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -17,7 +17,7 @@ namespace Tapeti.Connection
///
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
///
- internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
+ internal class TapetiBasicConsumer : DefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly long connectionReference;
@@ -34,7 +34,7 @@ namespace Tapeti.Connection
///
- public override async Task HandleBasicDeliver(string consumerTag,
+ public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
@@ -49,16 +49,21 @@ namespace Tapeti.Connection
//
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray();
-
- try
+
+ // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would
+ // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer.
+ Task.Run(async () =>
{
- var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
- await onRespond(connectionReference, deliveryTag, response);
- }
- catch
- {
- await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
- }
+ try
+ {
+ var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
+ await onRespond(connectionReference, deliveryTag, response);
+ }
+ catch
+ {
+ await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
+ }
+ });
}
}
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index b23038e..1c4b016 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -777,8 +777,7 @@ namespace Tapeti.Connection
Password = connectionParams.Password,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
- RequestedHeartbeat = TimeSpan.FromSeconds(30),
- DispatchConsumersAsync = true
+ RequestedHeartbeat = TimeSpan.FromSeconds(30)
};
if (connectionParams.ClientProperties != null)