diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 5b04ba5..24badec 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : DefaultBasicConsumer + internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override void HandleBasicDeliver(string consumerTag, + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -50,18 +50,15 @@ namespace Tapeti.Connection // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - Task.Run(async () => + try { - try - { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } - }); + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); + await onRespond(connectionReference, deliveryTag, response); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); + } } } } diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs index ed39446..da75767 100644 --- a/Tapeti/Connection/TapetiChannel.cs +++ b/Tapeti/Connection/TapetiChannel.cs @@ -21,7 +21,7 @@ namespace Tapeti.Connection { private readonly Func modelFactory; private readonly object taskQueueLock = new(); - private SingleThreadTaskQueue? taskQueue; + private SerialTaskQueue? taskQueue; private readonly ModelProvider modelProvider; @@ -34,7 +34,7 @@ namespace Tapeti.Connection public async Task Reset() { - SingleThreadTaskQueue? capturedTaskQueue; + SerialTaskQueue? capturedTaskQueue; lock (taskQueueLock) { @@ -80,11 +80,11 @@ namespace Tapeti.Connection - private SingleThreadTaskQueue GetTaskQueue() + private SerialTaskQueue GetTaskQueue() { lock (taskQueueLock) { - return taskQueue ??= new SingleThreadTaskQueue(); + return taskQueue ??= new SerialTaskQueue(); } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 07e2718..04cd078 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,7 +777,8 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30) + RequestedHeartbeat = TimeSpan.FromSeconds(30), + DispatchConsumersAsync = true }; if (connectionParams.ClientProperties != null) @@ -790,6 +791,9 @@ namespace Tapeti.Connection } + // TODO lock both channels when attempting the connection + // TODO when one channel is lost, do not reconnect, instead restore the channel + while (true) { try @@ -808,25 +812,23 @@ namespace Tapeti.Connection { try { - if (connection is { IsOpen: true }) - connection.Close(); + if (capturedConnection is { IsOpen: true }) + capturedConnection.Close(); } catch (AlreadyClosedException) { } finally { - connection?.Dispose(); + capturedConnection?.Dispose(); } - - connection = null; } logger.Connect(new ConnectContext(connectionParams, isReconnect)); Interlocked.Increment(ref connectionReference); lock (connectionLock) - { + { connection = connectionFactory.CreateConnection(); capturedConnection = connection; diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 18a03ff..2ba6a69 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -25,7 +25,7 @@ - + diff --git a/Tapeti/Tasks/SerialTaskQueue.cs b/Tapeti/Tasks/SerialTaskQueue.cs new file mode 100644 index 0000000..b4d3081 --- /dev/null +++ b/Tapeti/Tasks/SerialTaskQueue.cs @@ -0,0 +1,65 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Tasks +{ + /// + /// An implementation of a queue which runs tasks serially. + /// + public class SerialTaskQueue : IDisposable + { + private readonly object previousTaskLock = new(); + private Task previousTask = Task.CompletedTask; + + + /// + /// Add the specified synchronous action to the task queue. + /// + /// + public Task Add(Action action) + { + lock (previousTaskLock) + { + previousTask = previousTask.ContinueWith( + _ => action(), + CancellationToken.None, + TaskContinuationOptions.None, + TaskScheduler.Default); + + return previousTask; + } + } + + + /// + /// Add the specified asynchronous method to the task queue. + /// + /// + public Task Add(Func func) + { + lock (previousTaskLock) + { + var task = previousTask.ContinueWith( + _ => func(), + CancellationToken.None, + TaskContinuationOptions.None, + TaskScheduler.Default); + + previousTask = task; + + // 'task' completes at the moment a Task is returned (for example, an await is encountered), + // this is used to chain the next. We return the unwrapped Task however, so that the caller + // awaits until the full task chain has completed. + return task.Unwrap(); + } + } + + + /// + public void Dispose() + { + GC.SuppressFinalize(this); + } + } +} diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs deleted file mode 100644 index 30032bb..0000000 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ /dev/null @@ -1,145 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace Tapeti.Tasks -{ - /// - /// An implementation of a queue which runs tasks on a single thread. - /// - public class SingleThreadTaskQueue : IDisposable - { - private readonly object previousTaskLock = new(); - private Task previousTask = Task.CompletedTask; - - private readonly Lazy singleThreadScheduler = new(); - - - /// - /// Add the specified synchronous action to the task queue. - /// - /// - public Task Add(Action action) - { - lock (previousTaskLock) - { - previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None - , TaskContinuationOptions.None - , singleThreadScheduler.Value); - - return previousTask; - } - } - - - /// - /// Add the specified asynchronous method to the task queue. - /// - /// - public Task Add(Func func) - { - lock (previousTaskLock) - { - var task = previousTask.ContinueWith(_ => func(), CancellationToken.None - , TaskContinuationOptions.None - , singleThreadScheduler.Value); - - previousTask = task; - - // 'task' completes at the moment a Task is returned (for example, an await is encountered), - // this is used to chain the next. We return the unwrapped Task however, so that the caller - // awaits until the full task chain has completed. - return task.Unwrap(); - } - } - - - /// - public void Dispose() - { - if (singleThreadScheduler.IsValueCreated) - singleThreadScheduler.Value.Dispose(); - } - - - internal class SingleThreadTaskScheduler : TaskScheduler, IDisposable - { - public override int MaximumConcurrencyLevel => 1; - - - private readonly Queue scheduledTasks = new(); - private bool disposed; - - - public SingleThreadTaskScheduler() - { - // ReSharper disable once ObjectCreationAsStatement - fire and forget! - new Thread(WorkerThread).Start(); - } - - - public void Dispose() - { - lock (scheduledTasks) - { - disposed = true; - Monitor.PulseAll(scheduledTasks); - } - } - - - protected override void QueueTask(Task task) - { - if (disposed) return; - - lock (scheduledTasks) - { - scheduledTasks.Enqueue(task); - Monitor.Pulse(scheduledTasks); - } - } - - protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) - { - return false; - } - - - protected override IEnumerable GetScheduledTasks() - { - lock (scheduledTasks) - { - return scheduledTasks.ToList(); - } - } - - - private void WorkerThread() - { - while (true) - { - Task? task; - lock (scheduledTasks) - { - task = WaitAndDequeueTask(); - } - - if (task == null) - break; - - TryExecuteTask(task); - } - } - - private Task? WaitAndDequeueTask() - { - while (!scheduledTasks.Any() && !disposed) - Monitor.Wait(scheduledTasks); - - return disposed ? null : scheduledTasks.Dequeue(); - } - } - } -}