From 37d55ac71da76e40127064222f8a14b3f0d28972 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 24 Jan 2019 14:47:46 +0100 Subject: [PATCH 1/6] Fixed #7: Do not allow null for reply object in case of a Request attribute --- Tapeti/Default/PublishResultBinding.cs | 28 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index d02d087..53e5118 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Reflection; using System.Threading.Tasks; using RabbitMQ.Client.Framing; +using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Helpers; @@ -18,23 +19,30 @@ namespace Tapeti.Default return; - if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType)) + var hasClassResult = context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType); + + var request = context.MessageClass?.GetCustomAttribute(); + var expectedClassResult = request?.Response; + + // Verify the return type matches with the Request attribute of the message class. This is a backwards incompatible change in + // Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish. + if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType) + throw new ArgumentException($"Message handler must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}"); + + if (!hasClassResult) return; + if (isTaskOf) { var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType); Debug.Assert(handler != null, nameof(handler) + " != null"); - context.Result.SetHandler(async (messageContext, value) => - { - await (Task)handler.Invoke(null, new[] { messageContext, value }); - }); + context.Result.SetHandler(async (messageContext, value) => { await (Task) handler.Invoke(null, new[] {messageContext, value }); }); } else - context.Result.SetHandler((messageContext, value) => - value == null ? null : Reply(value, messageContext)); + context.Result.SetHandler((messageContext, value) => Reply(value, messageContext)); } @@ -43,13 +51,15 @@ namespace Tapeti.Default private static async Task PublishGenericTaskResult(IMessageContext messageContext, object value) where T : class { var message = await (Task)value; - if (message != null) - await Reply(message, messageContext); + await Reply(message, messageContext); } private static Task Reply(object message, IMessageContext messageContext) { + if (message == null) + throw new ArgumentException("Return value of a request message handler must not be null"); + var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve(); var properties = new BasicProperties(); From 60c7020a2c9a9fa8c2aa3eeae6cf6cbedffd98db Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 24 Jan 2019 22:52:21 +0100 Subject: [PATCH 2/6] Fixed #6: Use 'mandatory' on replies --- Tapeti.Flow/Default/FlowProvider.cs | 6 +- Tapeti.Serilog/TapetiSeriLogger.cs | 2 + Tapeti/Config/IConfig.cs | 2 + Tapeti/Connection/TapetiPublisher.cs | 10 +-- Tapeti/Connection/TapetiWorker.cs | 65 ++++++++++++++-- Tapeti/Default/ConsoleLogger.cs | 2 +- Tapeti/Default/PublishResultBinding.cs | 4 +- Tapeti/Exceptions/NoRouteException.cs | 9 +++ Tapeti/IPublisher.cs | 4 +- Tapeti/TapetiConfig.cs | 57 +++++++++++--- Test/MyLogger.cs | 25 ------ Test/Program.cs | 103 +++++++++++++------------ 12 files changed, 180 insertions(+), 109 deletions(-) create mode 100644 Tapeti/Exceptions/NoRouteException.cs delete mode 100644 Test/MyLogger.cs diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 5a6a49a..289b246 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -80,7 +80,7 @@ namespace Tapeti.Flow.Default await context.Store(); - await publisher.Publish(message, properties); + await publisher.Publish(message, properties, true); } @@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) - await publisher.PublishDirect(message, reply.ReplyTo, properties); + await publisher.PublishDirect(message, reply.ReplyTo, properties, true); else - await publisher.Publish(message, properties); + await publisher.Publish(message, properties, true); await context.Delete(); } diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index 65605bb..5968966 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -1,6 +1,8 @@ using System; using ISeriLogger = Serilog.ILogger; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Serilog { public class TapetiSeriLogger: ILogger diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 46bf575..07494cb 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -7,6 +7,8 @@ namespace Tapeti.Config { public interface IConfig { + bool UsePublisherConfirms { get; } + IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IReadOnlyList CleanupMiddleware { get; } diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8f8c742..d3afeb6 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -17,19 +17,19 @@ namespace Tapeti.Connection public Task Publish(object message) { - return workerFactory().Publish(message, null); + return workerFactory().Publish(message, null, false); } - public Task Publish(object message, IBasicProperties properties) + public Task Publish(object message, IBasicProperties properties, bool mandatory) { - return workerFactory().Publish(message, properties); + return workerFactory().Publish(message, properties, mandatory); } - public Task PublishDirect(object message, string queueName, IBasicProperties properties) + public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) { - return workerFactory().PublishDirect(message, queueName, properties); + return workerFactory().PublishDirect(message, queueName, properties, mandatory); } } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index d5781cd..6771b38 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; using Tapeti.Config; +using Tapeti.Exceptions; using Tapeti.Helpers; using Tapeti.Tasks; @@ -13,6 +15,7 @@ namespace Tapeti.Connection public class TapetiWorker { private const int ReconnectDelay = 5000; + private const int MandatoryReturnTimeout = 30000; private const int PublishMaxConnectAttempts = 3; private readonly IConfig config; @@ -24,8 +27,11 @@ namespace Tapeti.Connection private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly IExchangeStrategy exchangeStrategy; private readonly Lazy taskQueue = new Lazy(); + + // These fields are for use in the taskQueue only! private RabbitMQ.Client.IConnection connection; private IModel channelInstance; + private TaskCompletionSource publishResultTaskSource; public TapetiWorker(IConfig config) @@ -39,15 +45,15 @@ namespace Tapeti.Connection } - public Task Publish(object message, IBasicProperties properties) + public Task Publish(object message, IBasicProperties properties, bool mandatory) { - return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType())); + return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory); } - public Task PublishDirect(object message, string queueName, IBasicProperties properties) + public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) { - return Publish(message, properties, "", queueName); + return Publish(message, properties, "", queueName, mandatory); } @@ -147,7 +153,7 @@ namespace Tapeti.Connection } - private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey) + private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory) { var context = new PublishContext { @@ -172,8 +178,39 @@ namespace Tapeti.Connection () => taskQueue.Value.Add(async () => { var body = messageSerializer.Serialize(context.Message, context.Properties); - (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false, - context.Properties, body); + Task publishResultTask = null; + + if (config.UsePublisherConfirms) + { + publishResultTaskSource = new TaskCompletionSource(); + publishResultTask = publishResultTaskSource.Task; + } + else + mandatory = false; + + (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); + + if (publishResultTask != null) + { + var timerCancellationSource = new CancellationTokenSource(); + + if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask) + { + timerCancellationSource.Cancel(); + + var replyCode = publishResultTask.Result; + + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + if (replyCode == 312) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + + if (replyCode > 0) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); + } + else + throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); + } }).Unwrap()); // ReSharper restore ImplicitlyCapturedClosure } @@ -207,13 +244,25 @@ namespace Tapeti.Connection connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); + channelInstance.ConfirmSelect(); if (ConnectionParams.PrefetchCount > 0) channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); ((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected(); - channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected(); + channelInstance.ModelShutdown += (sender, eventArgs) => ConnectionEventListener?.Disconnected(); + channelInstance.BasicReturn += (sender, eventArgs) => + { + publishResultTaskSource?.SetResult(eventArgs.ReplyCode); + publishResultTaskSource = null; + }; + + channelInstance.BasicAcks += (sender, eventArgs) => + { + publishResultTaskSource?.SetResult(0); + publishResultTaskSource = null; + }; ConnectionEventListener?.Connected(); logger.ConnectSuccess(ConnectionParams); diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index e8c5776..a00f8d2 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -16,7 +16,7 @@ namespace Tapeti.Default public void ConnectSuccess(TapetiConnectionParams connectionParams) { - Console.WriteLine($"[Tapeti] Connected"); + Console.WriteLine("[Tapeti] Connected"); } public void HandlerException(Exception e) diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index 53e5118..2ffe4e1 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -69,9 +69,9 @@ namespace Tapeti.Default properties.CorrelationId = messageContext.Properties.CorrelationId; if (messageContext.Properties.IsReplyToPresent()) - return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties); + return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true); - return publisher.Publish(message, properties); + return publisher.Publish(message, properties, false); } } } diff --git a/Tapeti/Exceptions/NoRouteException.cs b/Tapeti/Exceptions/NoRouteException.cs new file mode 100644 index 0000000..2dcd591 --- /dev/null +++ b/Tapeti/Exceptions/NoRouteException.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Exceptions +{ + public class NoRouteException : Exception + { + public NoRouteException(string message) : base(message) { } + } +} diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs index f1bf689..c55f47c 100644 --- a/Tapeti/IPublisher.cs +++ b/Tapeti/IPublisher.cs @@ -13,7 +13,7 @@ namespace Tapeti public interface IInternalPublisher : IPublisher { - Task Publish(object message, IBasicProperties properties); - Task PublishDirect(object message, string queueName, IBasicProperties properties); + Task Publish(object message, IBasicProperties properties, bool mandatory); + Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory); } } diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 3807190..9a0867f 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -8,6 +8,7 @@ using Tapeti.Config; using Tapeti.Default; using Tapeti.Helpers; +// ReSharper disable UnusedMember.Global namespace Tapeti { @@ -31,6 +32,8 @@ namespace Tapeti private readonly IDependencyResolver dependencyResolver; + private bool usePublisherConfirms = true; + public TapetiConfig(IDependencyResolver dependencyResolver) { @@ -90,7 +93,16 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } - var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); + var config = new Config(queues) + { + DependencyResolver = dependencyResolver, + MessageMiddleware = messageMiddleware, + CleanupMiddleware = cleanupMiddleware, + PublishMiddleware = publishMiddleware, + + UsePublisherConfirms = usePublisherConfirms + }; + (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -154,11 +166,34 @@ namespace Tapeti return this; } + + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you can accept those consequences. + /// + public TapetiConfig DisablePublisherConfirms() + { + usePublisherConfirms = false; + return this; + } + + + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you accept those consequences. + /// + public TapetiConfig SetPublisherConfirms(bool enabled) + { + usePublisherConfirms = enabled; + return this; + } + public void RegisterDefaults() { - var container = dependencyResolver as IDependencyContainer; - if (container == null) + if (!(dependencyResolver is IDependencyContainer container)) return; if (ConsoleHelper.IsAvailable()) @@ -399,21 +434,19 @@ namespace Tapeti protected class Config : IConfig { - public IDependencyResolver DependencyResolver { get; } - public IReadOnlyList MessageMiddleware { get; } - public IReadOnlyList CleanupMiddleware { get; } - public IReadOnlyList PublishMiddleware { get; } + public bool UsePublisherConfirms { get; set; } + + public IDependencyResolver DependencyResolver { get; set; } + public IReadOnlyList MessageMiddleware { get; set; } + public IReadOnlyList CleanupMiddleware { get; set; } + public IReadOnlyList PublishMiddleware { get; set; } public IEnumerable Queues { get; } private readonly Dictionary bindingMethodLookup; - public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) + public Config(IEnumerable queues) { - DependencyResolver = dependencyResolver; - MessageMiddleware = messageMiddleware; - CleanupMiddleware = cleanupMiddleware; - PublishMiddleware = publishMiddleware; Queues = queues.ToList(); bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b); diff --git a/Test/MyLogger.cs b/Test/MyLogger.cs deleted file mode 100644 index 81a60d8..0000000 --- a/Test/MyLogger.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using Tapeti; - -namespace Test -{ - public class MyLogger : ILogger - { - public void Connect(TapetiConnectionParams connectionParams) - { - } - - public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) - { - } - - public void ConnectSuccess(TapetiConnectionParams connectionParams) - { - } - - public void HandlerException(Exception e) - { - Console.WriteLine("Mylogger: " + e.Message); - } - } -} diff --git a/Test/Program.cs b/Test/Program.cs index be6eed9..62377aa 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -5,7 +5,6 @@ using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; -using Tapeti.Flow.SQL; namespace Test { @@ -14,58 +13,60 @@ namespace Test private static void Main() { // TODO logging - - var container = new Container(); - container.Register(); - container.Register(); - container.Register(); - - var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) - //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") - .WithFlow() - .WithDataAnnotations() - .RegisterAllControllers() - .Build(); - - using (var connection = new TapetiConnection(config) + try { - Params = new TapetiAppSettingsConnectionParams() - }) + var container = new Container(); + container.Register(); + container.Register(); + container.Register(); + + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") + .WithFlow() + .WithDataAnnotations() + .RegisterAllControllers() + //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails + .Build(); + + using (var connection = new TapetiConnection(config) + { + Params = new TapetiAppSettingsConnectionParams() + }) + { + var flowStore = container.GetInstance(); + var flowStore2 = container.GetInstance(); + + Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); + + connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); }; + connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); }; + connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); }; + + Console.WriteLine("Subscribing..."); + var subscriber = connection.Subscribe(false).Result; + + Console.WriteLine("Consuming..."); + subscriber.Resume().Wait(); + + Console.WriteLine("Done!"); + + connection.GetPublisher().Publish(new FlowEndController.PingMessage()); + + //container.GetInstance().Start(c => c.StartFlow, true).Wait(); + container.GetInstance().Start(c => c.TestParallelRequest).Wait(); + + Thread.Sleep(1000); + + var emitter = container.GetInstance(); + emitter.Run().Wait(); + + + } + } + catch (Exception e) { - var flowStore = container.GetInstance(); - var flowStore2 = container.GetInstance(); - - Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); - - connection.Connected += (sender, e) => { - Console.WriteLine("Event Connected"); - }; - connection.Disconnected += (sender, e) => { - Console.WriteLine("Event Disconnected"); - }; - connection.Reconnected += (sender, e) => { - Console.WriteLine("Event Reconnected"); - }; - - Console.WriteLine("Subscribing..."); - var subscriber = connection.Subscribe(false).Result; - - Console.WriteLine("Consuming..."); - subscriber.Resume().Wait(); - - Console.WriteLine("Done!"); - - connection.GetPublisher().Publish(new FlowEndController.PingMessage()); - - //container.GetInstance().Start(c => c.StartFlow, true); - container.GetInstance().Start(c => c.TestParallelRequest); - - Thread.Sleep(1000); - - var emitter = container.GetInstance(); - emitter.Run().Wait(); - - + Console.WriteLine(e.ToString()); + Console.ReadKey(); } } } From d37e593b7837f884f5d07ef63cee68c1f284349b Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 25 Jan 2019 14:52:09 +0100 Subject: [PATCH 3/6] Fixed #5: Dynamic queues not reinitialized after reconnect - Tasks in the TapetiWorker are no longer async, as that splits it in multiple tasks (and only because of a delay in GetChannel, there's no real point to the async code otherwise) which causes issues with publisher confirms --- Tapeti/Connection/TapetiSubscriber.cs | 6 +++ Tapeti/Connection/TapetiWorker.cs | 62 +++++++++++++-------------- Tapeti/TapetiConnection.cs | 25 +++++++---- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 0202e1b..ce309b2 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -26,6 +26,12 @@ namespace Tapeti.Connection } + public Task RebindQueues() + { + return BindQueues(); + } + + public Task Resume() { if (consuming) diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 6771b38..79db3ed 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -62,18 +62,18 @@ namespace Tapeti.Connection if (string.IsNullOrEmpty(queueName)) throw new ArgumentNullException(nameof(queueName)); - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); - }).Unwrap(); + GetChannel().BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); + }); } public Task Subscribe(IQueue queue) { - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { - var channel = await GetChannel(); + var channel = GetChannel(); if (queue.Dynamic) { @@ -101,30 +101,30 @@ namespace Tapeti.Connection (binding as IBuildBinding)?.SetQueueName(queue.Name); } } - }).Unwrap(); + }); } public Task Respond(ulong deliveryTag, ConsumeResponse response) { - return taskQueue.Value.Add(async () => + return taskQueue.Value.Add(() => { switch (response) { case ConsumeResponse.Ack: - (await GetChannel()).BasicAck(deliveryTag, false); + GetChannel().BasicAck(deliveryTag, false); break; case ConsumeResponse.Nack: - (await GetChannel()).BasicNack(deliveryTag, false, false); + GetChannel().BasicNack(deliveryTag, false, false); break; case ConsumeResponse.Requeue: - (await GetChannel()).BasicNack(deliveryTag, false, true); + GetChannel().BasicNack(deliveryTag, false, true); break; } - }).Unwrap(); + }); } @@ -175,7 +175,7 @@ namespace Tapeti.Connection return MiddlewareHelper.GoAsync( config.PublishMiddleware, async (handler, next) => await handler.Handle(context, next), - () => taskQueue.Value.Add(async () => + () => taskQueue.Value.Add(() => { var body = messageSerializer.Serialize(context.Message, context.Properties); Task publishResultTask = null; @@ -188,30 +188,25 @@ namespace Tapeti.Connection else mandatory = false; - (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); + GetChannel(PublishMaxConnectAttempts).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); - if (publishResultTask != null) - { - var timerCancellationSource = new CancellationTokenSource(); + if (publishResultTask == null) + return; - if (await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, timerCancellationSource.Token)) == publishResultTask) - { - timerCancellationSource.Cancel(); + if (!publishResultTask.Wait(MandatoryReturnTimeout)) + throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); - var replyCode = publishResultTask.Result; - // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code - // at the time of writing... - if (replyCode == 312) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + var replyCode = publishResultTask.Result; - if (replyCode > 0) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); - } - else - throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); - } - }).Unwrap()); + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + if (replyCode == 312) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); + + if (replyCode > 0) + throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); + })); // ReSharper restore ImplicitlyCapturedClosure } @@ -219,7 +214,7 @@ namespace Tapeti.Connection /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// - private async Task GetChannel(int? maxAttempts = null) + private IModel GetChannel(int? maxAttempts = null) { if (channelInstance != null) return channelInstance; @@ -233,6 +228,7 @@ namespace Tapeti.Connection UserName = ConnectionParams.Username, Password = ConnectionParams.Password, AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable + TopologyRecoveryEnabled = false, // We'll manually redeclare all queues in the Reconnect event to update the internal state for dynamic queues RequestedHeartbeat = 30 }; @@ -277,7 +273,7 @@ namespace Tapeti.Connection if (maxAttempts.HasValue && attempts > maxAttempts.Value) throw; - await Task.Delay(ReconnectDelay); + Thread.Sleep(ReconnectDelay); } } diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 0578a2e..238d320 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -14,7 +14,7 @@ namespace Tapeti public TapetiConnectionParams Params { get; set; } private readonly Lazy worker; - + private TapetiSubscriber subscriber; public TapetiConnection(IConfig config) { @@ -36,8 +36,11 @@ namespace Tapeti public async Task Subscribe(bool startConsuming = true) { - var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); - await subscriber.BindQueues(); + if (subscriber == null) + { + subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); + await subscriber.BindQueues(); + } if (startConsuming) await subscriber.Resume(); @@ -46,9 +49,9 @@ namespace Tapeti } - public ISubscriber SubscribeSync() + public ISubscriber SubscribeSync(bool startConsuming = true) { - return Subscribe().Result; + return Subscribe(startConsuming).Result; } @@ -97,17 +100,23 @@ namespace Tapeti protected virtual void OnConnected(EventArgs e) { - Connected?.Invoke(this, e); + Task.Run(() => Connected?.Invoke(this, e)); } protected virtual void OnReconnected(EventArgs e) { - Reconnected?.Invoke(this, e); + Task.Run(() => + { + subscriber?.RebindQueues().ContinueWith((t) => + { + Reconnected?.Invoke(this, e); + }); + }); } protected virtual void OnDisconnected(EventArgs e) { - Disconnected?.Invoke(this, e); + Task.Run(() => Disconnected?.Invoke(this, e)); } } } From 45c090d00d56a520bc5efd414c5a7aebef4f6022 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 28 Jan 2019 11:30:24 +0100 Subject: [PATCH 4/6] Fixed #8: Forwards compatibility of enums --- Tapeti/Default/FallbackStringEnumConverter.cs | 90 +++++++++++++++++++ Tapeti/Default/JsonMessageSerializer.cs | 5 +- Test/MarcoController.cs | 25 +++++- 3 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 Tapeti/Default/FallbackStringEnumConverter.cs diff --git a/Tapeti/Default/FallbackStringEnumConverter.cs b/Tapeti/Default/FallbackStringEnumConverter.cs new file mode 100644 index 0000000..d4098c3 --- /dev/null +++ b/Tapeti/Default/FallbackStringEnumConverter.cs @@ -0,0 +1,90 @@ +using System; +using System.Diagnostics; +using Newtonsoft.Json; + +namespace Tapeti.Default +{ + /// + /// Converts an to and from its name string value. If an unknown string value is encountered + /// it will translate to 0xDEADBEEF (-559038737) so it can be gracefully handled. + /// If you copy this value as-is to another message and try to send it, this converter will throw an exception. + /// + /// This converter is far simpler than the default StringEnumConverter, it assumes both sides use the same + /// enum and therefore skips the naming strategy. + /// + public class FallbackStringEnumConverter : JsonConverter + { + private readonly int invalidEnumValue; + + + public FallbackStringEnumConverter() + { + unchecked { invalidEnumValue = (int)0xDEADBEEF; } + } + + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + if (value == null) + { + writer.WriteNull(); + return; + } + + if ((int) value == invalidEnumValue) + throw new ArgumentException("Enum value was an unknown string value in an incoming message and can not be published in an outgoing message as-is"); + + var outputValue = Enum.GetName(value.GetType(), value); + writer.WriteValue(outputValue); + } + + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + var isNullable = IsNullableType(objectType); + + if (reader.TokenType == JsonToken.Null) + { + if (!isNullable) + throw new JsonSerializationException($"Cannot convert null value to {objectType}"); + + return null; + } + + var actualType = isNullable ? Nullable.GetUnderlyingType(objectType) : objectType; + Debug.Assert(actualType != null, nameof(actualType) + " != null"); + + if (reader.TokenType != JsonToken.String) + throw new JsonSerializationException($"Unexpected token {reader.TokenType} when parsing enum"); + + var enumText = reader.Value.ToString(); + if (enumText == string.Empty && isNullable) + return null; + + try + { + return Enum.Parse(actualType, enumText); + } + catch (ArgumentException) + { + return Enum.ToObject(actualType, invalidEnumValue); + } + } + + + public override bool CanConvert(Type objectType) + { + var actualType = IsNullableType(objectType) ? Nullable.GetUnderlyingType(objectType) : objectType; + return actualType?.IsEnum ?? false; + } + + + private static bool IsNullableType(Type t) + { + if (t == null) + throw new ArgumentNullException(nameof(t)); + + return t.IsGenericType && t.GetGenericTypeDefinition() == typeof(Nullable<>); + } + } +} diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index 2aee24f..9cee002 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using Newtonsoft.Json; -using Newtonsoft.Json.Converters; using RabbitMQ.Client; namespace Tapeti.Default @@ -25,7 +24,7 @@ namespace Tapeti.Default NullValueHandling = NullValueHandling.Ignore }; - serializerSettings.Converters.Add(new StringEnumConverter()); + serializerSettings.Converters.Add(new FallbackStringEnumConverter()); } @@ -52,7 +51,7 @@ namespace Tapeti.Default throw new ArgumentException($"{ClassTypeHeader} header not present"); var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName); - return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType); + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType, serializerSettings); } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index 4ced01e..3368f59 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -80,12 +80,16 @@ namespace Test return flowProvider.YieldWithParallelRequest() .AddRequestSync(new PoloConfirmationRequestMessage { - StoredInState = StateTestGuid + StoredInState = StateTestGuid, + EnumValue = TestEnum.Value1, + }, HandlePoloConfirmationResponse1) .AddRequestSync(new PoloConfirmationRequestMessage { - StoredInState = StateTestGuid + StoredInState = StateTestGuid, + EnumValue = TestEnum.Value2, + OptionalEnumValue = TestEnum.Value1 }, HandlePoloConfirmationResponse2) .YieldSync(ContinuePoloConfirmation); @@ -127,7 +131,9 @@ namespace Test return new PoloConfirmationResponseMessage { - ShouldMatchState = message.StoredInState + ShouldMatchState = message.StoredInState, + EnumValue = message.EnumValue, + OptionalEnumValue = message.OptionalEnumValue }; } @@ -141,6 +147,13 @@ namespace Test } + public enum TestEnum + { + Value1, + Value2 + } + + [Request(Response = typeof(PoloMessage))] public class MarcoMessage { @@ -157,6 +170,9 @@ namespace Test { [Required] public Guid StoredInState { get; set; } + + public TestEnum EnumValue; + public TestEnum? OptionalEnumValue; } @@ -164,5 +180,8 @@ namespace Test { [Required] public Guid ShouldMatchState { get; set; } + + public TestEnum EnumValue; + public TestEnum? OptionalEnumValue; } } From 5b3be481e1d9c3a704e42c47e5cf8bff56b95ce0 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 13 Feb 2019 12:00:34 +0100 Subject: [PATCH 5/6] Fixed #6: Use 'mandatory' on replies (review) Fixed #13: Exception for dynamic queues after reconnect --- Tapeti/Config/IConfig.cs | 1 + Tapeti/Connection/IConnectionEventListener.cs | 9 +- Tapeti/Connection/TapetiWorker.cs | 289 ++++++++++++++---- Tapeti/Exceptions/NackException.cs | 9 + Tapeti/TapetiConfig.cs | 10 + Tapeti/TapetiConnection.cs | 13 +- Tapeti/Tasks/SingleThreadTaskQueue.cs | 8 +- 7 files changed, 274 insertions(+), 65 deletions(-) create mode 100644 Tapeti/Exceptions/NackException.cs diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 07494cb..6f7449d 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -30,6 +30,7 @@ namespace Tapeti.Config public interface IDynamicQueue : IQueue { + string GetDeclareQueueName(); void SetName(string name); } diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs index c0e82df..d86feab 100644 --- a/Tapeti/Connection/IConnectionEventListener.cs +++ b/Tapeti/Connection/IConnectionEventListener.cs @@ -1,9 +1,16 @@ namespace Tapeti.Connection { + public class DisconnectedEventArgs + { + public ushort ReplyCode; + public string ReplyText; + } + + public interface IConnectionEventListener { void Connected(); void Reconnected(); - void Disconnected(); + void Disconnected(DisconnectedEventArgs e); } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 79db3ed..f9d577a 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; using Tapeti.Config; @@ -16,7 +18,7 @@ namespace Tapeti.Connection { private const int ReconnectDelay = 5000; private const int MandatoryReturnTimeout = 30000; - private const int PublishMaxConnectAttempts = 3; + private const int MinimumConnectedReconnectDelay = 1000; private readonly IConfig config; private readonly ILogger logger; @@ -28,10 +30,30 @@ namespace Tapeti.Connection private readonly IExchangeStrategy exchangeStrategy; private readonly Lazy taskQueue = new Lazy(); + // These fields are for use in the taskQueue only! private RabbitMQ.Client.IConnection connection; + private bool isReconnect; private IModel channelInstance; - private TaskCompletionSource publishResultTaskSource; + private ulong lastDeliveryTag; + private DateTime connectedDateTime; + private readonly Dictionary confirmMessages = new Dictionary(); + private readonly Dictionary returnRoutingKeys = new Dictionary(); + + + private class ConfirmMessageInfo + { + public string ReturnKey; + public TaskCompletionSource CompletionSource; + } + + + private class ReturnInfo + { + public uint RefCount; + public int FirstReplyCode; + } + public TapetiWorker(IConfig config) @@ -64,7 +86,7 @@ namespace Tapeti.Connection return taskQueue.Value.Add(() => { - GetChannel().BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); + WithRetryableChannel(channel => channel.BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware))); }); } @@ -73,34 +95,38 @@ namespace Tapeti.Connection { return taskQueue.Value.Add(() => { - var channel = GetChannel(); - - if (queue.Dynamic) + WithRetryableChannel(channel => { - var dynamicQueue = channel.QueueDeclare(queue.Name); - (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName); - - foreach (var binding in queue.Bindings) + if (queue.Dynamic) { - if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) + if (!(queue is IDynamicQueue dynamicQueue)) + throw new NullReferenceException("Queue with Dynamic = true must implement IDynamicQueue"); + + var declaredQueue = channel.QueueDeclare(dynamicQueue.GetDeclareQueueName()); + dynamicQueue.SetName(declaredQueue.QueueName); + + foreach (var binding in queue.Bindings) { - var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - var exchange = exchangeStrategy.GetExchange(binding.MessageClass); + if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) + { + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); + var exchange = exchangeStrategy.GetExchange(binding.MessageClass); - channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey); + channel.QueueBind(declaredQueue.QueueName, exchange, routingKey); + } + + (binding as IBuildBinding)?.SetQueueName(declaredQueue.QueueName); } - - (binding as IBuildBinding)?.SetQueueName(dynamicQueue.QueueName); } - } - else - { - channel.QueueDeclarePassive(queue.Name); - foreach (var binding in queue.Bindings) + else { - (binding as IBuildBinding)?.SetQueueName(queue.Name); + channel.QueueDeclarePassive(queue.Name); + foreach (var binding in queue.Bindings) + { + (binding as IBuildBinding)?.SetQueueName(queue.Name); + } } - } + }); }); } @@ -109,6 +135,8 @@ namespace Tapeti.Connection { return taskQueue.Value.Add(() => { + // No need for a retryable channel here, if the connection is lost we can't + // use the deliveryTag anymore. switch (response) { case ConsumeResponse.Ack: @@ -122,6 +150,9 @@ namespace Tapeti.Connection case ConsumeResponse.Requeue: GetChannel().BasicNack(deliveryTag, false, true); break; + + default: + throw new ArgumentOutOfRangeException(nameof(response), response, null); } }); @@ -175,27 +206,48 @@ namespace Tapeti.Connection return MiddlewareHelper.GoAsync( config.PublishMiddleware, async (handler, next) => await handler.Handle(context, next), - () => taskQueue.Value.Add(() => + () => taskQueue.Value.Add(async () => { var body = messageSerializer.Serialize(context.Message, context.Properties); + Task publishResultTask = null; - - if (config.UsePublisherConfirms) + var messageInfo = new ConfirmMessageInfo { - publishResultTaskSource = new TaskCompletionSource(); - publishResultTask = publishResultTaskSource.Task; - } - else - mandatory = false; + ReturnKey = GetReturnKey(context.Exchange, context.RoutingKey), + CompletionSource = new TaskCompletionSource() + }; + + + WithRetryableChannel(channel => + { + // The delivery tag is lost after a reconnect, register under the new tag + if (config.UsePublisherConfirms) + { + lastDeliveryTag++; + + confirmMessages.Add(lastDeliveryTag, messageInfo); + publishResultTask = messageInfo.CompletionSource.Task; + } + else + mandatory = false; + + channel.BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); + }); - GetChannel(PublishMaxConnectAttempts).BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); if (publishResultTask == null) return; - if (!publishResultTask.Wait(MandatoryReturnTimeout)) + var delayCancellationTokenSource = new CancellationTokenSource(); + var signalledTask = await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)); + + if (signalledTask != publishResultTask) throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); + delayCancellationTokenSource.Cancel(); + + if (publishResultTask.IsCanceled) + throw new NackException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} was nacked"); var replyCode = publishResultTask.Result; @@ -210,16 +262,43 @@ namespace Tapeti.Connection // ReSharper restore ImplicitlyCapturedClosure } + /// /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// - private IModel GetChannel(int? maxAttempts = null) + private void WithRetryableChannel(Action operation) { - if (channelInstance != null) + while (true) + { + try + { + operation(GetChannel()); + break; + } + catch (AlreadyClosedException e) + { + // TODO log? + } + } + } + + + /// + /// Only call this from a task in the taskQueue to ensure IModel is only used + /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. + /// + private IModel GetChannel() + { + if (channelInstance != null && channelInstance.IsOpen) return channelInstance; - var attempts = 0; + // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ + // not related to the connection), wait for a bit to avoid spamming the connection + if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay) + Thread.Sleep(ReconnectDelay); + + var connectionFactory = new ConnectionFactory { HostName = ConnectionParams.HostName, @@ -227,8 +306,8 @@ namespace Tapeti.Connection VirtualHost = ConnectionParams.VirtualHost, UserName = ConnectionParams.Username, Password = ConnectionParams.Password, - AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable - TopologyRecoveryEnabled = false, // We'll manually redeclare all queues in the Reconnect event to update the internal state for dynamic queues + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, RequestedHeartbeat = 30 }; @@ -240,39 +319,50 @@ namespace Tapeti.Connection connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); - channelInstance.ConfirmSelect(); + + if (channelInstance == null) + throw new BrokerUnreachableException(null); + + if (config.UsePublisherConfirms) + { + lastDeliveryTag = 0; + confirmMessages.Clear(); + channelInstance.ConfirmSelect(); + } if (ConnectionParams.PrefetchCount > 0) channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); - ((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected(); - - channelInstance.ModelShutdown += (sender, eventArgs) => ConnectionEventListener?.Disconnected(); - channelInstance.BasicReturn += (sender, eventArgs) => + channelInstance.ModelShutdown += (sender, e) => { - publishResultTaskSource?.SetResult(eventArgs.ReplyCode); - publishResultTaskSource = null; + ConnectionEventListener?.Disconnected(new DisconnectedEventArgs + { + ReplyCode = e.ReplyCode, + ReplyText = e.ReplyText + }); + + channelInstance = null; }; - channelInstance.BasicAcks += (sender, eventArgs) => - { - publishResultTaskSource?.SetResult(0); - publishResultTaskSource = null; - }; + channelInstance.BasicReturn += HandleBasicReturn; + channelInstance.BasicAcks += HandleBasicAck; + channelInstance.BasicNacks += HandleBasicNack; + + connectedDateTime = DateTime.UtcNow; + + if (isReconnect) + ConnectionEventListener?.Reconnected(); + else + ConnectionEventListener?.Connected(); - ConnectionEventListener?.Connected(); logger.ConnectSuccess(ConnectionParams); + isReconnect = true; break; } catch (BrokerUnreachableException e) { logger.ConnectFailed(ConnectionParams, e); - - attempts++; - if (maxAttempts.HasValue && attempts > maxAttempts.Value) - throw; - Thread.Sleep(ReconnectDelay); } } @@ -281,6 +371,93 @@ namespace Tapeti.Connection } + private void HandleBasicReturn(object sender, BasicReturnEventArgs e) + { + /* + * "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack." + * - https://www.rabbitmq.com/confirms.html + * + * Because there is no delivery tag included in the basic.return message. This solution is modeled after + * user OhJeez' answer on StackOverflow: + * + * "Since all messages with the same routing key are routed the same way. I assumed that once I get a + * basic.return about a specific routing key, all messages with this routing key can be considered undelivered" + * https://stackoverflow.com/questions/21336659/how-to-tell-which-amqp-message-was-not-routed-from-basic-return-response + */ + var key = GetReturnKey(e.Exchange, e.RoutingKey); + + if (!returnRoutingKeys.TryGetValue(key, out var returnInfo)) + { + returnInfo = new ReturnInfo + { + RefCount = 0, + FirstReplyCode = e.ReplyCode + }; + + returnRoutingKeys.Add(key, returnInfo); + } + + returnInfo.RefCount++; + } + + + private void HandleBasicAck(object sender, BasicAckEventArgs e) + { + foreach (var deliveryTag in GetDeliveryTags(e)) + { + if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) + continue; + + if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo)) + { + messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode); + + returnInfo.RefCount--; + if (returnInfo.RefCount == 0) + returnRoutingKeys.Remove(messageInfo.ReturnKey); + } + + messageInfo.CompletionSource.SetResult(0); + confirmMessages.Remove(deliveryTag); + } + } + + + private void HandleBasicNack(object sender, BasicNackEventArgs e) + { + foreach (var deliveryTag in GetDeliveryTags(e)) + { + if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo)) + continue; + + messageInfo.CompletionSource.SetCanceled(); + confirmMessages.Remove(e.DeliveryTag); + } + } + + + private IEnumerable GetDeliveryTags(BasicAckEventArgs e) + { + return e.Multiple + ? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray() + : new[] { e.DeliveryTag }; + } + + + private IEnumerable GetDeliveryTags(BasicNackEventArgs e) + { + return e.Multiple + ? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray() + : new[] { e.DeliveryTag }; + } + + + private static string GetReturnKey(string exchange, string routingKey) + { + return exchange + ':' + routingKey; + } + + private class PublishContext : IPublishContext { public IDependencyResolver DependencyResolver { get; set; } diff --git a/Tapeti/Exceptions/NackException.cs b/Tapeti/Exceptions/NackException.cs new file mode 100644 index 0000000..408dd71 --- /dev/null +++ b/Tapeti/Exceptions/NackException.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Exceptions +{ + public class NackException : Exception + { + public NackException(string message) : base(message) { } + } +} diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 9a0867f..c785408 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -462,6 +462,8 @@ namespace Tapeti protected class Queue : IDynamicQueue { + private readonly string declareQueueName; + public bool Dynamic { get; } public string Name { get; set; } public IEnumerable Bindings { get; } @@ -469,12 +471,20 @@ namespace Tapeti public Queue(QueueInfo queue, IEnumerable bindings) { + declareQueueName = queue.Name; + Dynamic = queue.Dynamic.GetValueOrDefault(); Name = queue.Name; Bindings = bindings; } + public string GetDeclareQueueName() + { + return declareQueueName; + } + + public void SetName(string name) { Name = name; diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 238d320..d66f880 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -8,6 +8,8 @@ using Tapeti.Connection; namespace Tapeti { + public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e); + public class TapetiConnection : IDisposable { private readonly IConfig config; @@ -29,11 +31,10 @@ namespace Tapeti } public event EventHandler Connected; - - public event EventHandler Disconnected; - + public event DisconnectedEventHandler Disconnected; public event EventHandler Reconnected; + public async Task Subscribe(bool startConsuming = true) { if (subscriber == null) @@ -87,9 +88,9 @@ namespace Tapeti owner.OnConnected(new EventArgs()); } - public void Disconnected() + public void Disconnected(DisconnectedEventArgs e) { - owner.OnDisconnected(new EventArgs()); + owner.OnDisconnected(e); } public void Reconnected() @@ -114,7 +115,7 @@ namespace Tapeti }); } - protected virtual void OnDisconnected(EventArgs e) + protected virtual void OnDisconnected(DisconnectedEventArgs e) { Task.Run(() => Disconnected?.Invoke(this, e)); } diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs index fa28949..f22f869 100644 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs @@ -27,7 +27,7 @@ namespace Tapeti.Tasks } - public Task Add(Func func) + public Task Add(Func func) { lock (previousTaskLock) { @@ -36,7 +36,11 @@ namespace Tapeti.Tasks , singleThreadScheduler.Value); previousTask = task; - return task; + + // 'task' completes at the moment a Task is returned (for example, an await is encountered), + // this is used to chain the next. We return the unwrapped Task however, so that the caller + // awaits until the full task chain has completed. + return task.Unwrap(); } } From ed421361ac071fa8b9ee1e704c64e24f233dbcf1 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Tue, 19 Mar 2019 11:47:52 +0100 Subject: [PATCH 6/6] Fixed #15: Attribute for mandatory messages --- Tapeti.Annotations/DurableQueueAttribute.cs | 3 +++ Tapeti.Annotations/DynamicQueueAttribute.cs | 11 +++++------ Tapeti.Annotations/MandatoryAttribute.cs | 15 +++++++++++++++ Tapeti.Annotations/MessageControllerAttribute.cs | 5 +++++ Tapeti.Annotations/RequestAttribute.cs | 8 ++++++++ Tapeti/Connection/TapetiPublisher.cs | 10 +++++++++- 6 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 Tapeti.Annotations/MandatoryAttribute.cs diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs index bf4cac7..d6fe89d 100644 --- a/Tapeti.Annotations/DurableQueueAttribute.cs +++ b/Tapeti.Annotations/DurableQueueAttribute.cs @@ -2,6 +2,7 @@ namespace Tapeti.Annotations { + /// /// /// Binds to an existing durable queue to receive messages. Can be used /// on an entire MessageController class or on individual methods. @@ -17,6 +18,8 @@ namespace Tapeti.Annotations public string Name { get; set; } + /// + /// The name of the durable queue public DurableQueueAttribute(string name) { Name = name; diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs index 5fe9525..f7de921 100644 --- a/Tapeti.Annotations/DynamicQueueAttribute.cs +++ b/Tapeti.Annotations/DynamicQueueAttribute.cs @@ -2,6 +2,7 @@ namespace Tapeti.Annotations { + /// /// /// Creates a non-durable auto-delete queue to receive messages. Can be used /// on an entire MessageController class or on individual methods. @@ -12,12 +13,10 @@ namespace Tapeti.Annotations public string Prefix { get; set; } - /// - /// If prefix is specified, Tapeti will compose the queue name using the - /// prefix and a unique ID. If not specified, an empty queue name will be passed - /// to RabbitMQ thus letting it create a unique queue name. - /// - /// + /// + /// An optional prefix. If specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. public DynamicQueueAttribute(string prefix = null) { Prefix = prefix; diff --git a/Tapeti.Annotations/MandatoryAttribute.cs b/Tapeti.Annotations/MandatoryAttribute.cs new file mode 100644 index 0000000..38fcc87 --- /dev/null +++ b/Tapeti.Annotations/MandatoryAttribute.cs @@ -0,0 +1,15 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// + /// Can be attached to a message class to specify that delivery to a queue must be guaranteed. + /// Publish will fail if no queues bind to the routing key. Publisher confirms must be enabled + /// on the connection for this to work (enabled by default). + /// + [AttributeUsage(AttributeTargets.Class)] + public class MandatoryAttribute : Attribute + { + } +} diff --git a/Tapeti.Annotations/MessageControllerAttribute.cs b/Tapeti.Annotations/MessageControllerAttribute.cs index 1f419f2..f4a4723 100644 --- a/Tapeti.Annotations/MessageControllerAttribute.cs +++ b/Tapeti.Annotations/MessageControllerAttribute.cs @@ -2,6 +2,11 @@ namespace Tapeti.Annotations { + /// + /// + /// Attaching this attribute to a class includes it in the auto-discovery of message controllers + /// when using the RegisterAllControllers method. It is not required when manually registering a controller. + /// [AttributeUsage(AttributeTargets.Class)] public class MessageControllerAttribute : Attribute { diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs index d9fc44e..2f14097 100644 --- a/Tapeti.Annotations/RequestAttribute.cs +++ b/Tapeti.Annotations/RequestAttribute.cs @@ -2,6 +2,14 @@ namespace Tapeti.Annotations { + /// + /// + /// Can be attached to a message class to specify that the receiver of the message must + /// provide a response message of the type specified in the Response attribute. This response + /// must be sent by either returning it from the message handler method or using + /// YieldWithResponse when using Tapeti Flow. These methods will respond directly + /// to the queue specified in the reply-to header automatically added by Tapeti. + /// [AttributeUsage(AttributeTargets.Class)] public class RequestAttribute : Attribute { diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index d3afeb6..8887b85 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -1,6 +1,8 @@ using System; +using System.Reflection; using System.Threading.Tasks; using RabbitMQ.Client; +using Tapeti.Annotations; namespace Tapeti.Connection { @@ -17,7 +19,7 @@ namespace Tapeti.Connection public Task Publish(object message) { - return workerFactory().Publish(message, null, false); + return workerFactory().Publish(message, null, IsMandatory(message)); } @@ -31,5 +33,11 @@ namespace Tapeti.Connection { return workerFactory().PublishDirect(message, queueName, properties, mandatory); } + + + private static bool IsMandatory(object message) + { + return message.GetType().GetCustomAttribute() != null; + } } }