1
0
mirror of synced 2024-11-22 09:13:51 +00:00

Fixed #16: No exception when publishing while RabbitMQ is down

This commit is contained in:
Mark van Renswoude 2017-02-21 22:08:05 +01:00
parent 1fba66d105
commit 80ac032f18

View File

@ -13,6 +13,9 @@ namespace Tapeti.Connection
{ {
public class TapetiWorker public class TapetiWorker
{ {
private const int ReconnectDelay = 5000;
private const int PublishMaxConnectAttempts = 3;
private readonly IConfig config; private readonly IConfig config;
public TapetiConnectionParams ConnectionParams { get; set; } public TapetiConnectionParams ConnectionParams { get; set; }
@ -161,7 +164,7 @@ namespace Tapeti.Connection
() => taskQueue.Value.Add(async () => () => taskQueue.Value.Add(async () =>
{ {
var body = messageSerializer.Serialize(context.Message, context.Properties); var body = messageSerializer.Serialize(context.Message, context.Properties);
(await GetChannel()).BasicPublish(context.Exchange, context.RoutingKey, false, (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false,
context.Properties, body); context.Properties, body);
}).Unwrap()); }).Unwrap());
// ReSharper restore ImplicitlyCapturedClosure // ReSharper restore ImplicitlyCapturedClosure
@ -171,11 +174,12 @@ namespace Tapeti.Connection
/// Only call this from a task in the taskQueue to ensure IModel is only used /// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
/// </remarks> /// </remarks>
private async Task<IModel> GetChannel() private async Task<IModel> GetChannel(int? maxAttempts = null)
{ {
if (channelInstance != null) if (channelInstance != null)
return channelInstance; return channelInstance;
var attempts = 0;
var connectionFactory = new ConnectionFactory var connectionFactory = new ConnectionFactory
{ {
HostName = ConnectionParams.HostName, HostName = ConnectionParams.HostName,
@ -201,7 +205,11 @@ namespace Tapeti.Connection
} }
catch (BrokerUnreachableException) catch (BrokerUnreachableException)
{ {
await Task.Delay(5000); attempts++;
if (maxAttempts.HasValue && attempts > maxAttempts.Value)
throw;
await Task.Delay(ReconnectDelay);
} }
} }