Merge branch 'release/3.1'

This commit is contained in:
Mark van Renswoude 2023-03-31 08:01:40 +02:00
commit 435f793e40
6 changed files with 89 additions and 171 deletions

View File

@ -17,7 +17,7 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary> /// </summary>
internal class TapetiBasicConsumer : DefaultBasicConsumer internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{ {
private readonly IConsumer consumer; private readonly IConsumer consumer;
private readonly long connectionReference; private readonly long connectionReference;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public override void HandleBasicDeliver(string consumerTag, public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag, ulong deliveryTag,
bool redelivered, bool redelivered,
string exchange, string exchange,
@ -50,18 +50,15 @@ namespace Tapeti.Connection
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray(); var bodyArray = body.ToArray();
Task.Run(async () => try
{ {
try var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
{ await onRespond(connectionReference, deliveryTag, response);
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); }
await onRespond(connectionReference, deliveryTag, response); catch
} {
catch await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
{ }
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
});
} }
} }
} }

View File

@ -21,7 +21,7 @@ namespace Tapeti.Connection
{ {
private readonly Func<IModel> modelFactory; private readonly Func<IModel> modelFactory;
private readonly object taskQueueLock = new(); private readonly object taskQueueLock = new();
private SingleThreadTaskQueue? taskQueue; private SerialTaskQueue? taskQueue;
private readonly ModelProvider modelProvider; private readonly ModelProvider modelProvider;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
public async Task Reset() public async Task Reset()
{ {
SingleThreadTaskQueue? capturedTaskQueue; SerialTaskQueue? capturedTaskQueue;
lock (taskQueueLock) lock (taskQueueLock)
{ {
@ -80,11 +80,11 @@ namespace Tapeti.Connection
private SingleThreadTaskQueue GetTaskQueue() private SerialTaskQueue GetTaskQueue()
{ {
lock (taskQueueLock) lock (taskQueueLock)
{ {
return taskQueue ??= new SingleThreadTaskQueue(); return taskQueue ??= new SerialTaskQueue();
} }
} }

View File

@ -777,7 +777,8 @@ namespace Tapeti.Connection
Password = connectionParams.Password, Password = connectionParams.Password,
AutomaticRecoveryEnabled = false, AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30) RequestedHeartbeat = TimeSpan.FromSeconds(30),
DispatchConsumersAsync = true
}; };
if (connectionParams.ClientProperties != null) if (connectionParams.ClientProperties != null)
@ -790,6 +791,9 @@ namespace Tapeti.Connection
} }
// TODO lock both channels when attempting the connection
// TODO when one channel is lost, do not reconnect, instead restore the channel
while (true) while (true)
{ {
try try
@ -808,25 +812,23 @@ namespace Tapeti.Connection
{ {
try try
{ {
if (connection is { IsOpen: true }) if (capturedConnection is { IsOpen: true })
connection.Close(); capturedConnection.Close();
} }
catch (AlreadyClosedException) catch (AlreadyClosedException)
{ {
} }
finally finally
{ {
connection?.Dispose(); capturedConnection.Dispose();
} }
connection = null;
} }
logger.Connect(new ConnectContext(connectionParams, isReconnect)); logger.Connect(new ConnectContext(connectionParams, isReconnect));
Interlocked.Increment(ref connectionReference); Interlocked.Increment(ref connectionReference);
lock (connectionLock) lock (connectionLock)
{ {
connection = connectionFactory.CreateConnection(); connection = connectionFactory.CreateConnection();
capturedConnection = connection; capturedConnection = connection;

View File

@ -25,8 +25,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.*" /> <PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="RabbitMQ.Client" Version="[6.4]" /> <PackageReference Include="RabbitMQ.Client" Version="[6.5]" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="7.0.0" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'"> <ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">

View File

@ -0,0 +1,65 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Tasks
{
/// <summary>
/// An implementation of a queue which runs tasks serially.
/// </summary>
public class SerialTaskQueue : IDisposable
{
private readonly object previousTaskLock = new();
private Task previousTask = Task.CompletedTask;
/// <summary>
/// Add the specified synchronous action to the task queue.
/// </summary>
/// <param name="action"></param>
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(
_ => action(),
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);
return previousTask;
}
}
/// <summary>
/// Add the specified asynchronous method to the task queue.
/// </summary>
/// <param name="func"></param>
public Task Add(Func<Task> func)
{
lock (previousTaskLock)
{
var task = previousTask.ContinueWith(
_ => func(),
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.Default);
previousTask = task;
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
// this is used to chain the next. We return the unwrapped Task however, so that the caller
// awaits until the full task chain has completed.
return task.Unwrap();
}
}
/// <inheritdoc />
public void Dispose()
{
GC.SuppressFinalize(this);
}
}
}

View File

@ -1,145 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Tasks
{
/// <summary>
/// An implementation of a queue which runs tasks on a single thread.
/// </summary>
public class SingleThreadTaskQueue : IDisposable
{
private readonly object previousTaskLock = new();
private Task previousTask = Task.CompletedTask;
private readonly Lazy<SingleThreadTaskScheduler> singleThreadScheduler = new();
/// <summary>
/// Add the specified synchronous action to the task queue.
/// </summary>
/// <param name="action"></param>
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
return previousTask;
}
}
/// <summary>
/// Add the specified asynchronous method to the task queue.
/// </summary>
/// <param name="func"></param>
public Task Add(Func<Task> func)
{
lock (previousTaskLock)
{
var task = previousTask.ContinueWith(_ => func(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
previousTask = task;
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
// this is used to chain the next. We return the unwrapped Task however, so that the caller
// awaits until the full task chain has completed.
return task.Unwrap();
}
}
/// <inheritdoc />
public void Dispose()
{
if (singleThreadScheduler.IsValueCreated)
singleThreadScheduler.Value.Dispose();
}
internal class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
public override int MaximumConcurrencyLevel => 1;
private readonly Queue<Task> scheduledTasks = new();
private bool disposed;
public SingleThreadTaskScheduler()
{
// ReSharper disable once ObjectCreationAsStatement - fire and forget!
new Thread(WorkerThread).Start();
}
public void Dispose()
{
lock (scheduledTasks)
{
disposed = true;
Monitor.PulseAll(scheduledTasks);
}
}
protected override void QueueTask(Task task)
{
if (disposed) return;
lock (scheduledTasks)
{
scheduledTasks.Enqueue(task);
Monitor.Pulse(scheduledTasks);
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
lock (scheduledTasks)
{
return scheduledTasks.ToList();
}
}
private void WorkerThread()
{
while (true)
{
Task? task;
lock (scheduledTasks)
{
task = WaitAndDequeueTask();
}
if (task == null)
break;
TryExecuteTask(task);
}
}
private Task? WaitAndDequeueTask()
{
while (!scheduledTasks.Any() && !disposed)
Monitor.Wait(scheduledTasks);
return disposed ? null : scheduledTasks.Dequeue();
}
}
}
}