diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 5b04ba5..24badec 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -17,7 +17,7 @@ namespace Tapeti.Connection
///
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
///
- internal class TapetiBasicConsumer : DefaultBasicConsumer
+ internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly long connectionReference;
@@ -34,7 +34,7 @@ namespace Tapeti.Connection
///
- public override void HandleBasicDeliver(string consumerTag,
+ public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
@@ -50,18 +50,15 @@ namespace Tapeti.Connection
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray();
- Task.Run(async () =>
+ try
{
- try
- {
- var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
- await onRespond(connectionReference, deliveryTag, response);
- }
- catch
- {
- await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
- }
- });
+ var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
+ await onRespond(connectionReference, deliveryTag, response);
+ }
+ catch
+ {
+ await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
+ }
}
}
}
diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs
index ed39446..da75767 100644
--- a/Tapeti/Connection/TapetiChannel.cs
+++ b/Tapeti/Connection/TapetiChannel.cs
@@ -21,7 +21,7 @@ namespace Tapeti.Connection
{
private readonly Func modelFactory;
private readonly object taskQueueLock = new();
- private SingleThreadTaskQueue? taskQueue;
+ private SerialTaskQueue? taskQueue;
private readonly ModelProvider modelProvider;
@@ -34,7 +34,7 @@ namespace Tapeti.Connection
public async Task Reset()
{
- SingleThreadTaskQueue? capturedTaskQueue;
+ SerialTaskQueue? capturedTaskQueue;
lock (taskQueueLock)
{
@@ -80,11 +80,11 @@ namespace Tapeti.Connection
- private SingleThreadTaskQueue GetTaskQueue()
+ private SerialTaskQueue GetTaskQueue()
{
lock (taskQueueLock)
{
- return taskQueue ??= new SingleThreadTaskQueue();
+ return taskQueue ??= new SerialTaskQueue();
}
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 07e2718..b23038e 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -777,7 +777,8 @@ namespace Tapeti.Connection
Password = connectionParams.Password,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
- RequestedHeartbeat = TimeSpan.FromSeconds(30)
+ RequestedHeartbeat = TimeSpan.FromSeconds(30),
+ DispatchConsumersAsync = true
};
if (connectionParams.ClientProperties != null)
@@ -790,6 +791,9 @@ namespace Tapeti.Connection
}
+ // TODO lock both channels when attempting the connection
+ // TODO when one channel is lost, do not reconnect, instead restore the channel
+
while (true)
{
try
@@ -808,25 +812,23 @@ namespace Tapeti.Connection
{
try
{
- if (connection is { IsOpen: true })
- connection.Close();
+ if (capturedConnection is { IsOpen: true })
+ capturedConnection.Close();
}
catch (AlreadyClosedException)
{
}
finally
{
- connection?.Dispose();
+ capturedConnection.Dispose();
}
-
- connection = null;
}
logger.Connect(new ConnectContext(connectionParams, isReconnect));
Interlocked.Increment(ref connectionReference);
lock (connectionLock)
- {
+ {
connection = connectionFactory.CreateConnection();
capturedConnection = connection;
diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj
index 18a03ff..67eaba9 100644
--- a/Tapeti/Tapeti.csproj
+++ b/Tapeti/Tapeti.csproj
@@ -25,8 +25,7 @@
-
-
+
diff --git a/Tapeti/Tasks/SerialTaskQueue.cs b/Tapeti/Tasks/SerialTaskQueue.cs
new file mode 100644
index 0000000..b4d3081
--- /dev/null
+++ b/Tapeti/Tasks/SerialTaskQueue.cs
@@ -0,0 +1,65 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Tapeti.Tasks
+{
+ ///
+ /// An implementation of a queue which runs tasks serially.
+ ///
+ public class SerialTaskQueue : IDisposable
+ {
+ private readonly object previousTaskLock = new();
+ private Task previousTask = Task.CompletedTask;
+
+
+ ///
+ /// Add the specified synchronous action to the task queue.
+ ///
+ ///
+ public Task Add(Action action)
+ {
+ lock (previousTaskLock)
+ {
+ previousTask = previousTask.ContinueWith(
+ _ => action(),
+ CancellationToken.None,
+ TaskContinuationOptions.None,
+ TaskScheduler.Default);
+
+ return previousTask;
+ }
+ }
+
+
+ ///
+ /// Add the specified asynchronous method to the task queue.
+ ///
+ ///
+ public Task Add(Func func)
+ {
+ lock (previousTaskLock)
+ {
+ var task = previousTask.ContinueWith(
+ _ => func(),
+ CancellationToken.None,
+ TaskContinuationOptions.None,
+ TaskScheduler.Default);
+
+ previousTask = task;
+
+ // 'task' completes at the moment a Task is returned (for example, an await is encountered),
+ // this is used to chain the next. We return the unwrapped Task however, so that the caller
+ // awaits until the full task chain has completed.
+ return task.Unwrap();
+ }
+ }
+
+
+ ///
+ public void Dispose()
+ {
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs
deleted file mode 100644
index 30032bb..0000000
--- a/Tapeti/Tasks/SingleThreadTaskQueue.cs
+++ /dev/null
@@ -1,145 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Tapeti.Tasks
-{
- ///
- /// An implementation of a queue which runs tasks on a single thread.
- ///
- public class SingleThreadTaskQueue : IDisposable
- {
- private readonly object previousTaskLock = new();
- private Task previousTask = Task.CompletedTask;
-
- private readonly Lazy singleThreadScheduler = new();
-
-
- ///
- /// Add the specified synchronous action to the task queue.
- ///
- ///
- public Task Add(Action action)
- {
- lock (previousTaskLock)
- {
- previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None
- , TaskContinuationOptions.None
- , singleThreadScheduler.Value);
-
- return previousTask;
- }
- }
-
-
- ///
- /// Add the specified asynchronous method to the task queue.
- ///
- ///
- public Task Add(Func func)
- {
- lock (previousTaskLock)
- {
- var task = previousTask.ContinueWith(_ => func(), CancellationToken.None
- , TaskContinuationOptions.None
- , singleThreadScheduler.Value);
-
- previousTask = task;
-
- // 'task' completes at the moment a Task is returned (for example, an await is encountered),
- // this is used to chain the next. We return the unwrapped Task however, so that the caller
- // awaits until the full task chain has completed.
- return task.Unwrap();
- }
- }
-
-
- ///
- public void Dispose()
- {
- if (singleThreadScheduler.IsValueCreated)
- singleThreadScheduler.Value.Dispose();
- }
-
-
- internal class SingleThreadTaskScheduler : TaskScheduler, IDisposable
- {
- public override int MaximumConcurrencyLevel => 1;
-
-
- private readonly Queue scheduledTasks = new();
- private bool disposed;
-
-
- public SingleThreadTaskScheduler()
- {
- // ReSharper disable once ObjectCreationAsStatement - fire and forget!
- new Thread(WorkerThread).Start();
- }
-
-
- public void Dispose()
- {
- lock (scheduledTasks)
- {
- disposed = true;
- Monitor.PulseAll(scheduledTasks);
- }
- }
-
-
- protected override void QueueTask(Task task)
- {
- if (disposed) return;
-
- lock (scheduledTasks)
- {
- scheduledTasks.Enqueue(task);
- Monitor.Pulse(scheduledTasks);
- }
- }
-
- protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
- {
- return false;
- }
-
-
- protected override IEnumerable GetScheduledTasks()
- {
- lock (scheduledTasks)
- {
- return scheduledTasks.ToList();
- }
- }
-
-
- private void WorkerThread()
- {
- while (true)
- {
- Task? task;
- lock (scheduledTasks)
- {
- task = WaitAndDequeueTask();
- }
-
- if (task == null)
- break;
-
- TryExecuteTask(task);
- }
- }
-
- private Task? WaitAndDequeueTask()
- {
- while (!scheduledTasks.Any() && !disposed)
- Monitor.Wait(scheduledTasks);
-
- return disposed ? null : scheduledTasks.Dequeue();
- }
- }
- }
-}