diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs
index bf4cac7..d6fe89d 100644
--- a/Tapeti.Annotations/DurableQueueAttribute.cs
+++ b/Tapeti.Annotations/DurableQueueAttribute.cs
@@ -2,6 +2,7 @@
namespace Tapeti.Annotations
{
+ ///
///
/// Binds to an existing durable queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
@@ -17,6 +18,8 @@ namespace Tapeti.Annotations
public string Name { get; set; }
+ ///
+ /// The name of the durable queue
public DurableQueueAttribute(string name)
{
Name = name;
diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs
index 5fe9525..f7de921 100644
--- a/Tapeti.Annotations/DynamicQueueAttribute.cs
+++ b/Tapeti.Annotations/DynamicQueueAttribute.cs
@@ -2,6 +2,7 @@
namespace Tapeti.Annotations
{
+ ///
///
/// Creates a non-durable auto-delete queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
@@ -12,12 +13,10 @@ namespace Tapeti.Annotations
public string Prefix { get; set; }
- ///
- /// If prefix is specified, Tapeti will compose the queue name using the
- /// prefix and a unique ID. If not specified, an empty queue name will be passed
- /// to RabbitMQ thus letting it create a unique queue name.
- ///
- ///
+ ///
+ /// An optional prefix. If specified, Tapeti will compose the queue name using the
+ /// prefix and a unique ID. If not specified, an empty queue name will be passed
+ /// to RabbitMQ thus letting it create a unique queue name.
public DynamicQueueAttribute(string prefix = null)
{
Prefix = prefix;
diff --git a/Tapeti.Annotations/MandatoryAttribute.cs b/Tapeti.Annotations/MandatoryAttribute.cs
new file mode 100644
index 0000000..38fcc87
--- /dev/null
+++ b/Tapeti.Annotations/MandatoryAttribute.cs
@@ -0,0 +1,15 @@
+using System;
+
+namespace Tapeti.Annotations
+{
+ ///
+ ///
+ /// Can be attached to a message class to specify that delivery to a queue must be guaranteed.
+ /// Publish will fail if no queues bind to the routing key. Publisher confirms must be enabled
+ /// on the connection for this to work (enabled by default).
+ ///
+ [AttributeUsage(AttributeTargets.Class)]
+ public class MandatoryAttribute : Attribute
+ {
+ }
+}
diff --git a/Tapeti.Annotations/MessageControllerAttribute.cs b/Tapeti.Annotations/MessageControllerAttribute.cs
index 1f419f2..f4a4723 100644
--- a/Tapeti.Annotations/MessageControllerAttribute.cs
+++ b/Tapeti.Annotations/MessageControllerAttribute.cs
@@ -2,6 +2,11 @@
namespace Tapeti.Annotations
{
+ ///
+ ///
+ /// Attaching this attribute to a class includes it in the auto-discovery of message controllers
+ /// when using the RegisterAllControllers method. It is not required when manually registering a controller.
+ ///
[AttributeUsage(AttributeTargets.Class)]
public class MessageControllerAttribute : Attribute
{
diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs
index d9fc44e..2f14097 100644
--- a/Tapeti.Annotations/RequestAttribute.cs
+++ b/Tapeti.Annotations/RequestAttribute.cs
@@ -2,6 +2,14 @@
namespace Tapeti.Annotations
{
+ ///
+ ///
+ /// Can be attached to a message class to specify that the receiver of the message must
+ /// provide a response message of the type specified in the Response attribute. This response
+ /// must be sent by either returning it from the message handler method or using
+ /// YieldWithResponse when using Tapeti Flow. These methods will respond directly
+ /// to the queue specified in the reply-to header automatically added by Tapeti.
+ ///
[AttributeUsage(AttributeTargets.Class)]
public class RequestAttribute : Attribute
{
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 5a6a49a..289b246 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -80,7 +80,7 @@ namespace Tapeti.Flow.Default
await context.Store();
- await publisher.Publish(message, properties);
+ await publisher.Publish(message, properties, true);
}
@@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
// TODO disallow if replyto is not specified?
if (reply.ReplyTo != null)
- await publisher.PublishDirect(message, reply.ReplyTo, properties);
+ await publisher.PublishDirect(message, reply.ReplyTo, properties, true);
else
- await publisher.Publish(message, properties);
+ await publisher.Publish(message, properties, true);
await context.Delete();
}
diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs
index 65605bb..5968966 100644
--- a/Tapeti.Serilog/TapetiSeriLogger.cs
+++ b/Tapeti.Serilog/TapetiSeriLogger.cs
@@ -1,6 +1,8 @@
using System;
using ISeriLogger = Serilog.ILogger;
+// ReSharper disable UnusedMember.Global
+
namespace Tapeti.Serilog
{
public class TapetiSeriLogger: ILogger
diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs
index 46bf575..6f7449d 100644
--- a/Tapeti/Config/IConfig.cs
+++ b/Tapeti/Config/IConfig.cs
@@ -7,6 +7,8 @@ namespace Tapeti.Config
{
public interface IConfig
{
+ bool UsePublisherConfirms { get; }
+
IDependencyResolver DependencyResolver { get; }
IReadOnlyList MessageMiddleware { get; }
IReadOnlyList CleanupMiddleware { get; }
@@ -28,6 +30,7 @@ namespace Tapeti.Config
public interface IDynamicQueue : IQueue
{
+ string GetDeclareQueueName();
void SetName(string name);
}
diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs
index c0e82df..d86feab 100644
--- a/Tapeti/Connection/IConnectionEventListener.cs
+++ b/Tapeti/Connection/IConnectionEventListener.cs
@@ -1,9 +1,16 @@
namespace Tapeti.Connection
{
+ public class DisconnectedEventArgs
+ {
+ public ushort ReplyCode;
+ public string ReplyText;
+ }
+
+
public interface IConnectionEventListener
{
void Connected();
void Reconnected();
- void Disconnected();
+ void Disconnected(DisconnectedEventArgs e);
}
}
diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs
index 8f8c742..8887b85 100644
--- a/Tapeti/Connection/TapetiPublisher.cs
+++ b/Tapeti/Connection/TapetiPublisher.cs
@@ -1,6 +1,8 @@
using System;
+using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client;
+using Tapeti.Annotations;
namespace Tapeti.Connection
{
@@ -17,19 +19,25 @@ namespace Tapeti.Connection
public Task Publish(object message)
{
- return workerFactory().Publish(message, null);
+ return workerFactory().Publish(message, null, IsMandatory(message));
}
- public Task Publish(object message, IBasicProperties properties)
+ public Task Publish(object message, IBasicProperties properties, bool mandatory)
{
- return workerFactory().Publish(message, properties);
+ return workerFactory().Publish(message, properties, mandatory);
}
- public Task PublishDirect(object message, string queueName, IBasicProperties properties)
+ public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
{
- return workerFactory().PublishDirect(message, queueName, properties);
+ return workerFactory().PublishDirect(message, queueName, properties, mandatory);
+ }
+
+
+ private static bool IsMandatory(object message)
+ {
+ return message.GetType().GetCustomAttribute() != null;
}
}
}
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 0202e1b..ce309b2 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -26,6 +26,12 @@ namespace Tapeti.Connection
}
+ public Task RebindQueues()
+ {
+ return BindQueues();
+ }
+
+
public Task Resume()
{
if (consuming)
diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs
index d5781cd..f9d577a 100644
--- a/Tapeti/Connection/TapetiWorker.cs
+++ b/Tapeti/Connection/TapetiWorker.cs
@@ -1,10 +1,14 @@
using System;
using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
+using Tapeti.Exceptions;
using Tapeti.Helpers;
using Tapeti.Tasks;
@@ -13,7 +17,8 @@ namespace Tapeti.Connection
public class TapetiWorker
{
private const int ReconnectDelay = 5000;
- private const int PublishMaxConnectAttempts = 3;
+ private const int MandatoryReturnTimeout = 30000;
+ private const int MinimumConnectedReconnectDelay = 1000;
private readonly IConfig config;
private readonly ILogger logger;
@@ -24,8 +29,31 @@ namespace Tapeti.Connection
private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly IExchangeStrategy exchangeStrategy;
private readonly Lazy taskQueue = new Lazy();
+
+
+ // These fields are for use in the taskQueue only!
private RabbitMQ.Client.IConnection connection;
+ private bool isReconnect;
private IModel channelInstance;
+ private ulong lastDeliveryTag;
+ private DateTime connectedDateTime;
+ private readonly Dictionary confirmMessages = new Dictionary();
+ private readonly Dictionary returnRoutingKeys = new Dictionary();
+
+
+ private class ConfirmMessageInfo
+ {
+ public string ReturnKey;
+ public TaskCompletionSource CompletionSource;
+ }
+
+
+ private class ReturnInfo
+ {
+ public uint RefCount;
+ public int FirstReplyCode;
+ }
+
public TapetiWorker(IConfig config)
@@ -39,15 +67,15 @@ namespace Tapeti.Connection
}
- public Task Publish(object message, IBasicProperties properties)
+ public Task Publish(object message, IBasicProperties properties, bool mandatory)
{
- return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()));
+ return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory);
}
- public Task PublishDirect(object message, string queueName, IBasicProperties properties)
+ public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
{
- return Publish(message, properties, "", queueName);
+ return Publish(message, properties, "", queueName, mandatory);
}
@@ -56,69 +84,78 @@ namespace Tapeti.Connection
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException(nameof(queueName));
- return taskQueue.Value.Add(async () =>
+ return taskQueue.Value.Add(() =>
{
- (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware));
- }).Unwrap();
+ WithRetryableChannel(channel => channel.BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)));
+ });
}
public Task Subscribe(IQueue queue)
{
- return taskQueue.Value.Add(async () =>
+ return taskQueue.Value.Add(() =>
{
- var channel = await GetChannel();
-
- if (queue.Dynamic)
+ WithRetryableChannel(channel =>
{
- var dynamicQueue = channel.QueueDeclare(queue.Name);
- (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
-
- foreach (var binding in queue.Bindings)
+ if (queue.Dynamic)
{
- if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
+ if (!(queue is IDynamicQueue dynamicQueue))
+ throw new NullReferenceException("Queue with Dynamic = true must implement IDynamicQueue");
+
+ var declaredQueue = channel.QueueDeclare(dynamicQueue.GetDeclareQueueName());
+ dynamicQueue.SetName(declaredQueue.QueueName);
+
+ foreach (var binding in queue.Bindings)
{
- var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
- var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
+ if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
+ {
+ var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
+ var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
- channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey);
+ channel.QueueBind(declaredQueue.QueueName, exchange, routingKey);
+ }
+
+ (binding as IBuildBinding)?.SetQueueName(declaredQueue.QueueName);
}
-
- (binding as IBuildBinding)?.SetQueueName(dynamicQueue.QueueName);
}
- }
- else
- {
- channel.QueueDeclarePassive(queue.Name);
- foreach (var binding in queue.Bindings)
+ else
{
- (binding as IBuildBinding)?.SetQueueName(queue.Name);
+ channel.QueueDeclarePassive(queue.Name);
+ foreach (var binding in queue.Bindings)
+ {
+ (binding as IBuildBinding)?.SetQueueName(queue.Name);
+ }
}
- }
- }).Unwrap();
+ });
+ });
}
public Task Respond(ulong deliveryTag, ConsumeResponse response)
{
- return taskQueue.Value.Add(async () =>
+ return taskQueue.Value.Add(() =>
{
+ // No need for a retryable channel here, if the connection is lost we can't
+ // use the deliveryTag anymore.
switch (response)
{
case ConsumeResponse.Ack:
- (await GetChannel()).BasicAck(deliveryTag, false);
+ GetChannel().BasicAck(deliveryTag, false);
break;
case ConsumeResponse.Nack:
- (await GetChannel()).BasicNack(deliveryTag, false, false);
+ GetChannel().BasicNack(deliveryTag, false, false);
break;
case ConsumeResponse.Requeue:
- (await GetChannel()).BasicNack(deliveryTag, false, true);
+ GetChannel().BasicNack(deliveryTag, false, true);
break;
+
+ default:
+ throw new ArgumentOutOfRangeException(nameof(response), response, null);
}
- }).Unwrap();
+ });
}
@@ -147,7 +184,7 @@ namespace Tapeti.Connection
}
- private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey)
+ private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory)
{
var context = new PublishContext
{
@@ -172,22 +209,96 @@ namespace Tapeti.Connection
() => taskQueue.Value.Add(async () =>
{
var body = messageSerializer.Serialize(context.Message, context.Properties);
- (await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false,
- context.Properties, body);
- }).Unwrap());
+
+ Task publishResultTask = null;
+ var messageInfo = new ConfirmMessageInfo
+ {
+ ReturnKey = GetReturnKey(context.Exchange, context.RoutingKey),
+ CompletionSource = new TaskCompletionSource()
+ };
+
+
+ WithRetryableChannel(channel =>
+ {
+ // The delivery tag is lost after a reconnect, register under the new tag
+ if (config.UsePublisherConfirms)
+ {
+ lastDeliveryTag++;
+
+ confirmMessages.Add(lastDeliveryTag, messageInfo);
+ publishResultTask = messageInfo.CompletionSource.Task;
+ }
+ else
+ mandatory = false;
+
+ channel.BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
+ });
+
+
+ if (publishResultTask == null)
+ return;
+
+ var delayCancellationTokenSource = new CancellationTokenSource();
+ var signalledTask = await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token));
+
+ if (signalledTask != publishResultTask)
+ throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
+
+ delayCancellationTokenSource.Cancel();
+
+ if (publishResultTask.IsCanceled)
+ throw new NackException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} was nacked");
+
+ var replyCode = publishResultTask.Result;
+
+ // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code
+ // at the time of writing...
+ if (replyCode == 312)
+ throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route");
+
+ if (replyCode > 0)
+ throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}");
+ }));
// ReSharper restore ImplicitlyCapturedClosure
}
+
///
/// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
///
- private async Task GetChannel(int? maxAttempts = null)
+ private void WithRetryableChannel(Action operation)
{
- if (channelInstance != null)
+ while (true)
+ {
+ try
+ {
+ operation(GetChannel());
+ break;
+ }
+ catch (AlreadyClosedException e)
+ {
+ // TODO log?
+ }
+ }
+ }
+
+
+ ///
+ /// Only call this from a task in the taskQueue to ensure IModel is only used
+ /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
+ ///
+ private IModel GetChannel()
+ {
+ if (channelInstance != null && channelInstance.IsOpen)
return channelInstance;
- var attempts = 0;
+ // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
+ // not related to the connection), wait for a bit to avoid spamming the connection
+ if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay)
+ Thread.Sleep(ReconnectDelay);
+
+
var connectionFactory = new ConnectionFactory
{
HostName = ConnectionParams.HostName,
@@ -195,7 +306,8 @@ namespace Tapeti.Connection
VirtualHost = ConnectionParams.VirtualHost,
UserName = ConnectionParams.Username,
Password = ConnectionParams.Password,
- AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable
+ AutomaticRecoveryEnabled = false,
+ TopologyRecoveryEnabled = false,
RequestedHeartbeat = 30
};
@@ -208,27 +320,50 @@ namespace Tapeti.Connection
connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel();
+ if (channelInstance == null)
+ throw new BrokerUnreachableException(null);
+
+ if (config.UsePublisherConfirms)
+ {
+ lastDeliveryTag = 0;
+ confirmMessages.Clear();
+ channelInstance.ConfirmSelect();
+ }
+
if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
- ((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected();
+ channelInstance.ModelShutdown += (sender, e) =>
+ {
+ ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
+ {
+ ReplyCode = e.ReplyCode,
+ ReplyText = e.ReplyText
+ });
- channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected();
+ channelInstance = null;
+ };
+
+ channelInstance.BasicReturn += HandleBasicReturn;
+ channelInstance.BasicAcks += HandleBasicAck;
+ channelInstance.BasicNacks += HandleBasicNack;
+
+ connectedDateTime = DateTime.UtcNow;
+
+ if (isReconnect)
+ ConnectionEventListener?.Reconnected();
+ else
+ ConnectionEventListener?.Connected();
- ConnectionEventListener?.Connected();
logger.ConnectSuccess(ConnectionParams);
+ isReconnect = true;
break;
}
catch (BrokerUnreachableException e)
{
logger.ConnectFailed(ConnectionParams, e);
-
- attempts++;
- if (maxAttempts.HasValue && attempts > maxAttempts.Value)
- throw;
-
- await Task.Delay(ReconnectDelay);
+ Thread.Sleep(ReconnectDelay);
}
}
@@ -236,6 +371,93 @@ namespace Tapeti.Connection
}
+ private void HandleBasicReturn(object sender, BasicReturnEventArgs e)
+ {
+ /*
+ * "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack."
+ * - https://www.rabbitmq.com/confirms.html
+ *
+ * Because there is no delivery tag included in the basic.return message. This solution is modeled after
+ * user OhJeez' answer on StackOverflow:
+ *
+ * "Since all messages with the same routing key are routed the same way. I assumed that once I get a
+ * basic.return about a specific routing key, all messages with this routing key can be considered undelivered"
+ * https://stackoverflow.com/questions/21336659/how-to-tell-which-amqp-message-was-not-routed-from-basic-return-response
+ */
+ var key = GetReturnKey(e.Exchange, e.RoutingKey);
+
+ if (!returnRoutingKeys.TryGetValue(key, out var returnInfo))
+ {
+ returnInfo = new ReturnInfo
+ {
+ RefCount = 0,
+ FirstReplyCode = e.ReplyCode
+ };
+
+ returnRoutingKeys.Add(key, returnInfo);
+ }
+
+ returnInfo.RefCount++;
+ }
+
+
+ private void HandleBasicAck(object sender, BasicAckEventArgs e)
+ {
+ foreach (var deliveryTag in GetDeliveryTags(e))
+ {
+ if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
+ continue;
+
+ if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo))
+ {
+ messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode);
+
+ returnInfo.RefCount--;
+ if (returnInfo.RefCount == 0)
+ returnRoutingKeys.Remove(messageInfo.ReturnKey);
+ }
+
+ messageInfo.CompletionSource.SetResult(0);
+ confirmMessages.Remove(deliveryTag);
+ }
+ }
+
+
+ private void HandleBasicNack(object sender, BasicNackEventArgs e)
+ {
+ foreach (var deliveryTag in GetDeliveryTags(e))
+ {
+ if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
+ continue;
+
+ messageInfo.CompletionSource.SetCanceled();
+ confirmMessages.Remove(e.DeliveryTag);
+ }
+ }
+
+
+ private IEnumerable GetDeliveryTags(BasicAckEventArgs e)
+ {
+ return e.Multiple
+ ? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
+ : new[] { e.DeliveryTag };
+ }
+
+
+ private IEnumerable GetDeliveryTags(BasicNackEventArgs e)
+ {
+ return e.Multiple
+ ? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
+ : new[] { e.DeliveryTag };
+ }
+
+
+ private static string GetReturnKey(string exchange, string routingKey)
+ {
+ return exchange + ':' + routingKey;
+ }
+
+
private class PublishContext : IPublishContext
{
public IDependencyResolver DependencyResolver { get; set; }
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index e8c5776..a00f8d2 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -16,7 +16,7 @@ namespace Tapeti.Default
public void ConnectSuccess(TapetiConnectionParams connectionParams)
{
- Console.WriteLine($"[Tapeti] Connected");
+ Console.WriteLine("[Tapeti] Connected");
}
public void HandlerException(Exception e)
diff --git a/Tapeti/Default/FallbackStringEnumConverter.cs b/Tapeti/Default/FallbackStringEnumConverter.cs
new file mode 100644
index 0000000..d4098c3
--- /dev/null
+++ b/Tapeti/Default/FallbackStringEnumConverter.cs
@@ -0,0 +1,90 @@
+using System;
+using System.Diagnostics;
+using Newtonsoft.Json;
+
+namespace Tapeti.Default
+{
+ ///
+ /// Converts an to and from its name string value. If an unknown string value is encountered
+ /// it will translate to 0xDEADBEEF (-559038737) so it can be gracefully handled.
+ /// If you copy this value as-is to another message and try to send it, this converter will throw an exception.
+ ///
+ /// This converter is far simpler than the default StringEnumConverter, it assumes both sides use the same
+ /// enum and therefore skips the naming strategy.
+ ///
+ public class FallbackStringEnumConverter : JsonConverter
+ {
+ private readonly int invalidEnumValue;
+
+
+ public FallbackStringEnumConverter()
+ {
+ unchecked { invalidEnumValue = (int)0xDEADBEEF; }
+ }
+
+
+ public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
+ {
+ if (value == null)
+ {
+ writer.WriteNull();
+ return;
+ }
+
+ if ((int) value == invalidEnumValue)
+ throw new ArgumentException("Enum value was an unknown string value in an incoming message and can not be published in an outgoing message as-is");
+
+ var outputValue = Enum.GetName(value.GetType(), value);
+ writer.WriteValue(outputValue);
+ }
+
+
+ public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+ {
+ var isNullable = IsNullableType(objectType);
+
+ if (reader.TokenType == JsonToken.Null)
+ {
+ if (!isNullable)
+ throw new JsonSerializationException($"Cannot convert null value to {objectType}");
+
+ return null;
+ }
+
+ var actualType = isNullable ? Nullable.GetUnderlyingType(objectType) : objectType;
+ Debug.Assert(actualType != null, nameof(actualType) + " != null");
+
+ if (reader.TokenType != JsonToken.String)
+ throw new JsonSerializationException($"Unexpected token {reader.TokenType} when parsing enum");
+
+ var enumText = reader.Value.ToString();
+ if (enumText == string.Empty && isNullable)
+ return null;
+
+ try
+ {
+ return Enum.Parse(actualType, enumText);
+ }
+ catch (ArgumentException)
+ {
+ return Enum.ToObject(actualType, invalidEnumValue);
+ }
+ }
+
+
+ public override bool CanConvert(Type objectType)
+ {
+ var actualType = IsNullableType(objectType) ? Nullable.GetUnderlyingType(objectType) : objectType;
+ return actualType?.IsEnum ?? false;
+ }
+
+
+ private static bool IsNullableType(Type t)
+ {
+ if (t == null)
+ throw new ArgumentNullException(nameof(t));
+
+ return t.IsGenericType && t.GetGenericTypeDefinition() == typeof(Nullable<>);
+ }
+ }
+}
diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs
index 2aee24f..9cee002 100644
--- a/Tapeti/Default/JsonMessageSerializer.cs
+++ b/Tapeti/Default/JsonMessageSerializer.cs
@@ -3,7 +3,6 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
-using Newtonsoft.Json.Converters;
using RabbitMQ.Client;
namespace Tapeti.Default
@@ -25,7 +24,7 @@ namespace Tapeti.Default
NullValueHandling = NullValueHandling.Ignore
};
- serializerSettings.Converters.Add(new StringEnumConverter());
+ serializerSettings.Converters.Add(new FallbackStringEnumConverter());
}
@@ -52,7 +51,7 @@ namespace Tapeti.Default
throw new ArgumentException($"{ClassTypeHeader} header not present");
var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName);
- return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType);
+ return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType, serializerSettings);
}
diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs
index d02d087..2ffe4e1 100644
--- a/Tapeti/Default/PublishResultBinding.cs
+++ b/Tapeti/Default/PublishResultBinding.cs
@@ -3,6 +3,7 @@ using System.Diagnostics;
using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
+using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Helpers;
@@ -18,23 +19,30 @@ namespace Tapeti.Default
return;
- if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType))
+ var hasClassResult = context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType);
+
+ var request = context.MessageClass?.GetCustomAttribute();
+ var expectedClassResult = request?.Response;
+
+ // Verify the return type matches with the Request attribute of the message class. This is a backwards incompatible change in
+ // Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish.
+ if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType)
+ throw new ArgumentException($"Message handler must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}");
+
+ if (!hasClassResult)
return;
+
if (isTaskOf)
{
var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(handler != null, nameof(handler) + " != null");
- context.Result.SetHandler(async (messageContext, value) =>
- {
- await (Task)handler.Invoke(null, new[] { messageContext, value });
- });
+ context.Result.SetHandler(async (messageContext, value) => { await (Task) handler.Invoke(null, new[] {messageContext, value }); });
}
else
- context.Result.SetHandler((messageContext, value) =>
- value == null ? null : Reply(value, messageContext));
+ context.Result.SetHandler((messageContext, value) => Reply(value, messageContext));
}
@@ -43,13 +51,15 @@ namespace Tapeti.Default
private static async Task PublishGenericTaskResult(IMessageContext messageContext, object value) where T : class
{
var message = await (Task)value;
- if (message != null)
- await Reply(message, messageContext);
+ await Reply(message, messageContext);
}
private static Task Reply(object message, IMessageContext messageContext)
{
+ if (message == null)
+ throw new ArgumentException("Return value of a request message handler must not be null");
+
var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve();
var properties = new BasicProperties();
@@ -59,9 +69,9 @@ namespace Tapeti.Default
properties.CorrelationId = messageContext.Properties.CorrelationId;
if (messageContext.Properties.IsReplyToPresent())
- return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties);
+ return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true);
- return publisher.Publish(message, properties);
+ return publisher.Publish(message, properties, false);
}
}
}
diff --git a/Tapeti/Exceptions/NackException.cs b/Tapeti/Exceptions/NackException.cs
new file mode 100644
index 0000000..408dd71
--- /dev/null
+++ b/Tapeti/Exceptions/NackException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Tapeti.Exceptions
+{
+ public class NackException : Exception
+ {
+ public NackException(string message) : base(message) { }
+ }
+}
diff --git a/Tapeti/Exceptions/NoRouteException.cs b/Tapeti/Exceptions/NoRouteException.cs
new file mode 100644
index 0000000..2dcd591
--- /dev/null
+++ b/Tapeti/Exceptions/NoRouteException.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Tapeti.Exceptions
+{
+ public class NoRouteException : Exception
+ {
+ public NoRouteException(string message) : base(message) { }
+ }
+}
diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs
index f1bf689..c55f47c 100644
--- a/Tapeti/IPublisher.cs
+++ b/Tapeti/IPublisher.cs
@@ -13,7 +13,7 @@ namespace Tapeti
public interface IInternalPublisher : IPublisher
{
- Task Publish(object message, IBasicProperties properties);
- Task PublishDirect(object message, string queueName, IBasicProperties properties);
+ Task Publish(object message, IBasicProperties properties, bool mandatory);
+ Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory);
}
}
diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs
index 3807190..c785408 100644
--- a/Tapeti/TapetiConfig.cs
+++ b/Tapeti/TapetiConfig.cs
@@ -8,6 +8,7 @@ using Tapeti.Config;
using Tapeti.Default;
using Tapeti.Helpers;
+// ReSharper disable UnusedMember.Global
namespace Tapeti
{
@@ -31,6 +32,8 @@ namespace Tapeti
private readonly IDependencyResolver dependencyResolver;
+ private bool usePublisherConfirms = true;
+
public TapetiConfig(IDependencyResolver dependencyResolver)
{
@@ -90,7 +93,16 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
}
- var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
+ var config = new Config(queues)
+ {
+ DependencyResolver = dependencyResolver,
+ MessageMiddleware = messageMiddleware,
+ CleanupMiddleware = cleanupMiddleware,
+ PublishMiddleware = publishMiddleware,
+
+ UsePublisherConfirms = usePublisherConfirms
+ };
+
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config);
return config;
@@ -154,11 +166,34 @@ namespace Tapeti
return this;
}
+
+ ///
+ /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
+ /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
+ /// result in never-ending flows. Only disable if you can accept those consequences.
+ ///
+ public TapetiConfig DisablePublisherConfirms()
+ {
+ usePublisherConfirms = false;
+ return this;
+ }
+
+
+ ///
+ /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
+ /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
+ /// result in never-ending flows. Only disable if you accept those consequences.
+ ///
+ public TapetiConfig SetPublisherConfirms(bool enabled)
+ {
+ usePublisherConfirms = enabled;
+ return this;
+ }
+
public void RegisterDefaults()
{
- var container = dependencyResolver as IDependencyContainer;
- if (container == null)
+ if (!(dependencyResolver is IDependencyContainer container))
return;
if (ConsoleHelper.IsAvailable())
@@ -399,21 +434,19 @@ namespace Tapeti
protected class Config : IConfig
{
- public IDependencyResolver DependencyResolver { get; }
- public IReadOnlyList MessageMiddleware { get; }
- public IReadOnlyList CleanupMiddleware { get; }
- public IReadOnlyList PublishMiddleware { get; }
+ public bool UsePublisherConfirms { get; set; }
+
+ public IDependencyResolver DependencyResolver { get; set; }
+ public IReadOnlyList MessageMiddleware { get; set; }
+ public IReadOnlyList CleanupMiddleware { get; set; }
+ public IReadOnlyList PublishMiddleware { get; set; }
public IEnumerable Queues { get; }
private readonly Dictionary bindingMethodLookup;
- public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues)
+ public Config(IEnumerable queues)
{
- DependencyResolver = dependencyResolver;
- MessageMiddleware = messageMiddleware;
- CleanupMiddleware = cleanupMiddleware;
- PublishMiddleware = publishMiddleware;
Queues = queues.ToList();
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);
@@ -429,6 +462,8 @@ namespace Tapeti
protected class Queue : IDynamicQueue
{
+ private readonly string declareQueueName;
+
public bool Dynamic { get; }
public string Name { get; set; }
public IEnumerable Bindings { get; }
@@ -436,12 +471,20 @@ namespace Tapeti
public Queue(QueueInfo queue, IEnumerable bindings)
{
+ declareQueueName = queue.Name;
+
Dynamic = queue.Dynamic.GetValueOrDefault();
Name = queue.Name;
Bindings = bindings;
}
+ public string GetDeclareQueueName()
+ {
+ return declareQueueName;
+ }
+
+
public void SetName(string name)
{
Name = name;
diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs
index 0578a2e..d66f880 100644
--- a/Tapeti/TapetiConnection.cs
+++ b/Tapeti/TapetiConnection.cs
@@ -8,13 +8,15 @@ using Tapeti.Connection;
namespace Tapeti
{
+ public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e);
+
public class TapetiConnection : IDisposable
{
private readonly IConfig config;
public TapetiConnectionParams Params { get; set; }
private readonly Lazy worker;
-
+ private TapetiSubscriber subscriber;
public TapetiConnection(IConfig config)
{
@@ -29,15 +31,17 @@ namespace Tapeti
}
public event EventHandler Connected;
-
- public event EventHandler Disconnected;
-
+ public event DisconnectedEventHandler Disconnected;
public event EventHandler Reconnected;
+
public async Task Subscribe(bool startConsuming = true)
{
- var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
- await subscriber.BindQueues();
+ if (subscriber == null)
+ {
+ subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
+ await subscriber.BindQueues();
+ }
if (startConsuming)
await subscriber.Resume();
@@ -46,9 +50,9 @@ namespace Tapeti
}
- public ISubscriber SubscribeSync()
+ public ISubscriber SubscribeSync(bool startConsuming = true)
{
- return Subscribe().Result;
+ return Subscribe(startConsuming).Result;
}
@@ -84,9 +88,9 @@ namespace Tapeti
owner.OnConnected(new EventArgs());
}
- public void Disconnected()
+ public void Disconnected(DisconnectedEventArgs e)
{
- owner.OnDisconnected(new EventArgs());
+ owner.OnDisconnected(e);
}
public void Reconnected()
@@ -97,17 +101,23 @@ namespace Tapeti
protected virtual void OnConnected(EventArgs e)
{
- Connected?.Invoke(this, e);
+ Task.Run(() => Connected?.Invoke(this, e));
}
protected virtual void OnReconnected(EventArgs e)
{
- Reconnected?.Invoke(this, e);
+ Task.Run(() =>
+ {
+ subscriber?.RebindQueues().ContinueWith((t) =>
+ {
+ Reconnected?.Invoke(this, e);
+ });
+ });
}
- protected virtual void OnDisconnected(EventArgs e)
+ protected virtual void OnDisconnected(DisconnectedEventArgs e)
{
- Disconnected?.Invoke(this, e);
+ Task.Run(() => Disconnected?.Invoke(this, e));
}
}
}
diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs
index fa28949..f22f869 100644
--- a/Tapeti/Tasks/SingleThreadTaskQueue.cs
+++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs
@@ -27,7 +27,7 @@ namespace Tapeti.Tasks
}
- public Task Add(Func func)
+ public Task Add(Func func)
{
lock (previousTaskLock)
{
@@ -36,7 +36,11 @@ namespace Tapeti.Tasks
, singleThreadScheduler.Value);
previousTask = task;
- return task;
+
+ // 'task' completes at the moment a Task is returned (for example, an await is encountered),
+ // this is used to chain the next. We return the unwrapped Task however, so that the caller
+ // awaits until the full task chain has completed.
+ return task.Unwrap();
}
}
diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs
index 4ced01e..3368f59 100644
--- a/Test/MarcoController.cs
+++ b/Test/MarcoController.cs
@@ -80,12 +80,16 @@ namespace Test
return flowProvider.YieldWithParallelRequest()
.AddRequestSync(new PoloConfirmationRequestMessage
{
- StoredInState = StateTestGuid
+ StoredInState = StateTestGuid,
+ EnumValue = TestEnum.Value1,
+
}, HandlePoloConfirmationResponse1)
.AddRequestSync(new PoloConfirmationRequestMessage
{
- StoredInState = StateTestGuid
+ StoredInState = StateTestGuid,
+ EnumValue = TestEnum.Value2,
+ OptionalEnumValue = TestEnum.Value1
}, HandlePoloConfirmationResponse2)
.YieldSync(ContinuePoloConfirmation);
@@ -127,7 +131,9 @@ namespace Test
return new PoloConfirmationResponseMessage
{
- ShouldMatchState = message.StoredInState
+ ShouldMatchState = message.StoredInState,
+ EnumValue = message.EnumValue,
+ OptionalEnumValue = message.OptionalEnumValue
};
}
@@ -141,6 +147,13 @@ namespace Test
}
+ public enum TestEnum
+ {
+ Value1,
+ Value2
+ }
+
+
[Request(Response = typeof(PoloMessage))]
public class MarcoMessage
{
@@ -157,6 +170,9 @@ namespace Test
{
[Required]
public Guid StoredInState { get; set; }
+
+ public TestEnum EnumValue;
+ public TestEnum? OptionalEnumValue;
}
@@ -164,5 +180,8 @@ namespace Test
{
[Required]
public Guid ShouldMatchState { get; set; }
+
+ public TestEnum EnumValue;
+ public TestEnum? OptionalEnumValue;
}
}
diff --git a/Test/MyLogger.cs b/Test/MyLogger.cs
deleted file mode 100644
index 81a60d8..0000000
--- a/Test/MyLogger.cs
+++ /dev/null
@@ -1,25 +0,0 @@
-using System;
-using Tapeti;
-
-namespace Test
-{
- public class MyLogger : ILogger
- {
- public void Connect(TapetiConnectionParams connectionParams)
- {
- }
-
- public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
- {
- }
-
- public void ConnectSuccess(TapetiConnectionParams connectionParams)
- {
- }
-
- public void HandlerException(Exception e)
- {
- Console.WriteLine("Mylogger: " + e.Message);
- }
- }
-}
diff --git a/Test/Program.cs b/Test/Program.cs
index be6eed9..62377aa 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -5,7 +5,6 @@ using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
-using Tapeti.Flow.SQL;
namespace Test
{
@@ -14,58 +13,60 @@ namespace Test
private static void Main()
{
// TODO logging
-
- var container = new Container();
- container.Register();
- container.Register();
- container.Register();
-
- var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
- //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
- .WithFlow()
- .WithDataAnnotations()
- .RegisterAllControllers()
- .Build();
-
- using (var connection = new TapetiConnection(config)
+ try
{
- Params = new TapetiAppSettingsConnectionParams()
- })
+ var container = new Container();
+ container.Register();
+ container.Register();
+ container.Register();
+
+ var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
+ //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
+ .WithFlow()
+ .WithDataAnnotations()
+ .RegisterAllControllers()
+ //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
+ .Build();
+
+ using (var connection = new TapetiConnection(config)
+ {
+ Params = new TapetiAppSettingsConnectionParams()
+ })
+ {
+ var flowStore = container.GetInstance();
+ var flowStore2 = container.GetInstance();
+
+ Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
+
+ connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); };
+ connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); };
+ connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); };
+
+ Console.WriteLine("Subscribing...");
+ var subscriber = connection.Subscribe(false).Result;
+
+ Console.WriteLine("Consuming...");
+ subscriber.Resume().Wait();
+
+ Console.WriteLine("Done!");
+
+ connection.GetPublisher().Publish(new FlowEndController.PingMessage());
+
+ //container.GetInstance().Start(c => c.StartFlow, true).Wait();
+ container.GetInstance().Start(c => c.TestParallelRequest).Wait();
+
+ Thread.Sleep(1000);
+
+ var emitter = container.GetInstance();
+ emitter.Run().Wait();
+
+
+ }
+ }
+ catch (Exception e)
{
- var flowStore = container.GetInstance();
- var flowStore2 = container.GetInstance();
-
- Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
-
- connection.Connected += (sender, e) => {
- Console.WriteLine("Event Connected");
- };
- connection.Disconnected += (sender, e) => {
- Console.WriteLine("Event Disconnected");
- };
- connection.Reconnected += (sender, e) => {
- Console.WriteLine("Event Reconnected");
- };
-
- Console.WriteLine("Subscribing...");
- var subscriber = connection.Subscribe(false).Result;
-
- Console.WriteLine("Consuming...");
- subscriber.Resume().Wait();
-
- Console.WriteLine("Done!");
-
- connection.GetPublisher().Publish(new FlowEndController.PingMessage());
-
- //container.GetInstance().Start(c => c.StartFlow, true);
- container.GetInstance().Start(c => c.TestParallelRequest);
-
- Thread.Sleep(1000);
-
- var emitter = container.GetInstance();
- emitter.Run().Wait();
-
-
+ Console.WriteLine(e.ToString());
+ Console.ReadKey();
}
}
}