Fixed #16: No exception when publishing while RabbitMQ is down
This commit is contained in:
parent
7370171604
commit
09a8998a3c
@ -13,6 +13,9 @@ namespace Tapeti.Connection
|
||||
{
|
||||
public class TapetiWorker
|
||||
{
|
||||
private const int ReconnectDelay = 5000;
|
||||
private const int PublishMaxConnectAttempts = 3;
|
||||
|
||||
private readonly IConfig config;
|
||||
public TapetiConnectionParams ConnectionParams { get; set; }
|
||||
|
||||
@ -161,7 +164,7 @@ namespace Tapeti.Connection
|
||||
() => taskQueue.Value.Add(async () =>
|
||||
{
|
||||
var body = messageSerializer.Serialize(context.Message, context.Properties);
|
||||
(await GetChannel()).BasicPublish(context.Exchange, context.RoutingKey, false,
|
||||
(await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false,
|
||||
context.Properties, body);
|
||||
}).Unwrap());
|
||||
// ReSharper restore ImplicitlyCapturedClosure
|
||||
@ -171,11 +174,12 @@ namespace Tapeti.Connection
|
||||
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
||||
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
||||
/// </remarks>
|
||||
private async Task<IModel> GetChannel()
|
||||
private async Task<IModel> GetChannel(int? maxAttempts = null)
|
||||
{
|
||||
if (channelInstance != null)
|
||||
return channelInstance;
|
||||
|
||||
var attempts = 0;
|
||||
var connectionFactory = new ConnectionFactory
|
||||
{
|
||||
HostName = ConnectionParams.HostName,
|
||||
@ -201,7 +205,11 @@ namespace Tapeti.Connection
|
||||
}
|
||||
catch (BrokerUnreachableException)
|
||||
{
|
||||
await Task.Delay(5000);
|
||||
attempts++;
|
||||
if (maxAttempts.HasValue && attempts > maxAttempts.Value)
|
||||
throw;
|
||||
|
||||
await Task.Delay(ReconnectDelay);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user