diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs
index 8971044..ae99278 100644
--- a/Tapeti.Annotations/DurableQueueAttribute.cs
+++ b/Tapeti.Annotations/DurableQueueAttribute.cs
@@ -8,11 +8,6 @@ namespace Tapeti.Annotations
/// Binds to an existing durable queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
///
- ///
- /// At the moment there is no support for creating a durable queue and managing the
- /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
- /// for deploy-time management of durable queues (shameless plug intended).
- ///
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class DurableQueueAttribute : Attribute
diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj
index f584c07..8ede527 100644
--- a/Tapeti.Annotations/Tapeti.Annotations.csproj
+++ b/Tapeti.Annotations/Tapeti.Annotations.csproj
@@ -6,7 +6,7 @@
- 1701;1702;1591
+ 1701;1702
diff --git a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
index cef8d1e..b706ccd 100644
--- a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
+++ b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
@@ -1,9 +1,11 @@
using System;
using System.ComponentModel.DataAnnotations;
-using System.Globalization;
+
+// ReSharper disable UnusedMember.Global
namespace Tapeti.DataAnnotations.Extensions
{
+ ///
///
/// Can be used on Guid fields which are supposed to be Required, as the Required attribute does
/// not work for Guids and making them Nullable is counter-intuitive.
@@ -13,10 +15,12 @@ namespace Tapeti.DataAnnotations.Extensions
private const string DefaultErrorMessage = "'{0}' does not contain a valid guid";
private const string InvalidTypeErrorMessage = "'{0}' is not of type Guid";
+ ///
public RequiredGuidAttribute() : base(DefaultErrorMessage)
{
}
+ ///
protected override ValidationResult IsValid(object value, ValidationContext validationContext)
{
if (value == null)
diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
index 6ad9eab..8855449 100644
--- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
+++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
@@ -2,10 +2,11 @@
netstandard2.0
+ true
- 1701;1702;1591
+ 1701;1702
diff --git a/Tapeti.DataAnnotations/ConfigExtensions.cs b/Tapeti.DataAnnotations/ConfigExtensions.cs
index 3001fe9..72a3cfb 100644
--- a/Tapeti.DataAnnotations/ConfigExtensions.cs
+++ b/Tapeti.DataAnnotations/ConfigExtensions.cs
@@ -1,10 +1,19 @@
-namespace Tapeti.DataAnnotations
+using Tapeti.Config;
+
+namespace Tapeti.DataAnnotations
{
+ ///
+ /// Extends ITapetiConfigBuilder to enable DataAnnotations.
+ ///
public static class ConfigExtensions
{
- public static TapetiConfig WithDataAnnotations(this TapetiConfig config)
+ ///
+ /// Enables the DataAnnotations validation middleware.
+ ///
+ ///
+ public static ITapetiConfigBuilder WithDataAnnotations(this ITapetiConfigBuilder config)
{
- config.Use(new DataAnnotationsMiddleware());
+ config.Use(new DataAnnotationsExtension());
return config;
}
}
diff --git a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs
similarity index 67%
rename from Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs
rename to Tapeti.DataAnnotations/DataAnnotationsExtension.cs
index ffbaac4..abdbc5c 100644
--- a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs
+++ b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs
@@ -3,12 +3,18 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
- public class DataAnnotationsMiddleware : ITapetiExtension
+ ///
+ ///
+ /// Provides the DataAnnotations validation middleware.
+ ///
+ public class DataAnnotationsExtension : ITapetiExtension
{
+ ///
public void RegisterDefaults(IDependencyContainer container)
{
}
+ ///
public IEnumerable
///
- ///
+ ///
/// Always call to allow the next in the chain to clean up
- Task Cleanup(IControllerMessageContext context, HandlingResult handlingResult, Func next);
+ Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next);
}
}
diff --git a/Tapeti/Config/IExceptionStrategyContext.cs b/Tapeti/Config/IExceptionStrategyContext.cs
index 4aae1fd..e418a96 100644
--- a/Tapeti/Config/IExceptionStrategyContext.cs
+++ b/Tapeti/Config/IExceptionStrategyContext.cs
@@ -4,12 +4,26 @@
namespace Tapeti.Config
{
+ ///
+ /// Provides access to information about the message being consumed.
+ /// Allows the strategy to determine how the exception should be handled.
+ ///
public interface IExceptionStrategyContext
{
+ ///
+ /// Provides access to the message context.
+ ///
IMessageContext MessageContext { get; }
+ ///
+ /// Contains the exception being handled.
+ ///
Exception Exception { get; }
- HandlingResultBuilder HandlingResult { get; set; }
+ ///
+ /// Determines how the message has been handled. Defaults to Error.
+ ///
+ ///
+ void SetConsumeResult(ConsumeResult consumeResult);
}
}
diff --git a/Tapeti/Config/IPublishMiddleware.cs b/Tapeti/Config/IPublishMiddleware.cs
index 9a0eccc..c8069e3 100644
--- a/Tapeti/Config/IPublishMiddleware.cs
+++ b/Tapeti/Config/IPublishMiddleware.cs
@@ -3,8 +3,16 @@ using System.Threading.Tasks;
namespace Tapeti.Config
{
+ ///
+ /// Denotes middleware that processes all published messages.
+ ///
public interface IPublishMiddleware
{
+ ///
+ /// Called when a message is published.
+ ///
+ ///
+ /// Call to pass the message to the next handler in the chain
Task Handle(IPublishContext context, Func next);
}
}
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 80b30ff..016e022 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -12,11 +12,11 @@ namespace Tapeti.Connection
public class TapetiBasicConsumer : DefaultBasicConsumer
{
private readonly IConsumer consumer;
- private readonly Func onRespond;
+ private readonly Func onRespond;
///
- public TapetiBasicConsumer(IConsumer consumer, Func onRespond)
+ public TapetiBasicConsumer(IConsumer consumer, Func onRespond)
{
this.consumer = consumer;
this.onRespond = onRespond;
@@ -35,7 +35,7 @@ namespace Tapeti.Connection
}
catch
{
- await onRespond(deliveryTag, ConsumeResponse.Nack);
+ await onRespond(deliveryTag, ConsumeResult.Error);
}
});
}
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 35ca29f..c8b3bcd 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -5,7 +5,6 @@ using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
-using System.Web;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
@@ -50,7 +49,7 @@ namespace Tapeti.Connection
private IModel channelInstance;
private ulong lastDeliveryTag;
private DateTime connectedDateTime;
- private HttpClient managementClient;
+ private readonly HttpClient managementClient;
// These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread
private readonly object confirmLock = new object();
@@ -186,28 +185,29 @@ namespace Tapeti.Connection
}
- private async Task Respond(ulong deliveryTag, ConsumeResponse response)
+ private async Task Respond(ulong deliveryTag, ConsumeResult result)
{
await taskQueue.Value.Add(() =>
{
// No need for a retryable channel here, if the connection is lost we can't
// use the deliveryTag anymore.
- switch (response)
+ switch (result)
{
- case ConsumeResponse.Ack:
+ case ConsumeResult.Success:
+ case ConsumeResult.ExternalRequeue:
GetChannel().BasicAck(deliveryTag, false);
break;
- case ConsumeResponse.Nack:
+ case ConsumeResult.Error:
GetChannel().BasicNack(deliveryTag, false, false);
break;
- case ConsumeResponse.Requeue:
+ case ConsumeResult.Requeue:
GetChannel().BasicNack(deliveryTag, false, true);
break;
default:
- throw new ArgumentOutOfRangeException(nameof(response), response, null);
+ throw new ArgumentOutOfRangeException(nameof(result), result, null);
}
});
@@ -454,7 +454,7 @@ namespace Tapeti.Connection
{
try
{
- logger.Connect(connectionParams);
+ logger.Connect(connectionParams, isReconnect);
connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel();
@@ -510,7 +510,7 @@ namespace Tapeti.Connection
else
ConnectionEventListener?.Connected();
- logger.ConnectSuccess(connectionParams);
+ logger.ConnectSuccess(connectionParams, isReconnect);
isReconnect = true;
break;
diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs
index fff74ef..ff3ea78 100644
--- a/Tapeti/Connection/TapetiConsumer.cs
+++ b/Tapeti/Connection/TapetiConsumer.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Runtime.ExceptionServices;
using Tapeti.Config;
using Tapeti.Default;
using System.Threading.Tasks;
@@ -37,106 +38,49 @@ namespace Tapeti.Connection
///
- public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
+ public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
{
+ object message = null;
try
{
- var message = messageSerializer.Deserialize(body, properties);
+ message = messageSerializer.Deserialize(body, properties);
if (message == null)
- throw new ArgumentException($"Message body could not be deserialized into a message object in queue {queueName}", nameof(body));
+ throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body));
- await DispatchMessage(message, new MessageContextData
+ return await DispatchMessage(message, new MessageContextData
{
Exchange = exchange,
RoutingKey = routingKey,
Properties = properties
});
-
- return ConsumeResponse.Ack;
}
- catch (Exception e)
+ catch (Exception dispatchException)
{
- // TODO exception strategy
- // TODO logger
- return ConsumeResponse.Nack;
- }
+ // TODO check if this is still necessary:
+ // var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
-
- /*
-
- handlingResult = new HandlingResult
- {
- ConsumeResponse = ConsumeResponse.Ack,
- MessageAction = MessageAction.None
- };
- }
- catch (Exception eDispatch)
+ using (var emptyContext = new MessageContext
{
- var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
- logger.HandlerException(eDispatch);
- try
- {
- var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException);
-
- exceptionStrategy.HandleException(exceptionStrategyContext);
-
- handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult();
- }
- catch (Exception eStrategy)
- {
- logger.HandlerException(eStrategy);
- }
- }
- try
+ Config = config,
+ Queue = queueName,
+ Exchange = exchange,
+ RoutingKey = routingKey,
+ Message = message,
+ Properties = properties,
+ Binding = null
+ })
{
- if (handlingResult == null)
- {
- handlingResult = new HandlingResult
- {
- ConsumeResponse = ConsumeResponse.Nack,
- MessageAction = MessageAction.None
- };
- }
- await RunCleanup(context, handlingResult);
- }
- catch (Exception eCleanup)
- {
- logger.HandlerException(eCleanup);
+ var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
+ HandleException(exceptionContext);
+ return exceptionContext.ConsumeResult;
}
}
- finally
- {
- try
- {
- if (handlingResult == null)
- {
- handlingResult = new HandlingResult
- {
- ConsumeResponse = ConsumeResponse.Nack,
- MessageAction = MessageAction.None
- };
- }
- await client.Respond(deliveryTag, handlingResult.ConsumeResponse);
- }
- catch (Exception eRespond)
- {
- logger.HandlerException(eRespond);
- }
- try
- {
- context?.Dispose();
- }
- catch (Exception eDispose)
- {
- logger.HandlerException(eDispose);
- }
- }
- */
}
- private async Task DispatchMessage(object message, MessageContextData messageContextData)
+ private async Task DispatchMessage(object message, MessageContextData messageContextData)
{
+ var returnResult = ConsumeResult.Success;
var messageType = message.GetType();
var validMessageType = false;
@@ -145,18 +89,23 @@ namespace Tapeti.Connection
if (!binding.Accept(messageType))
continue;
- await InvokeUsingBinding(message, messageContextData, binding);
+ var consumeResult = await InvokeUsingBinding(message, messageContextData, binding);
validMessageType = true;
+
+ if (consumeResult != ConsumeResult.Success)
+ returnResult = consumeResult;
}
if (!validMessageType)
- throw new ArgumentException($"Unsupported message type in queue {queueName}: {message.GetType().FullName}");
+ throw new ArgumentException($"No binding found for message type: {message.GetType().FullName}");
+
+ return returnResult;
}
- private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding)
+ private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding)
{
- var context = new MessageContext
+ using (var context = new MessageContext
{
Config = config,
Queue = queueName,
@@ -165,21 +114,44 @@ namespace Tapeti.Connection
Message = message,
Properties = messageContextData.Properties,
Binding = binding
- };
+ })
+ {
+ try
+ {
+ await MiddlewareHelper.GoAsync(config.Middleware.Message,
+ (handler, next) => handler.Handle(context, next),
+ async () => { await binding.Invoke(context); });
- try
- {
- await MiddlewareHelper.GoAsync(config.Middleware.Message,
- (handler, next) => handler.Handle(context, next),
- async () => { await binding.Invoke(context); });
- }
- finally
- {
- context.Dispose();
+ return ConsumeResult.Success;
+ }
+ catch (Exception invokeException)
+ {
+ var exceptionContext = new ExceptionStrategyContext(context, invokeException);
+ HandleException(exceptionContext);
+ return exceptionContext.ConsumeResult;
+ }
}
}
+ private void HandleException(ExceptionStrategyContext exceptionContext)
+ {
+ try
+ {
+ exceptionStrategy.HandleException(exceptionContext);
+ }
+ catch (Exception strategyException)
+ {
+ // Exception in the exception strategy. Oh dear.
+ exceptionContext.SetConsumeResult(ConsumeResult.Error);
+ logger.ConsumeException(strategyException, exceptionContext.MessageContext, ConsumeResult.Error);
+ }
+
+ logger.ConsumeException(exceptionContext.Exception, exceptionContext.MessageContext, exceptionContext.ConsumeResult);
+ }
+
+
+
private struct MessageContextData
{
public string Exchange;
diff --git a/Tapeti/ConsumeResponse.cs b/Tapeti/ConsumeResponse.cs
deleted file mode 100644
index d539170..0000000
--- a/Tapeti/ConsumeResponse.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-namespace Tapeti
-{
- ///
- /// Determines the response sent back after handling a message.
- ///
- public enum ConsumeResponse
- {
- ///
- /// Acknowledge the message and remove it from the queue
- ///
- Ack,
-
- ///
- /// Negatively acknowledge the message and remove it from the queue, send to dead-letter queue if configured on the bus
- ///
- Nack,
-
- ///
- /// Negatively acknowledge the message and put it back in the queue to try again later
- ///
- Requeue
- }
-}
diff --git a/Tapeti/ConsumeResult.cs b/Tapeti/ConsumeResult.cs
new file mode 100644
index 0000000..e31d274
--- /dev/null
+++ b/Tapeti/ConsumeResult.cs
@@ -0,0 +1,33 @@
+namespace Tapeti
+{
+ ///
+ /// Determines how the message has been handled and the response given to the message bus.
+ ///
+ public enum ConsumeResult
+ {
+ ///
+ /// Acknowledge the message and remove it from the queue.
+ ///
+ Success,
+
+ ///
+ /// Negatively acknowledge the message and remove it from the queue, send to dead-letter queue if configured on the bus.
+ ///
+ Error,
+
+ ///
+ /// Negatively acknowledge the message and put it back in the queue to try again later.
+ ///
+ Requeue,
+
+ ///
+ /// The message has been stored for republishing and will be delivered again by some other means.
+ /// It will be acknowledged and removed from the queue as if succesful.
+ ///
+ ///
+ /// This option is for compatibility with external scheduler services. The exception strategy must guarantee that the
+ /// message will eventually be republished.
+ ///
+ ExternalRequeue
+ }
+}
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index a00f8d2..d42f46c 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -1,27 +1,49 @@
using System;
+using Tapeti.Config;
namespace Tapeti.Default
{
+ ///
+ ///
+ /// Default ILogger implementation for console applications.
+ ///
public class ConsoleLogger : ILogger
{
- public void Connect(TapetiConnectionParams connectionParams)
+ ///
+ public void Connect(TapetiConnectionParams connectionParams, bool isReconnect)
{
- Console.WriteLine($"[Tapeti] Connecting to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}");
+ Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnecting" : "Connecting")} to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}");
}
+ ///
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
{
Console.WriteLine($"[Tapeti] Connection failed: {exception}");
}
- public void ConnectSuccess(TapetiConnectionParams connectionParams)
+ ///
+ public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect)
{
- Console.WriteLine("[Tapeti] Connected");
+ Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnected" : "Connected")}");
}
- public void HandlerException(Exception e)
+ ///
+ public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
- Console.WriteLine(e.ToString());
+ Console.WriteLine("[Tapeti] Exception while handling message");
+ Console.WriteLine($" Result : {consumeResult}");
+ Console.WriteLine($" Exchange : {messageContext.Exchange}");
+ Console.WriteLine($" Queue : {messageContext.Queue}");
+ Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}");
+
+ if (messageContext is IControllerMessageContext controllerMessageContext)
+ {
+ Console.WriteLine($" Controller : {controllerMessageContext.Binding.Controller.FullName}");
+ Console.WriteLine($" Method : {controllerMessageContext.Binding.Method.Name}");
+ }
+
+ Console.WriteLine();
+ Console.WriteLine(exception);
}
}
}
diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs
index 4b29a07..4fc410a 100644
--- a/Tapeti/Default/ControllerMessageContext.cs
+++ b/Tapeti/Default/ControllerMessageContext.cs
@@ -7,7 +7,7 @@ namespace Tapeti.Default
///
public class ControllerMessageContext : MessageContext, IControllerMessageContext
{
- private Dictionary items = new Dictionary();
+ private readonly Dictionary items = new Dictionary();
///
diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs
index df7952f..5e247ea 100644
--- a/Tapeti/Default/DevNullLogger.cs
+++ b/Tapeti/Default/DevNullLogger.cs
@@ -1,22 +1,31 @@
using System;
+using Tapeti.Config;
namespace Tapeti.Default
{
+ ///
+ ///
+ /// Default ILogger implementation which does not log anything.
+ ///
public class DevNullLogger : ILogger
{
- public void Connect(TapetiConnectionParams connectionParams)
+ ///
+ public void Connect(TapetiConnectionParams connectionParams, bool isReconnect)
{
}
+ ///
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
{
}
- public void ConnectSuccess(TapetiConnectionParams connectionParams)
+ ///
+ public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect)
{
}
- public void HandlerException(Exception e)
+ ///
+ public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
}
}
diff --git a/Tapeti/Default/ExceptionStrategyContext.cs b/Tapeti/Default/ExceptionStrategyContext.cs
index 89280ee..bcd163e 100644
--- a/Tapeti/Default/ExceptionStrategyContext.cs
+++ b/Tapeti/Default/ExceptionStrategyContext.cs
@@ -3,23 +3,37 @@ using Tapeti.Config;
namespace Tapeti.Default
{
+ ///
+ ///
+ /// Default implementation of IExceptionStrategyContext.
+ ///
public class ExceptionStrategyContext : IExceptionStrategyContext
{
- internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception)
+ ///
+ /// The ConsumeResult as set by the exception strategy. Defaults to Error.
+ ///
+ public ConsumeResult ConsumeResult { get; set; } = ConsumeResult.Error;
+
+
+ ///
+ public IMessageContext MessageContext { get; }
+
+ ///
+ public Exception Exception { get; }
+
+
+ ///
+ public ExceptionStrategyContext(IMessageContext messageContext, Exception exception)
{
MessageContext = messageContext;
Exception = exception;
}
- public IMessageContext MessageContext { get; }
- public Exception Exception { get; }
-
- private HandlingResultBuilder handlingResult;
- public HandlingResultBuilder HandlingResult
+ ///
+ public void SetConsumeResult(ConsumeResult consumeResult)
{
- get => handlingResult ?? (handlingResult = new HandlingResultBuilder());
- set => handlingResult = value;
+ ConsumeResult = consumeResult;
}
}
}
diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs
index 3bbb2d5..06510f2 100644
--- a/Tapeti/Default/NackExceptionStrategy.cs
+++ b/Tapeti/Default/NackExceptionStrategy.cs
@@ -2,11 +2,16 @@
namespace Tapeti.Default
{
+ ///
+ ///
+ /// Default implementation of an exception strategy which marks the messages as Error.
+ ///
public class NackExceptionStrategy : IExceptionStrategy
{
+ ///
public void HandleException(IExceptionStrategyContext context)
{
- context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack;
+ context.SetConsumeResult(ConsumeResult.Error);
}
}
}
diff --git a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs
index 01ee0a6..3c4465e 100644
--- a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs
+++ b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs
@@ -3,13 +3,20 @@ using System.Text.RegularExpressions;
namespace Tapeti.Default
{
+ ///
+ ///
+ /// IExchangeStrategy implementation which uses the first identifier in the namespace in lower case,
+ /// skipping the first identifier if it is 'Messaging'.
+ ///
+ ///
+ /// Messaging.Service.Optional.Further.Parts will result in the exchange name 'service'.
+ ///
public class NamespaceMatchExchangeStrategy : IExchangeStrategy
{
- // If the namespace starts with "Messaging.Service[.Optional.Further.Parts]", the exchange will be "Service".
- // If no Messaging prefix is present, the first part of the namespace will be used instead.
private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline);
+ ///
public string GetExchange(Type messageType)
{
if (messageType.Namespace == null)
diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs
index 6de9719..c560e56 100644
--- a/Tapeti/Default/RabbitMQMessageProperties.cs
+++ b/Tapeti/Default/RabbitMQMessageProperties.cs
@@ -12,6 +12,9 @@ namespace Tapeti.Default
///
public class RabbitMQMessageProperties : IMessageProperties
{
+ ///
+ /// Provides access to the wrapped IBasicProperties
+ ///
public IBasicProperties BasicProperties { get; }
diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs
index 6c014f6..87fa8a2 100644
--- a/Tapeti/Default/RequeueExceptionStrategy.cs
+++ b/Tapeti/Default/RequeueExceptionStrategy.cs
@@ -4,11 +4,25 @@
namespace Tapeti.Default
{
+ ///
+ ///
+ /// Example exception strategy which requeues all messages that result in an error.
+ ///
+ ///
+ /// You probably do not want to use this strategy as-is in production code, unless
+ /// you are sure that all your exceptions are transient. A better way would be to
+ /// check for exceptions you know are transient. An even better way would be to
+ /// never requeue but retry transient errors internally. See the Tapeti documentation
+ /// for an example of this pattern:
+ ///
+ /// https://tapeti.readthedocs.io/en/latest/
+ ///
public class RequeueExceptionStrategy : IExceptionStrategy
{
+ ///
public void HandleException(IExceptionStrategyContext context)
{
- context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue;
+ context.SetConsumeResult(ConsumeResult.Requeue);
}
}
}
diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs
index 99bf6b9..de51fc2 100644
--- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs
+++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs
@@ -6,6 +6,13 @@ using System.Text.RegularExpressions;
namespace Tapeti.Default
{
+ ///
+ /// IRoutingKeyStrategy implementation which transforms the class name into a dot-separated routing key based
+ /// on the casing. Accounts for acronyms. If the class name ends with 'Message' it is not included in the routing key.
+ ///
+ ///
+ /// ExampleClassNameMessage will result in example.class.name
+ ///
public class TypeNameRoutingKeyStrategy : IRoutingKeyStrategy
{
private const string SeparatorPattern = @"
@@ -24,12 +31,17 @@ namespace Tapeti.Default
private static readonly ConcurrentDictionary RoutingKeyCache = new ConcurrentDictionary();
+ ///
public string GetRoutingKey(Type messageType)
{
return RoutingKeyCache.GetOrAdd(messageType, BuildRoutingKey);
}
+ ///
+ /// Actual implementation of GetRoutingKey, called only when the type has not been cached yet.
+ ///
+ ///
protected virtual string BuildRoutingKey(Type messageType)
{
// Split PascalCase into dot-separated parts. If the class name ends in "Message" leave that out.
@@ -43,6 +55,7 @@ namespace Tapeti.Default
return string.Join(".", words.Select(s => s.ToLower()));
}
+
private static List SplitPascalCase(string value)
{
var split = SeparatorRegex.Split(value);
diff --git a/Tapeti/Exceptions/NackException.cs b/Tapeti/Exceptions/NackException.cs
index 408dd71..a2fb7fa 100644
--- a/Tapeti/Exceptions/NackException.cs
+++ b/Tapeti/Exceptions/NackException.cs
@@ -2,8 +2,13 @@
namespace Tapeti.Exceptions
{
+ ///
+ ///
+ /// Raised when a message is nacked by the message bus.
+ ///
public class NackException : Exception
{
+ ///
public NackException(string message) : base(message) { }
}
}
diff --git a/Tapeti/Exceptions/NoRouteException.cs b/Tapeti/Exceptions/NoRouteException.cs
index 2dcd591..3f1ac64 100644
--- a/Tapeti/Exceptions/NoRouteException.cs
+++ b/Tapeti/Exceptions/NoRouteException.cs
@@ -2,8 +2,13 @@
namespace Tapeti.Exceptions
{
+ ///
+ ///
+ /// Raised when a mandatory message has no route.
+ ///
public class NoRouteException : Exception
{
+ ///
public NoRouteException(string message) : base(message) { }
}
}
diff --git a/Tapeti/HandlingResult.cs b/Tapeti/HandlingResult.cs
deleted file mode 100644
index 107c206..0000000
--- a/Tapeti/HandlingResult.cs
+++ /dev/null
@@ -1,63 +0,0 @@
-// ReSharper disable UnusedMember.Global
-
-namespace Tapeti
-{
- public class HandlingResult
- {
- public HandlingResult()
- {
- ConsumeResponse = ConsumeResponse.Nack;
- MessageAction = MessageAction.None;
- }
-
- ///
- /// Determines which response will be given to the message bus from where the message originates.
- ///
- public ConsumeResponse ConsumeResponse { get; internal set; }
-
- ///
- /// Registers which action the Exception strategy has taken or will take to handle the error condition
- /// on the message. This is important to know for cleanup handlers registered by middleware.
- ///
- public MessageAction MessageAction { get; internal set; }
-
- }
-
- public class HandlingResultBuilder
- {
- private static readonly HandlingResult Default = new HandlingResult();
-
- private HandlingResult data = Default;
-
- public ConsumeResponse ConsumeResponse {
- get => data.ConsumeResponse;
- set => GetWritableData().ConsumeResponse = value;
- }
-
- public MessageAction MessageAction
- {
- get => data.MessageAction;
- set => GetWritableData().MessageAction = value;
- }
-
- public HandlingResult ToHandlingResult()
- {
- if (data == Default)
- {
- return new HandlingResult();
- }
- var result = GetWritableData();
- data = Default;
- return result;
- }
-
- private HandlingResult GetWritableData()
- {
- if (data == Default)
- {
- data = new HandlingResult();
- }
- return data;
- }
- }
-}
diff --git a/Tapeti/Helpers/ConsoleHelper.cs b/Tapeti/Helpers/ConsoleHelper.cs
index 0769de8..350947e 100644
--- a/Tapeti/Helpers/ConsoleHelper.cs
+++ b/Tapeti/Helpers/ConsoleHelper.cs
@@ -2,9 +2,17 @@
namespace Tapeti.Helpers
{
+ ///
+ /// Helper class for console applications.
+ ///
public static class ConsoleHelper
{
- // Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console
+ ///
+ /// Determines if the application is running in a console.
+ ///
+ ///
+ /// Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console
+ ///
public static bool IsAvailable()
{
try
diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs
index a40e2bd..ba1158e 100644
--- a/Tapeti/Helpers/MiddlewareHelper.cs
+++ b/Tapeti/Helpers/MiddlewareHelper.cs
@@ -4,8 +4,18 @@ using System.Threading.Tasks;
namespace Tapeti.Helpers
{
+ ///
+ /// Helper class for executing the middleware pattern.
+ ///
public static class MiddlewareHelper
{
+ ///
+ /// Executes the chain of middleware synchronously, starting with the last item in the list.
+ ///
+ /// The list of middleware to run
+ /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.
+ /// The action to execute when the innermost middleware calls next.
+ ///
public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler)
{
var handlerIndex = middleware?.Count - 1 ?? -1;
@@ -28,6 +38,13 @@ namespace Tapeti.Helpers
}
+ ///
+ /// Executes the chain of middleware asynchronously, starting with the last item in the list.
+ ///
+ /// The list of middleware to run
+ /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.
+ /// The action to execute when the innermost middleware calls next.
+ ///
public static async Task GoAsync(IReadOnlyList middleware, Func, Task> handle, Func lastHandler)
{
var handlerIndex = middleware?.Count - 1 ?? -1;
diff --git a/Tapeti/Helpers/TaskTypeHelper.cs b/Tapeti/Helpers/TaskTypeHelper.cs
index 416a7ba..44e0c99 100644
--- a/Tapeti/Helpers/TaskTypeHelper.cs
+++ b/Tapeti/Helpers/TaskTypeHelper.cs
@@ -3,8 +3,18 @@ using System.Threading.Tasks;
namespace Tapeti.Helpers
{
+ ///
+ /// Helper methods for working with synchronous and asynchronous versions of methods.
+ ///
public static class TaskTypeHelper
{
+ ///
+ /// Determines if the given type matches the predicate, taking Task types into account.
+ ///
+ ///
+ ///
+ ///
+ ///
public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTaskOf, out Type actualType)
{
if (type == typeof(Task))
@@ -32,11 +42,24 @@ namespace Tapeti.Helpers
}
+ ///
+ /// Determines if the given type matches the predicate, taking Task types into account.
+ ///
+ ///
+ ///
+ ///
public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTaskOf)
{
return IsTypeOrTaskOf(type, predicate, out isTaskOf, out _);
}
+
+ ///
+ /// Determines if the given type matches the compareTo type, taking Task types into account.
+ ///
+ ///
+ ///
+ ///
public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf)
{
return IsTypeOrTaskOf(type, t => t == compareTo, out isTaskOf);
diff --git a/Tapeti/IConsumer.cs b/Tapeti/IConsumer.cs
index f8be17a..7204bc5 100644
--- a/Tapeti/IConsumer.cs
+++ b/Tapeti/IConsumer.cs
@@ -16,6 +16,6 @@ namespace Tapeti
/// Metadata included in the message
/// The raw body of the message
///
- Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body);
+ Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body);
}
}
diff --git a/Tapeti/IDependencyResolver.cs b/Tapeti/IDependencyResolver.cs
index 1862aa4..12870bf 100644
--- a/Tapeti/IDependencyResolver.cs
+++ b/Tapeti/IDependencyResolver.cs
@@ -7,24 +7,79 @@ namespace Tapeti
///
public interface IDependencyResolver
{
+ ///
+ /// Resolve an instance of T
+ ///
+ /// The type to instantiate
+ /// A new or singleton instance, depending on the registration
T Resolve() where T : class;
+
+ ///
+ /// Resolve an instance of T
+ ///
+ /// The type to instantiate
+ /// A new or singleton instance, depending on the registration
object Resolve(Type type);
}
+ ///
///
/// Allows registering controller classes into the IoC container. Also registers default implementations,
/// so that the calling application may override these.
///
+ ///
+ /// All implementations of IDependencyResolver should implement IDependencyContainer as well,
+ /// otherwise all registrations of Tapeti components will have to be done manually by the application.
+ ///
public interface IDependencyContainer : IDependencyResolver
{
+ ///
+ /// Registers a default implementation in the IoC container. If an alternative implementation
+ /// was registered before, it is not replaced.
+ ///
+ ///
+ ///
void RegisterDefault() where TService : class where TImplementation : class, TService;
+
+ ///
+ /// Registers a default implementation in the IoC container. If an alternative implementation
+ /// was registered before, it is not replaced.
+ ///
+ ///
+ ///
void RegisterDefault(Func factory) where TService : class;
+
+ ///
+ /// Registers a default singleton implementation in the IoC container. If an alternative implementation
+ /// was registered before, it is not replaced.
+ ///
+ ///
+ ///
void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService;
+
+ ///
+ /// Registers a default singleton implementation in the IoC container. If an alternative implementation
+ /// was registered before, it is not replaced.
+ ///
+ ///
+ ///
void RegisterDefaultSingleton(TService instance) where TService : class;
+
+ ///
+ /// Registers a default singleton implementation in the IoC container. If an alternative implementation
+ /// was registered before, it is not replaced.
+ ///
+ ///
+ ///
void RegisterDefaultSingleton(Func factory) where TService : class;
+
+ ///
+ /// Registers a concrete controller class in the IoC container.
+ ///
+ ///
void RegisterController(Type type);
}
}
diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs
index 979f454..5aeb5e1 100644
--- a/Tapeti/IExceptionStrategy.cs
+++ b/Tapeti/IExceptionStrategy.cs
@@ -2,14 +2,16 @@
namespace Tapeti
{
+ ///
+ /// Called when an exception occurs while handling a message. Determines how it should be handled.
+ ///
public interface IExceptionStrategy
{
///
/// Called when an exception occurs while handling a message.
///
/// The exception strategy context containing the necessary data including the message context and the thrown exception.
- /// Also the response to the message can be set.
- /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly.
+ /// Also proivdes methods for the exception strategy to indicate how the message should be handled.
void HandleException(IExceptionStrategyContext context);
}
}
diff --git a/Tapeti/IExchangeStrategy.cs b/Tapeti/IExchangeStrategy.cs
index e7aaa7e..2878e71 100644
--- a/Tapeti/IExchangeStrategy.cs
+++ b/Tapeti/IExchangeStrategy.cs
@@ -2,8 +2,16 @@
namespace Tapeti
{
+ ///
+ /// Translates message classes into their target exchange.
+ ///
public interface IExchangeStrategy
{
+ ///
+ /// Determines the exchange belonging to the given message class.
+ ///
+ ///
+ ///
string GetExchange(Type messageType);
}
}
diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs
index 8ec857d..cbba1c7 100644
--- a/Tapeti/ILogger.cs
+++ b/Tapeti/ILogger.cs
@@ -1,16 +1,46 @@
using System;
+using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti
{
- // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog)
- // instead of only string-based logging without control over the output.
+ ///
+ /// Handles the logging of various events in Tapeti
+ ///
+ ///
+ /// This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog)
+ /// instead of only string-based logging without control over the output.
+ ///
public interface ILogger
{
- void Connect(TapetiConnectionParams connectionParams);
+ ///
+ /// Called before a connection to RabbitMQ is attempted.
+ ///
+ ///
+ /// Indicates whether this is the initial connection or a reconnect
+ void Connect(TapetiConnectionParams connectionParams, bool isReconnect);
+
+ ///
+ /// Called when the connection has failed or is lost.
+ ///
+ ///
+ ///
void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception);
- void ConnectSuccess(TapetiConnectionParams connectionParams);
- void HandlerException(Exception e);
+
+ ///
+ /// Called when a connection to RabbitMQ has been succesfully established.
+ ///
+ ///
+ /// Indicates whether this is the initial connection or a reconnect
+ void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect);
+
+ ///
+ /// Called when an exception occurs in a consumer.
+ ///
+ ///
+ ///
+ /// Indicates the action taken by the exception handler
+ void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult);
}
}
diff --git a/Tapeti/IRoutingKeyStrategy.cs b/Tapeti/IRoutingKeyStrategy.cs
index e13f287..db4ff14 100644
--- a/Tapeti/IRoutingKeyStrategy.cs
+++ b/Tapeti/IRoutingKeyStrategy.cs
@@ -2,8 +2,16 @@
namespace Tapeti
{
+ ///
+ /// Translates message classes into routing keys.
+ ///
public interface IRoutingKeyStrategy
{
+ ///
+ /// Determines the routing key for the given message class.
+ ///
+ ///
+ ///
string GetRoutingKey(Type messageType);
}
}
diff --git a/Tapeti/MessageAction.cs b/Tapeti/MessageAction.cs
new file mode 100644
index 0000000..a78793a
--- /dev/null
+++ b/Tapeti/MessageAction.cs
@@ -0,0 +1,29 @@
+// ReSharper disable UnusedMember.Global
+
+namespace Tapeti
+{
+ ///
+ /// Indicates how the message was handled.
+ ///
+ public enum MessageAction
+ {
+ ///
+ /// The message was handled succesfully.
+ ///
+ Success,
+
+ ///
+ /// There was an error while processing the message.
+ ///
+ Error,
+
+ ///
+ /// The message has been stored for republishing and will be delivered again
+ /// even if the current messages has been Acked or Nacked.
+ ///
+ ///
+ /// This option is for compatibility with external scheduler services that do not immediately requeue a message.
+ ///
+ ExternalRetry
+ }
+}
diff --git a/Tapeti/MessageFutureAction.cs b/Tapeti/MessageFutureAction.cs
deleted file mode 100644
index bc48049..0000000
--- a/Tapeti/MessageFutureAction.cs
+++ /dev/null
@@ -1,11 +0,0 @@
-// ReSharper disable UnusedMember.Global
-
-namespace Tapeti
-{
- public enum MessageAction
- {
- None = 1,
- ErrorLog = 2,
- Retry = 3,
- }
-}
diff --git a/Tapeti/TapetiAppSettingsConnectionParams.cs b/Tapeti/TapetiAppSettingsConnectionParams.cs
index e6312e5..9140f4a 100644
--- a/Tapeti/TapetiAppSettingsConnectionParams.cs
+++ b/Tapeti/TapetiAppSettingsConnectionParams.cs
@@ -4,17 +4,35 @@ using System.Linq;
namespace Tapeti
{
+ ///
+ ///
+ /// Implementation of TapetiConnectionParams which reads the values from the AppSettings.
+ ///
+ ///
+ ///
+ /// AppSettings keys
+ ///
+ /// rabbitmq:hostname
+ /// rabbitmq:port
+ /// rabbitmq:virtualhost
+ /// rabbitmq:username
+ /// rabbitmq:password
+ /// rabbitmq:prefetchcount
+ ///
public class TapetiAppSettingsConnectionParams : TapetiConnectionParams
{
- public const string DefaultPrefix = "rabbitmq:";
- public const string KeyHostname = "hostname";
- public const string KeyPort = "port";
- public const string KeyVirtualHost = "virtualhost";
- public const string KeyUsername = "username";
- public const string KeyPassword = "password";
- public const string KeyPrefetchCount = "prefetchcount";
+ private const string DefaultPrefix = "rabbitmq:";
+ private const string KeyHostname = "hostname";
+ private const string KeyPort = "port";
+ private const string KeyVirtualHost = "virtualhost";
+ private const string KeyUsername = "username";
+ private const string KeyPassword = "password";
+ private const string KeyPrefetchCount = "prefetchcount";
+ ///
+ ///
+ /// The prefix to apply to the keys. Defaults to "rabbitmq:"
public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix)
{
var keys = ConfigurationManager.AppSettings.AllKeys;