1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Fixed #5: Dynamic queues not reinitialized after reconnect

- Tasks in the TapetiWorker are no longer async, as that splits it in multiple tasks (and only because of a delay in GetChannel, there's no real point to the async code otherwise) which causes issues with publisher confirms
This commit is contained in:
Mark van Renswoude 2019-01-25 14:52:09 +01:00
parent 60c7020a2c
commit d37e593b78
3 changed files with 52 additions and 41 deletions

View File

@ -26,6 +26,12 @@ namespace Tapeti.Connection
} }
public Task RebindQueues()
{
return BindQueues();
}
public Task Resume() public Task Resume()
{ {
if (consuming) if (consuming)

View File

@ -62,18 +62,18 @@ namespace Tapeti.Connection
if (string.IsNullOrEmpty(queueName)) if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName)); throw new ArgumentNullException(nameof(queueName));
return taskQueue.Value.Add(async () => return taskQueue.Value.Add(() =>
{ {
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); GetChannel().BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware));
}).Unwrap(); });
} }
public Task Subscribe(IQueue queue) public Task Subscribe(IQueue queue)
{ {
return taskQueue.Value.Add(async () => return taskQueue.Value.Add(() =>
{ {
var channel = await GetChannel(); var channel = GetChannel();
if (queue.Dynamic) if (queue.Dynamic)
{ {
@ -101,30 +101,30 @@ namespace Tapeti.Connection
(binding as IBuildBinding)?.SetQueueName(queue.Name); (binding as IBuildBinding)?.SetQueueName(queue.Name);
} }
} }
}).Unwrap(); });
} }
public Task Respond(ulong deliveryTag, ConsumeResponse response) public Task Respond(ulong deliveryTag, ConsumeResponse response)
{ {
return taskQueue.Value.Add(async () => return taskQueue.Value.Add(() =>
{ {
switch (response) switch (response)
{ {
case ConsumeResponse.Ack: case ConsumeResponse.Ack:
(await GetChannel()).BasicAck(deliveryTag, false); GetChannel().BasicAck(deliveryTag, false);
break; break;
case ConsumeResponse.Nack: case ConsumeResponse.Nack:
(await GetChannel()).BasicNack(deliveryTag, false, false); GetChannel().BasicNack(deliveryTag, false, false);
break; break;
case ConsumeResponse.Requeue: case ConsumeResponse.Requeue:
(await GetChannel()).BasicNack(deliveryTag, false, true); GetChannel().BasicNack(deliveryTag, false, true);
break; break;
} }
}).Unwrap(); });
} }
@ -175,7 +175,7 @@ namespace Tapeti.Connection
return MiddlewareHelper.GoAsync( return MiddlewareHelper.GoAsync(
config.PublishMiddleware, config.PublishMiddleware,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next),
() => taskQueue.Value.Add(async () => () => taskQueue.Value.Add(() =>
{ {
var body = messageSerializer.Serialize(context.Message, context.Properties); var body = messageSerializer.Serialize(context.Message, context.Properties);
Task<int> publishResultTask = null; Task<int> publishResultTask = null;
@ -188,15 +188,14 @@ namespace Tapeti.Connection
else else
mandatory = false; mandatory = false;
(await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); GetChannel(PublishMaxConnectAttempts).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
if (publishResultTask != null) if (publishResultTask == null)
{ return;
var timerCancellationSource = new CancellationTokenSource();
if (!publishResultTask.Wait(MandatoryReturnTimeout))
throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask)
{
timerCancellationSource.Cancel();
var replyCode = publishResultTask.Result; var replyCode = publishResultTask.Result;
@ -207,11 +206,7 @@ namespace Tapeti.Connection
if (replyCode > 0) if (replyCode > 0)
throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}");
} }));
else
throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
}
}).Unwrap());
// ReSharper restore ImplicitlyCapturedClosure // ReSharper restore ImplicitlyCapturedClosure
} }
@ -219,7 +214,7 @@ namespace Tapeti.Connection
/// Only call this from a task in the taskQueue to ensure IModel is only used /// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
/// </remarks> /// </remarks>
private async Task<IModel> GetChannel(int? maxAttempts = null) private IModel GetChannel(int? maxAttempts = null)
{ {
if (channelInstance != null) if (channelInstance != null)
return channelInstance; return channelInstance;
@ -233,6 +228,7 @@ namespace Tapeti.Connection
UserName = ConnectionParams.Username, UserName = ConnectionParams.Username,
Password = ConnectionParams.Password, Password = ConnectionParams.Password,
AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable
TopologyRecoveryEnabled = false, // We'll manually redeclare all queues in the Reconnect event to update the internal state for dynamic queues
RequestedHeartbeat = 30 RequestedHeartbeat = 30
}; };
@ -277,7 +273,7 @@ namespace Tapeti.Connection
if (maxAttempts.HasValue && attempts > maxAttempts.Value) if (maxAttempts.HasValue && attempts > maxAttempts.Value)
throw; throw;
await Task.Delay(ReconnectDelay); Thread.Sleep(ReconnectDelay);
} }
} }

View File

@ -14,7 +14,7 @@ namespace Tapeti
public TapetiConnectionParams Params { get; set; } public TapetiConnectionParams Params { get; set; }
private readonly Lazy<TapetiWorker> worker; private readonly Lazy<TapetiWorker> worker;
private TapetiSubscriber subscriber;
public TapetiConnection(IConfig config) public TapetiConnection(IConfig config)
{ {
@ -36,8 +36,11 @@ namespace Tapeti
public async Task<ISubscriber> Subscribe(bool startConsuming = true) public async Task<ISubscriber> Subscribe(bool startConsuming = true)
{ {
var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); if (subscriber == null)
{
subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
await subscriber.BindQueues(); await subscriber.BindQueues();
}
if (startConsuming) if (startConsuming)
await subscriber.Resume(); await subscriber.Resume();
@ -46,9 +49,9 @@ namespace Tapeti
} }
public ISubscriber SubscribeSync() public ISubscriber SubscribeSync(bool startConsuming = true)
{ {
return Subscribe().Result; return Subscribe(startConsuming).Result;
} }
@ -97,17 +100,23 @@ namespace Tapeti
protected virtual void OnConnected(EventArgs e) protected virtual void OnConnected(EventArgs e)
{ {
Connected?.Invoke(this, e); Task.Run(() => Connected?.Invoke(this, e));
} }
protected virtual void OnReconnected(EventArgs e) protected virtual void OnReconnected(EventArgs e)
{
Task.Run(() =>
{
subscriber?.RebindQueues().ContinueWith((t) =>
{ {
Reconnected?.Invoke(this, e); Reconnected?.Invoke(this, e);
});
});
} }
protected virtual void OnDisconnected(EventArgs e) protected virtual void OnDisconnected(EventArgs e)
{ {
Disconnected?.Invoke(this, e); Task.Run(() => Disconnected?.Invoke(this, e));
} }
} }
} }