diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 5a6a49a..289b246 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -80,7 +80,7 @@ namespace Tapeti.Flow.Default await context.Store(); - await publisher.Publish(message, properties); + await publisher.Publish(message, properties, true); } @@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) - await publisher.PublishDirect(message, reply.ReplyTo, properties); + await publisher.PublishDirect(message, reply.ReplyTo, properties, true); else - await publisher.Publish(message, properties); + await publisher.Publish(message, properties, true); await context.Delete(); } diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index 65605bb..5968966 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -1,6 +1,8 @@ using System; using ISeriLogger = Serilog.ILogger; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Serilog { public class TapetiSeriLogger: ILogger diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 46bf575..07494cb 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -7,6 +7,8 @@ namespace Tapeti.Config { public interface IConfig { + bool UsePublisherConfirms { get; } + IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IReadOnlyList CleanupMiddleware { get; } diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8f8c742..d3afeb6 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -17,19 +17,19 @@ namespace Tapeti.Connection public Task Publish(object message) { - return workerFactory().Publish(message, null); + return workerFactory().Publish(message, null, false); } - public Task Publish(object message, IBasicProperties properties) + public Task Publish(object message, IBasicProperties properties, bool mandatory) { - return workerFactory().Publish(message, properties); + return workerFactory().Publish(message, properties, mandatory); } - public Task PublishDirect(object message, string queueName, IBasicProperties properties) + public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) { - return workerFactory().PublishDirect(message, queueName, properties); + return workerFactory().PublishDirect(message, queueName, properties, mandatory); } } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index d5781cd..6771b38 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; using Tapeti.Config; +using Tapeti.Exceptions; using Tapeti.Helpers; using Tapeti.Tasks; @@ -13,6 +15,7 @@ namespace Tapeti.Connection public class TapetiWorker { private const int ReconnectDelay = 5000; + private const int MandatoryReturnTimeout = 30000; private const int PublishMaxConnectAttempts = 3; private readonly IConfig config; @@ -24,8 +27,11 @@ namespace Tapeti.Connection private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly IExchangeStrategy exchangeStrategy; private readonly Lazy taskQueue = new Lazy(); + + // These fields are for use in the taskQueue only! private RabbitMQ.Client.IConnection connection; private IModel channelInstance; + private TaskCompletionSource publishResultTaskSource; public TapetiWorker(IConfig config) @@ -39,15 +45,15 @@ namespace Tapeti.Connection } - public Task Publish(object message, IBasicProperties properties) + public Task Publish(object message, IBasicProperties properties, bool mandatory) { - return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType())); + return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory); } - public Task PublishDirect(object message, string queueName, IBasicProperties properties) + public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) { - return Publish(message, properties, "", queueName); + return Publish(message, properties, "", queueName, mandatory); } @@ -147,7 +153,7 @@ namespace Tapeti.Connection } - private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey) + private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory) { var context = new PublishContext { @@ -172,8 +178,39 @@ namespace Tapeti.Connection () => taskQueue.Value.Add(async () => { var body = messageSerializer.Serialize(context.Message, context.Properties); - (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false, - context.Properties, body); + Task publishResultTask = null; + + if (config.UsePublisherConfirms) + { + publishResultTaskSource = new TaskCompletionSource(); + publishResultTask = publishResultTaskSource.Task; + } + else + mandatory = false; + + (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); + + if (publishResultTask != null) + { + var timerCancellationSource = new CancellationTokenSource(); + + if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask) + { + timerCancellationSource.Cancel(); + + var replyCode = publishResultTask.Result; + + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + if (replyCode == 312) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + + if (replyCode > 0) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); + } + else + throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); + } }).Unwrap()); // ReSharper restore ImplicitlyCapturedClosure } @@ -207,13 +244,25 @@ namespace Tapeti.Connection connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); + channelInstance.ConfirmSelect(); if (ConnectionParams.PrefetchCount > 0) channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); ((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected(); - channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected(); + channelInstance.ModelShutdown += (sender, eventArgs) => ConnectionEventListener?.Disconnected(); + channelInstance.BasicReturn += (sender, eventArgs) => + { + publishResultTaskSource?.SetResult(eventArgs.ReplyCode); + publishResultTaskSource = null; + }; + + channelInstance.BasicAcks += (sender, eventArgs) => + { + publishResultTaskSource?.SetResult(0); + publishResultTaskSource = null; + }; ConnectionEventListener?.Connected(); logger.ConnectSuccess(ConnectionParams); diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index e8c5776..a00f8d2 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -16,7 +16,7 @@ namespace Tapeti.Default public void ConnectSuccess(TapetiConnectionParams connectionParams) { - Console.WriteLine($"[Tapeti] Connected"); + Console.WriteLine("[Tapeti] Connected"); } public void HandlerException(Exception e) diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index 53e5118..2ffe4e1 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -69,9 +69,9 @@ namespace Tapeti.Default properties.CorrelationId = messageContext.Properties.CorrelationId; if (messageContext.Properties.IsReplyToPresent()) - return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties); + return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true); - return publisher.Publish(message, properties); + return publisher.Publish(message, properties, false); } } } diff --git a/Tapeti/Exceptions/NoRouteException.cs b/Tapeti/Exceptions/NoRouteException.cs new file mode 100644 index 0000000..2dcd591 --- /dev/null +++ b/Tapeti/Exceptions/NoRouteException.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Exceptions +{ + public class NoRouteException : Exception + { + public NoRouteException(string message) : base(message) { } + } +} diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs index f1bf689..c55f47c 100644 --- a/Tapeti/IPublisher.cs +++ b/Tapeti/IPublisher.cs @@ -13,7 +13,7 @@ namespace Tapeti public interface IInternalPublisher : IPublisher { - Task Publish(object message, IBasicProperties properties); - Task PublishDirect(object message, string queueName, IBasicProperties properties); + Task Publish(object message, IBasicProperties properties, bool mandatory); + Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory); } } diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 3807190..9a0867f 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -8,6 +8,7 @@ using Tapeti.Config; using Tapeti.Default; using Tapeti.Helpers; +// ReSharper disable UnusedMember.Global namespace Tapeti { @@ -31,6 +32,8 @@ namespace Tapeti private readonly IDependencyResolver dependencyResolver; + private bool usePublisherConfirms = true; + public TapetiConfig(IDependencyResolver dependencyResolver) { @@ -90,7 +93,16 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } - var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); + var config = new Config(queues) + { + DependencyResolver = dependencyResolver, + MessageMiddleware = messageMiddleware, + CleanupMiddleware = cleanupMiddleware, + PublishMiddleware = publishMiddleware, + + UsePublisherConfirms = usePublisherConfirms + }; + (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -154,11 +166,34 @@ namespace Tapeti return this; } + + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you can accept those consequences. + /// + public TapetiConfig DisablePublisherConfirms() + { + usePublisherConfirms = false; + return this; + } + + + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you accept those consequences. + /// + public TapetiConfig SetPublisherConfirms(bool enabled) + { + usePublisherConfirms = enabled; + return this; + } + public void RegisterDefaults() { - var container = dependencyResolver as IDependencyContainer; - if (container == null) + if (!(dependencyResolver is IDependencyContainer container)) return; if (ConsoleHelper.IsAvailable()) @@ -399,21 +434,19 @@ namespace Tapeti protected class Config : IConfig { - public IDependencyResolver DependencyResolver { get; } - public IReadOnlyList MessageMiddleware { get; } - public IReadOnlyList CleanupMiddleware { get; } - public IReadOnlyList PublishMiddleware { get; } + public bool UsePublisherConfirms { get; set; } + + public IDependencyResolver DependencyResolver { get; set; } + public IReadOnlyList MessageMiddleware { get; set; } + public IReadOnlyList CleanupMiddleware { get; set; } + public IReadOnlyList PublishMiddleware { get; set; } public IEnumerable Queues { get; } private readonly Dictionary bindingMethodLookup; - public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) + public Config(IEnumerable queues) { - DependencyResolver = dependencyResolver; - MessageMiddleware = messageMiddleware; - CleanupMiddleware = cleanupMiddleware; - PublishMiddleware = publishMiddleware; Queues = queues.ToList(); bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b); diff --git a/Test/MyLogger.cs b/Test/MyLogger.cs deleted file mode 100644 index 81a60d8..0000000 --- a/Test/MyLogger.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using Tapeti; - -namespace Test -{ - public class MyLogger : ILogger - { - public void Connect(TapetiConnectionParams connectionParams) - { - } - - public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) - { - } - - public void ConnectSuccess(TapetiConnectionParams connectionParams) - { - } - - public void HandlerException(Exception e) - { - Console.WriteLine("Mylogger: " + e.Message); - } - } -} diff --git a/Test/Program.cs b/Test/Program.cs index be6eed9..62377aa 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -5,7 +5,6 @@ using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; -using Tapeti.Flow.SQL; namespace Test { @@ -14,58 +13,60 @@ namespace Test private static void Main() { // TODO logging - - var container = new Container(); - container.Register(); - container.Register(); - container.Register(); - - var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) - //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") - .WithFlow() - .WithDataAnnotations() - .RegisterAllControllers() - .Build(); - - using (var connection = new TapetiConnection(config) + try { - Params = new TapetiAppSettingsConnectionParams() - }) + var container = new Container(); + container.Register(); + container.Register(); + container.Register(); + + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") + .WithFlow() + .WithDataAnnotations() + .RegisterAllControllers() + //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails + .Build(); + + using (var connection = new TapetiConnection(config) + { + Params = new TapetiAppSettingsConnectionParams() + }) + { + var flowStore = container.GetInstance(); + var flowStore2 = container.GetInstance(); + + Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); + + connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); }; + connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); }; + connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); }; + + Console.WriteLine("Subscribing..."); + var subscriber = connection.Subscribe(false).Result; + + Console.WriteLine("Consuming..."); + subscriber.Resume().Wait(); + + Console.WriteLine("Done!"); + + connection.GetPublisher().Publish(new FlowEndController.PingMessage()); + + //container.GetInstance().Start(c => c.StartFlow, true).Wait(); + container.GetInstance().Start(c => c.TestParallelRequest).Wait(); + + Thread.Sleep(1000); + + var emitter = container.GetInstance(); + emitter.Run().Wait(); + + + } + } + catch (Exception e) { - var flowStore = container.GetInstance(); - var flowStore2 = container.GetInstance(); - - Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); - - connection.Connected += (sender, e) => { - Console.WriteLine("Event Connected"); - }; - connection.Disconnected += (sender, e) => { - Console.WriteLine("Event Disconnected"); - }; - connection.Reconnected += (sender, e) => { - Console.WriteLine("Event Reconnected"); - }; - - Console.WriteLine("Subscribing..."); - var subscriber = connection.Subscribe(false).Result; - - Console.WriteLine("Consuming..."); - subscriber.Resume().Wait(); - - Console.WriteLine("Done!"); - - connection.GetPublisher().Publish(new FlowEndController.PingMessage()); - - //container.GetInstance().Start(c => c.StartFlow, true); - container.GetInstance().Start(c => c.TestParallelRequest); - - Thread.Sleep(1000); - - var emitter = container.GetInstance(); - emitter.Run().Wait(); - - + Console.WriteLine(e.ToString()); + Console.ReadKey(); } } }