From 80ac032f18b00d86151ff8f27ffca0aa4e20579d Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Tue, 21 Feb 2017 22:08:05 +0100 Subject: [PATCH] Fixed #16: No exception when publishing while RabbitMQ is down --- Tapeti/Connection/TapetiWorker.cs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 60b53c7..8843a93 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -13,6 +13,9 @@ namespace Tapeti.Connection { public class TapetiWorker { + private const int ReconnectDelay = 5000; + private const int PublishMaxConnectAttempts = 3; + private readonly IConfig config; public TapetiConnectionParams ConnectionParams { get; set; } @@ -161,7 +164,7 @@ namespace Tapeti.Connection () => taskQueue.Value.Add(async () => { var body = messageSerializer.Serialize(context.Message, context.Properties); - (await GetChannel()).BasicPublish(context.Exchange, context.RoutingKey, false, + (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false, context.Properties, body); }).Unwrap()); // ReSharper restore ImplicitlyCapturedClosure @@ -171,11 +174,12 @@ namespace Tapeti.Connection /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// - private async Task GetChannel() + private async Task GetChannel(int? maxAttempts = null) { if (channelInstance != null) return channelInstance; + var attempts = 0; var connectionFactory = new ConnectionFactory { HostName = ConnectionParams.HostName, @@ -201,7 +205,11 @@ namespace Tapeti.Connection } catch (BrokerUnreachableException) { - await Task.Delay(5000); + attempts++; + if (maxAttempts.HasValue && attempts > maxAttempts.Value) + throw; + + await Task.Delay(ReconnectDelay); } }