1
0
mirror of synced 2024-11-23 01:23:49 +00:00

Fixed #41 - RabbitMQ.Client 6.5 compatibility

Consumers are now properly async as well
This commit is contained in:
Mark van Renswoude 2023-03-31 07:42:59 +02:00
parent 79a5c8c871
commit 0919bfee50
6 changed files with 89 additions and 170 deletions

View File

@ -17,7 +17,7 @@ namespace Tapeti.Connection
/// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary>
internal class TapetiBasicConsumer : DefaultBasicConsumer
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{
private readonly IConsumer consumer;
private readonly long connectionReference;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public override void HandleBasicDeliver(string consumerTag,
public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
@ -50,18 +50,15 @@ namespace Tapeti.Connection
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray();
Task.Run(async () =>
try
{
try
{
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(connectionReference, deliveryTag, response);
}
catch
{
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
});
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(connectionReference, deliveryTag, response);
}
catch
{
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
}
}
}

View File

@ -21,7 +21,7 @@ namespace Tapeti.Connection
{
private readonly Func<IModel> modelFactory;
private readonly object taskQueueLock = new();
private SingleThreadTaskQueue? taskQueue;
private SerialTaskQueue? taskQueue;
private readonly ModelProvider modelProvider;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
public async Task Reset()
{
SingleThreadTaskQueue? capturedTaskQueue;
SerialTaskQueue? capturedTaskQueue;
lock (taskQueueLock)
{
@ -80,11 +80,11 @@ namespace Tapeti.Connection
private SingleThreadTaskQueue GetTaskQueue()
private SerialTaskQueue GetTaskQueue()
{
lock (taskQueueLock)
{
return taskQueue ??= new SingleThreadTaskQueue();
return taskQueue ??= new SerialTaskQueue();
}
}

View File

@ -777,7 +777,8 @@ namespace Tapeti.Connection
Password = connectionParams.Password,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30)
RequestedHeartbeat = TimeSpan.FromSeconds(30),
DispatchConsumersAsync = true
};
if (connectionParams.ClientProperties != null)
@ -790,6 +791,9 @@ namespace Tapeti.Connection
}
// TODO lock both channels when attempting the connection
// TODO when one channel is lost, do not reconnect, instead restore the channel
while (true)
{
try
@ -808,25 +812,23 @@ namespace Tapeti.Connection
{
try
{
if (connection is { IsOpen: true })
connection.Close();
if (capturedConnection is { IsOpen: true })
capturedConnection.Close();
}
catch (AlreadyClosedException)
{
}
finally
{
connection?.Dispose();
capturedConnection?.Dispose();
}
connection = null;
}
logger.Connect(new ConnectContext(connectionParams, isReconnect));
Interlocked.Increment(ref connectionReference);
lock (connectionLock)
{
{
connection = connectionFactory.CreateConnection();
capturedConnection = connection;

View File

@ -25,7 +25,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="RabbitMQ.Client" Version="[6.4]" />
<PackageReference Include="RabbitMQ.Client" Version="6.5" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="7.0.0" />
</ItemGroup>

View File

@ -0,0 +1,65 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Tasks
{
/// <summary>
/// An implementation of a queue which runs tasks serially.
/// </summary>
public class SerialTaskQueue : IDisposable
{
private readonly object previousTaskLock = new();
private Task previousTask = Task.CompletedTask;
/// <summary>
/// Add the specified synchronous action to the task queue.
/// </summary>
/// <param name="action"></param>
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(
_ => action(),
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);
return previousTask;
}
}
/// <summary>
/// Add the specified asynchronous method to the task queue.
/// </summary>
/// <param name="func"></param>
public Task Add(Func<Task> func)
{
lock (previousTaskLock)
{
var task = previousTask.ContinueWith(
_ => func(),
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);
previousTask = task;
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
// this is used to chain the next. We return the unwrapped Task however, so that the caller
// awaits until the full task chain has completed.
return task.Unwrap();
}
}
/// <inheritdoc />
public void Dispose()
{
GC.SuppressFinalize(this);
}
}
}

View File

@ -1,145 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Tasks
{
/// <summary>
/// An implementation of a queue which runs tasks on a single thread.
/// </summary>
public class SingleThreadTaskQueue : IDisposable
{
private readonly object previousTaskLock = new();
private Task previousTask = Task.CompletedTask;
private readonly Lazy<SingleThreadTaskScheduler> singleThreadScheduler = new();
/// <summary>
/// Add the specified synchronous action to the task queue.
/// </summary>
/// <param name="action"></param>
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
return previousTask;
}
}
/// <summary>
/// Add the specified asynchronous method to the task queue.
/// </summary>
/// <param name="func"></param>
public Task Add(Func<Task> func)
{
lock (previousTaskLock)
{
var task = previousTask.ContinueWith(_ => func(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
previousTask = task;
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
// this is used to chain the next. We return the unwrapped Task however, so that the caller
// awaits until the full task chain has completed.
return task.Unwrap();
}
}
/// <inheritdoc />
public void Dispose()
{
if (singleThreadScheduler.IsValueCreated)
singleThreadScheduler.Value.Dispose();
}
internal class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
public override int MaximumConcurrencyLevel => 1;
private readonly Queue<Task> scheduledTasks = new();
private bool disposed;
public SingleThreadTaskScheduler()
{
// ReSharper disable once ObjectCreationAsStatement - fire and forget!
new Thread(WorkerThread).Start();
}
public void Dispose()
{
lock (scheduledTasks)
{
disposed = true;
Monitor.PulseAll(scheduledTasks);
}
}
protected override void QueueTask(Task task)
{
if (disposed) return;
lock (scheduledTasks)
{
scheduledTasks.Enqueue(task);
Monitor.Pulse(scheduledTasks);
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
lock (scheduledTasks)
{
return scheduledTasks.ToList();
}
}
private void WorkerThread()
{
while (true)
{
Task? task;
lock (scheduledTasks)
{
task = WaitAndDequeueTask();
}
if (task == null)
break;
TryExecuteTask(task);
}
}
private Task? WaitAndDequeueTask()
{
while (!scheduledTasks.Any() && !disposed)
Monitor.Wait(scheduledTasks);
return disposed ? null : scheduledTasks.Dequeue();
}
}
}
}