From d37e593b7837f884f5d07ef63cee68c1f284349b Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 25 Jan 2019 14:52:09 +0100 Subject: [PATCH] Fixed #5: Dynamic queues not reinitialized after reconnect - Tasks in the TapetiWorker are no longer async, as that splits it in multiple tasks (and only because of a delay in GetChannel, there's no real point to the async code otherwise) which causes issues with publisher confirms --- Tapeti/Connection/TapetiSubscriber.cs | 6 +++ Tapeti/Connection/TapetiWorker.cs | 62 +++++++++++++-------------- Tapeti/TapetiConnection.cs | 25 +++++++---- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 0202e1b..ce309b2 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -26,6 +26,12 @@ namespace Tapeti.Connection } + public Task RebindQueues() + { + return BindQueues(); + } + + public Task Resume() { if (consuming) diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 6771b38..79db3ed 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -62,18 +62,18 @@ namespace Tapeti.Connection if (string.IsNullOrEmpty(queueName)) throw new ArgumentNullException(nameof(queueName)); - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); - }).Unwrap(); + GetChannel().BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); + }); } public Task Subscribe(IQueue queue) { - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { - var channel = await GetChannel(); + var channel = GetChannel(); if (queue.Dynamic) { @@ -101,30 +101,30 @@ namespace Tapeti.Connection (binding as IBuildBinding)?.SetQueueName(queue.Name); } } - }).Unwrap(); + }); } public Task Respond(ulong deliveryTag, ConsumeResponse response) { - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { switch (response) { case ConsumeResponse.Ack: - (await GetChannel()).BasicAck(deliveryTag, false); + GetChannel().BasicAck(deliveryTag, false); break; case ConsumeResponse.Nack: - (await GetChannel()).BasicNack(deliveryTag, false, false); + GetChannel().BasicNack(deliveryTag, false, false); break; case ConsumeResponse.Requeue: - (await GetChannel()).BasicNack(deliveryTag, false, true); + GetChannel().BasicNack(deliveryTag, false, true); break; } - }).Unwrap(); + }); } @@ -175,7 +175,7 @@ namespace Tapeti.Connection return MiddlewareHelper.GoAsync( config.PublishMiddleware, async (handler, next) => await handler.Handle(context, next), - () => taskQueue.Value.Add(async () => + () => taskQueue.Value.Add(() => { var body = messageSerializer.Serialize(context.Message, context.Properties); Task publishResultTask = null; @@ -188,30 +188,25 @@ namespace Tapeti.Connection else mandatory = false; - (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); + GetChannel(PublishMaxConnectAttempts).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); - if (publishResultTask != null) - { - var timerCancellationSource = new CancellationTokenSource(); + if (publishResultTask == null) + return; - if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask) - { - timerCancellationSource.Cancel(); + if (!publishResultTask.Wait(MandatoryReturnTimeout)) + throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); - var replyCode = publishResultTask.Result; - // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code - // at the time of writing... - if (replyCode == 312) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + var replyCode = publishResultTask.Result; - if (replyCode > 0) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); - } - else - throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); - } - }).Unwrap()); + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + if (replyCode == 312) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + + if (replyCode > 0) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); + })); // ReSharper restore ImplicitlyCapturedClosure } @@ -219,7 +214,7 @@ namespace Tapeti.Connection /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// - private async Task GetChannel(int? maxAttempts = null) + private IModel GetChannel(int? maxAttempts = null) { if (channelInstance != null) return channelInstance; @@ -233,6 +228,7 @@ namespace Tapeti.Connection UserName = ConnectionParams.Username, Password = ConnectionParams.Password, AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable + TopologyRecoveryEnabled = false, // We'll manually redeclare all queues in the Reconnect event to update the internal state for dynamic queues RequestedHeartbeat = 30 }; @@ -277,7 +273,7 @@ namespace Tapeti.Connection if (maxAttempts.HasValue && attempts > maxAttempts.Value) throw; - await Task.Delay(ReconnectDelay); + Thread.Sleep(ReconnectDelay); } } diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 0578a2e..238d320 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -14,7 +14,7 @@ namespace Tapeti public TapetiConnectionParams Params { get; set; } private readonly Lazy worker; - + private TapetiSubscriber subscriber; public TapetiConnection(IConfig config) { @@ -36,8 +36,11 @@ namespace Tapeti public async Task Subscribe(bool startConsuming = true) { - var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); - await subscriber.BindQueues(); + if (subscriber == null) + { + subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); + await subscriber.BindQueues(); + } if (startConsuming) await subscriber.Resume(); @@ -46,9 +49,9 @@ namespace Tapeti } - public ISubscriber SubscribeSync() + public ISubscriber SubscribeSync(bool startConsuming = true) { - return Subscribe().Result; + return Subscribe(startConsuming).Result; } @@ -97,17 +100,23 @@ namespace Tapeti protected virtual void OnConnected(EventArgs e) { - Connected?.Invoke(this, e); + Task.Run(() => Connected?.Invoke(this, e)); } protected virtual void OnReconnected(EventArgs e) { - Reconnected?.Invoke(this, e); + Task.Run(() => + { + subscriber?.RebindQueues().ContinueWith((t) => + { + Reconnected?.Invoke(this, e); + }); + }); } protected virtual void OnDisconnected(EventArgs e) { - Disconnected?.Invoke(this, e); + Task.Run(() => Disconnected?.Invoke(this, e)); } } }