1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Fixed #6: Use 'mandatory' on replies

This commit is contained in:
Mark van Renswoude 2019-01-24 22:52:21 +01:00
parent 37d55ac71d
commit 60c7020a2c
12 changed files with 180 additions and 109 deletions

View File

@ -80,7 +80,7 @@ namespace Tapeti.Flow.Default
await context.Store();
await publisher.Publish(message, properties);
await publisher.Publish(message, properties, true);
}
@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
// TODO disallow if replyto is not specified?
if (reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties);
await publisher.PublishDirect(message, reply.ReplyTo, properties, true);
else
await publisher.Publish(message, properties);
await publisher.Publish(message, properties, true);
await context.Delete();
}

View File

@ -1,6 +1,8 @@
using System;
using ISeriLogger = Serilog.ILogger;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Serilog
{
public class TapetiSeriLogger: ILogger

View File

@ -7,6 +7,8 @@ namespace Tapeti.Config
{
public interface IConfig
{
bool UsePublisherConfirms { get; }
IDependencyResolver DependencyResolver { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }

View File

@ -17,19 +17,19 @@ namespace Tapeti.Connection
public Task Publish(object message)
{
return workerFactory().Publish(message, null);
return workerFactory().Publish(message, null, false);
}
public Task Publish(object message, IBasicProperties properties)
public Task Publish(object message, IBasicProperties properties, bool mandatory)
{
return workerFactory().Publish(message, properties);
return workerFactory().Publish(message, properties, mandatory);
}
public Task PublishDirect(object message, string queueName, IBasicProperties properties)
public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
{
return workerFactory().PublishDirect(message, queueName, properties);
return workerFactory().PublishDirect(message, queueName, properties, mandatory);
}
}
}

View File

@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
using Tapeti.Exceptions;
using Tapeti.Helpers;
using Tapeti.Tasks;
@ -13,6 +15,7 @@ namespace Tapeti.Connection
public class TapetiWorker
{
private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 30000;
private const int PublishMaxConnectAttempts = 3;
private readonly IConfig config;
@ -24,8 +27,11 @@ namespace Tapeti.Connection
private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly IExchangeStrategy exchangeStrategy;
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
// These fields are for use in the taskQueue only!
private RabbitMQ.Client.IConnection connection;
private IModel channelInstance;
private TaskCompletionSource<int> publishResultTaskSource;
public TapetiWorker(IConfig config)
@ -39,15 +45,15 @@ namespace Tapeti.Connection
}
public Task Publish(object message, IBasicProperties properties)
public Task Publish(object message, IBasicProperties properties, bool mandatory)
{
return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()));
return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory);
}
public Task PublishDirect(object message, string queueName, IBasicProperties properties)
public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
{
return Publish(message, properties, "", queueName);
return Publish(message, properties, "", queueName, mandatory);
}
@ -147,7 +153,7 @@ namespace Tapeti.Connection
}
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey)
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory)
{
var context = new PublishContext
{
@ -172,8 +178,39 @@ namespace Tapeti.Connection
() => taskQueue.Value.Add(async () =>
{
var body = messageSerializer.Serialize(context.Message, context.Properties);
(await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false,
context.Properties, body);
Task<int> publishResultTask = null;
if (config.UsePublisherConfirms)
{
publishResultTaskSource = new TaskCompletionSource<int>();
publishResultTask = publishResultTaskSource.Task;
}
else
mandatory = false;
(await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
if (publishResultTask != null)
{
var timerCancellationSource = new CancellationTokenSource();
if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask)
{
timerCancellationSource.Cancel();
var replyCode = publishResultTask.Result;
// There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code
// at the time of writing...
if (replyCode == 312)
throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route");
if (replyCode > 0)
throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}");
}
else
throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
}
}).Unwrap());
// ReSharper restore ImplicitlyCapturedClosure
}
@ -207,13 +244,25 @@ namespace Tapeti.Connection
connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel();
channelInstance.ConfirmSelect();
if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected();
channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected();
channelInstance.ModelShutdown += (sender, eventArgs) => ConnectionEventListener?.Disconnected();
channelInstance.BasicReturn += (sender, eventArgs) =>
{
publishResultTaskSource?.SetResult(eventArgs.ReplyCode);
publishResultTaskSource = null;
};
channelInstance.BasicAcks += (sender, eventArgs) =>
{
publishResultTaskSource?.SetResult(0);
publishResultTaskSource = null;
};
ConnectionEventListener?.Connected();
logger.ConnectSuccess(ConnectionParams);

View File

@ -16,7 +16,7 @@ namespace Tapeti.Default
public void ConnectSuccess(TapetiConnectionParams connectionParams)
{
Console.WriteLine($"[Tapeti] Connected");
Console.WriteLine("[Tapeti] Connected");
}
public void HandlerException(Exception e)

View File

@ -69,9 +69,9 @@ namespace Tapeti.Default
properties.CorrelationId = messageContext.Properties.CorrelationId;
if (messageContext.Properties.IsReplyToPresent())
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties);
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true);
return publisher.Publish(message, properties);
return publisher.Publish(message, properties, false);
}
}
}

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Exceptions
{
public class NoRouteException : Exception
{
public NoRouteException(string message) : base(message) { }
}
}

View File

@ -13,7 +13,7 @@ namespace Tapeti
public interface IInternalPublisher : IPublisher
{
Task Publish(object message, IBasicProperties properties);
Task PublishDirect(object message, string queueName, IBasicProperties properties);
Task Publish(object message, IBasicProperties properties, bool mandatory);
Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory);
}
}

View File

@ -8,6 +8,7 @@ using Tapeti.Config;
using Tapeti.Default;
using Tapeti.Helpers;
// ReSharper disable UnusedMember.Global
namespace Tapeti
{
@ -31,6 +32,8 @@ namespace Tapeti
private readonly IDependencyResolver dependencyResolver;
private bool usePublisherConfirms = true;
public TapetiConfig(IDependencyResolver dependencyResolver)
{
@ -90,7 +93,16 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
}
var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
var config = new Config(queues)
{
DependencyResolver = dependencyResolver,
MessageMiddleware = messageMiddleware,
CleanupMiddleware = cleanupMiddleware,
PublishMiddleware = publishMiddleware,
UsePublisherConfirms = usePublisherConfirms
};
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
return config;
@ -154,11 +166,34 @@ namespace Tapeti
return this;
}
/// <summary>
/// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
/// result in never-ending flows. Only disable if you can accept those consequences.
/// </summary>
public TapetiConfig DisablePublisherConfirms()
{
usePublisherConfirms = false;
return this;
}
/// <summary>
/// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
/// result in never-ending flows. Only disable if you accept those consequences.
/// </summary>
public TapetiConfig SetPublisherConfirms(bool enabled)
{
usePublisherConfirms = enabled;
return this;
}
public void RegisterDefaults()
{
var container = dependencyResolver as IDependencyContainer;
if (container == null)
if (!(dependencyResolver is IDependencyContainer container))
return;
if (ConsoleHelper.IsAvailable())
@ -399,21 +434,19 @@ namespace Tapeti
protected class Config : IConfig
{
public IDependencyResolver DependencyResolver { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
public bool UsePublisherConfirms { get; set; }
public IDependencyResolver DependencyResolver { get; set; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
public IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; set; }
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; set; }
public IEnumerable<IQueue> Queues { get; }
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<ICleanupMiddleware> cleanupMiddleware, IReadOnlyList<IPublishMiddleware> publishMiddleware, IEnumerable<IQueue> queues)
public Config(IEnumerable<IQueue> queues)
{
DependencyResolver = dependencyResolver;
MessageMiddleware = messageMiddleware;
CleanupMiddleware = cleanupMiddleware;
PublishMiddleware = publishMiddleware;
Queues = queues.ToList();
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);

View File

@ -1,25 +0,0 @@
using System;
using Tapeti;
namespace Test
{
public class MyLogger : ILogger
{
public void Connect(TapetiConnectionParams connectionParams)
{
}
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
{
}
public void ConnectSuccess(TapetiConnectionParams connectionParams)
{
}
public void HandlerException(Exception e)
{
Console.WriteLine("Mylogger: " + e.Message);
}
}
}

View File

@ -5,7 +5,6 @@ using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
using Tapeti.Flow.SQL;
namespace Test
{
@ -14,58 +13,60 @@ namespace Test
private static void Main()
{
// TODO logging
var container = new Container();
container.Register<MarcoEmitter>();
container.Register<Visualizer>();
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.WithDataAnnotations()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config)
try
{
Params = new TapetiAppSettingsConnectionParams()
})
var container = new Container();
container.Register<MarcoEmitter>();
container.Register<Visualizer>();
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.WithDataAnnotations()
.RegisterAllControllers()
//.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
.Build();
using (var connection = new TapetiConnection(config)
{
Params = new TapetiAppSettingsConnectionParams()
})
{
var flowStore = container.GetInstance<IFlowStore>();
var flowStore2 = container.GetInstance<IFlowStore>();
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); };
connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); };
connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); };
Console.WriteLine("Subscribing...");
var subscriber = connection.Subscribe(false).Result;
Console.WriteLine("Consuming...");
subscriber.Resume().Wait();
Console.WriteLine("Done!");
connection.GetPublisher().Publish(new FlowEndController.PingMessage());
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait();
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
Thread.Sleep(1000);
var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait();
}
}
catch (Exception e)
{
var flowStore = container.GetInstance<IFlowStore>();
var flowStore2 = container.GetInstance<IFlowStore>();
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
connection.Connected += (sender, e) => {
Console.WriteLine("Event Connected");
};
connection.Disconnected += (sender, e) => {
Console.WriteLine("Event Disconnected");
};
connection.Reconnected += (sender, e) => {
Console.WriteLine("Event Reconnected");
};
Console.WriteLine("Subscribing...");
var subscriber = connection.Subscribe(false).Result;
Console.WriteLine("Consuming...");
subscriber.Resume().Wait();
Console.WriteLine("Done!");
connection.GetPublisher().Publish(new FlowEndController.PingMessage());
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest);
Thread.Sleep(1000);
var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait();
Console.WriteLine(e.ToString());
Console.ReadKey();
}
}
}