From 6c32665c8a4ab2ab9cfa75e1dc95b5c89171e8db Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 14 Aug 2019 12:20:53 +0200 Subject: [PATCH] [ci skip] Refactored how consume result is handled Reimplemented the exception strategy and logging Much XML documentation, such wow --- Tapeti.Annotations/DurableQueueAttribute.cs | 5 - Tapeti.Annotations/Tapeti.Annotations.csproj | 2 +- .../RequiredGuidAttribute.cs | 6 +- .../Tapeti.DataAnnotations.Extensions.csproj | 3 +- Tapeti.DataAnnotations/ConfigExtensions.cs | 15 +- ...dleware.cs => DataAnnotationsExtension.cs} | 8 +- .../DataAnnotationsMessageMiddleware.cs | 5 + .../DataAnnotationsPublishMiddleware.cs | 5 + .../Tapeti.DataAnnotations.csproj | 2 +- Tapeti.Flow.SQL/ConfigExtensions.cs | 17 +- .../SqlConnectionFlowRepository.cs | 32 ++-- Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 2 +- Tapeti.Flow/ConfigExtensions.cs | 6 +- Tapeti.Flow/Default/FlowMiddleware.cs | 8 +- Tapeti.Flow/Default/FlowStarter.cs | 14 +- Tapeti.Serilog/Tapeti.Serilog.csproj | 2 +- Tapeti.Serilog/TapetiSeriLogger.cs | 61 +++++-- .../SimpleInjectorDependencyResolver.cs | 15 ++ .../Tapeti.SimpleInjector.csproj | 2 +- ...onfigExtentions.cs => ConfigExtensions.cs} | 2 +- Tapeti/Config/IControllerCleanupMiddleware.cs | 4 +- Tapeti/Config/IExceptionStrategyContext.cs | 16 +- Tapeti/Config/IPublishMiddleware.cs | 8 + Tapeti/Connection/TapetiBasicConsumer.cs | 6 +- Tapeti/Connection/TapetiClient.cs | 20 +-- Tapeti/Connection/TapetiConsumer.cs | 158 +++++++----------- Tapeti/ConsumeResponse.cs | 23 --- Tapeti/ConsumeResult.cs | 33 ++++ Tapeti/Default/ConsoleLogger.cs | 34 +++- Tapeti/Default/ControllerMessageContext.cs | 2 +- Tapeti/Default/DevNullLogger.cs | 15 +- Tapeti/Default/ExceptionStrategyContext.cs | 30 +++- Tapeti/Default/NackExceptionStrategy.cs | 7 +- .../Default/NamespaceMatchExchangeStrategy.cs | 11 +- Tapeti/Default/RabbitMQMessageProperties.cs | 3 + Tapeti/Default/RequeueExceptionStrategy.cs | 16 +- Tapeti/Default/TypeNameRoutingKeyStrategy.cs | 13 ++ Tapeti/Exceptions/NackException.cs | 5 + Tapeti/Exceptions/NoRouteException.cs | 5 + Tapeti/HandlingResult.cs | 63 ------- Tapeti/Helpers/ConsoleHelper.cs | 10 +- Tapeti/Helpers/MiddlewareHelper.cs | 17 ++ Tapeti/Helpers/TaskTypeHelper.cs | 23 +++ Tapeti/IConsumer.cs | 2 +- Tapeti/IDependencyResolver.cs | 55 ++++++ Tapeti/IExceptionStrategy.cs | 6 +- Tapeti/IExchangeStrategy.cs | 8 + Tapeti/ILogger.cs | 40 ++++- Tapeti/IRoutingKeyStrategy.cs | 8 + Tapeti/MessageAction.cs | 29 ++++ Tapeti/MessageFutureAction.cs | 11 -- Tapeti/TapetiAppSettingsConnectionParams.cs | 32 +++- 52 files changed, 615 insertions(+), 310 deletions(-) rename Tapeti.DataAnnotations/{DataAnnotationsMiddleware.cs => DataAnnotationsExtension.cs} (67%) rename Tapeti.Transient/{ConfigExtentions.cs => ConfigExtensions.cs} (91%) delete mode 100644 Tapeti/ConsumeResponse.cs create mode 100644 Tapeti/ConsumeResult.cs delete mode 100644 Tapeti/HandlingResult.cs create mode 100644 Tapeti/MessageAction.cs delete mode 100644 Tapeti/MessageFutureAction.cs diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs index 8971044..ae99278 100644 --- a/Tapeti.Annotations/DurableQueueAttribute.cs +++ b/Tapeti.Annotations/DurableQueueAttribute.cs @@ -8,11 +8,6 @@ namespace Tapeti.Annotations /// Binds to an existing durable queue to receive messages. Can be used /// on an entire MessageController class or on individual methods. /// - /// - /// At the moment there is no support for creating a durable queue and managing the - /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue - /// for deploy-time management of durable queues (shameless plug intended). - /// [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] public class DurableQueueAttribute : Attribute diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj index f584c07..8ede527 100644 --- a/Tapeti.Annotations/Tapeti.Annotations.csproj +++ b/Tapeti.Annotations/Tapeti.Annotations.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs index cef8d1e..b706ccd 100644 --- a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs +++ b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs @@ -1,9 +1,11 @@ using System; using System.ComponentModel.DataAnnotations; -using System.Globalization; + +// ReSharper disable UnusedMember.Global namespace Tapeti.DataAnnotations.Extensions { + /// /// /// Can be used on Guid fields which are supposed to be Required, as the Required attribute does /// not work for Guids and making them Nullable is counter-intuitive. @@ -13,10 +15,12 @@ namespace Tapeti.DataAnnotations.Extensions private const string DefaultErrorMessage = "'{0}' does not contain a valid guid"; private const string InvalidTypeErrorMessage = "'{0}' is not of type Guid"; + /// public RequiredGuidAttribute() : base(DefaultErrorMessage) { } + /// protected override ValidationResult IsValid(object value, ValidationContext validationContext) { if (value == null) diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj index 6ad9eab..8855449 100644 --- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj +++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj @@ -2,10 +2,11 @@ netstandard2.0 + true - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.DataAnnotations/ConfigExtensions.cs b/Tapeti.DataAnnotations/ConfigExtensions.cs index 3001fe9..72a3cfb 100644 --- a/Tapeti.DataAnnotations/ConfigExtensions.cs +++ b/Tapeti.DataAnnotations/ConfigExtensions.cs @@ -1,10 +1,19 @@ -namespace Tapeti.DataAnnotations +using Tapeti.Config; + +namespace Tapeti.DataAnnotations { + /// + /// Extends ITapetiConfigBuilder to enable DataAnnotations. + /// public static class ConfigExtensions { - public static TapetiConfig WithDataAnnotations(this TapetiConfig config) + /// + /// Enables the DataAnnotations validation middleware. + /// + /// + public static ITapetiConfigBuilder WithDataAnnotations(this ITapetiConfigBuilder config) { - config.Use(new DataAnnotationsMiddleware()); + config.Use(new DataAnnotationsExtension()); return config; } } diff --git a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs similarity index 67% rename from Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs rename to Tapeti.DataAnnotations/DataAnnotationsExtension.cs index ffbaac4..abdbc5c 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs @@ -3,12 +3,18 @@ using Tapeti.Config; namespace Tapeti.DataAnnotations { - public class DataAnnotationsMiddleware : ITapetiExtension + /// + /// + /// Provides the DataAnnotations validation middleware. + /// + public class DataAnnotationsExtension : ITapetiExtension { + /// public void RegisterDefaults(IDependencyContainer container) { } + /// public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { return new object[] diff --git a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs index 3228671..5ac4e98 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs @@ -5,8 +5,13 @@ using Tapeti.Config; namespace Tapeti.DataAnnotations { + /// + /// + /// Validates consumed messages using System.ComponentModel.DataAnnotations + /// public class DataAnnotationsMessageMiddleware : IMessageMiddleware { + /// public Task Handle(IMessageContext context, Func next) { var validationContext = new ValidationContext(context.Message); diff --git a/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs index f3d70b4..4d01777 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs @@ -5,8 +5,13 @@ using Tapeti.Config; namespace Tapeti.DataAnnotations { + /// + /// + /// Validates published messages using System.ComponentModel.DataAnnotations + /// public class DataAnnotationsPublishMiddleware : IPublishMiddleware { + /// public Task Handle(IPublishContext context, Func next) { var validationContext = new ValidationContext(context.Message); diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 2237cd1..834acf7 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index 2e2e247..ef285d9 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -5,23 +5,32 @@ using Tapeti.Config; namespace Tapeti.Flow.SQL { + /// + /// Extends ITapetiConfigBuilder to enable Flow SQL. + /// public static class ConfigExtensions { - public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, string tableName = "Flow") + /// + /// Enables the Flow SQL repository. + /// + /// + /// + /// + public static ITapetiConfigBuilder WithFlowSqlRepository(this ITapetiConfigBuilder config, string connectionString, string tableName = "Flow") { - config.Use(new FlowSqlRepositoryBundle(connectionString, tableName)); + config.Use(new FlowSqlRepositoryExtension(connectionString, tableName)); return config; } } - internal class FlowSqlRepositoryBundle : ITapetiExtension + internal class FlowSqlRepositoryExtension : ITapetiExtension { private readonly string connectionString; private readonly string tableName; - public FlowSqlRepositoryBundle(string connectionString, string tableName) + public FlowSqlRepositoryExtension(string connectionString, string tableName) { this.connectionString = connectionString; this.tableName = tableName; diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index d32b645..07c045b 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -7,25 +7,27 @@ using Newtonsoft.Json; namespace Tapeti.Flow.SQL { - /* - Assumes the following table layout (table name configurable and may include schema): - - - create table Flow - ( - FlowID uniqueidentifier not null, - CreationTime datetime2(3) not null, - StateJson nvarchar(max) null, - - constraint PK_Flow primary key clustered (FlowID) - ); - */ + /// + /// IFlowRepository implementation for SQL server. + /// + /// + /// Assumes the following table layout (table name configurable and may include schema): + /// + /// create table Flow + /// ( + /// FlowID uniqueidentifier not null, + /// CreationTime datetime2(3) not null, + /// StateJson nvarchar(max) null, + /// constraint PK_Flow primary key clustered(FlowID) + /// ); + /// public class SqlConnectionFlowRepository : IFlowRepository { private readonly string connectionString; private readonly string tableName; + /// public SqlConnectionFlowRepository(string connectionString, string tableName = "Flow") { this.connectionString = connectionString; @@ -33,6 +35,7 @@ namespace Tapeti.Flow.SQL } + /// public async Task>> GetStates() { using (var connection = await GetConnection()) @@ -56,6 +59,7 @@ namespace Tapeti.Flow.SQL } + /// public async Task CreateState(Guid flowID, T state, DateTime timestamp) { using (var connection = await GetConnection()) @@ -76,6 +80,7 @@ namespace Tapeti.Flow.SQL } } + /// public async Task UpdateState(Guid flowID, T state) { using (var connection = await GetConnection()) @@ -92,6 +97,7 @@ namespace Tapeti.Flow.SQL } } + /// public async Task DeleteState(Guid flowID) { using (var connection = await GetConnection()) diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 51ce58a..2da63e5 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs index 127a0c2..e016172 100644 --- a/Tapeti.Flow/ConfigExtensions.cs +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -1,8 +1,10 @@ -namespace Tapeti.Flow +using Tapeti.Config; + +namespace Tapeti.Flow { public static class ConfigExtensions { - public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository flowRepository = null) + public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null) { config.Use(new FlowMiddleware(flowRepository)); return config; diff --git a/Tapeti.Flow/Default/FlowMiddleware.cs b/Tapeti.Flow/Default/FlowMiddleware.cs index c90332a..1dd1571 100644 --- a/Tapeti.Flow/Default/FlowMiddleware.cs +++ b/Tapeti.Flow/Default/FlowMiddleware.cs @@ -44,7 +44,7 @@ namespace Tapeti.Flow.Default } - public async Task Cleanup(IControllerMessageContext context, HandlingResult handlingResult, Func next) + public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next) { await next(); @@ -53,11 +53,9 @@ namespace Tapeti.Flow.Default if (flowContext?.FlowStateLock != null) { - if (handlingResult.ConsumeResponse == ConsumeResponse.Nack - || handlingResult.MessageAction == MessageAction.ErrorLog) - { + if (consumeResult == ConsumeResult.Error) await flowContext.FlowStateLock.DeleteFlowState(); - } + flowContext.FlowStateLock.Dispose(); } } diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index ab8a152..04b4dc9 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -42,7 +42,7 @@ namespace Tapeti.Flow.Default } - private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult, object[] parameters) where TController : class + private async Task CallControllerMethod(MethodBase method, Func> getYieldPointResult, object[] parameters) where TController : class { var controller = config.DependencyResolver.Resolve(); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); @@ -55,24 +55,20 @@ namespace Tapeti.Flow.Default var flowHandler = config.DependencyResolver.Resolve(); - HandlingResultBuilder handlingResult = new HandlingResultBuilder - { - ConsumeResponse = ConsumeResponse.Nack, - }; try { await flowHandler.Execute(context, yieldPoint); - handlingResult.ConsumeResponse = ConsumeResponse.Ack; + //handlingResult.ConsumeResponse = ConsumeResponse.Ack; } finally { - await RunCleanup(context, handlingResult.ToHandlingResult()); + //await RunCleanup(context, handlingResult.ToHandlingResult()); } } + /* private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) { - /* foreach (var handler in config.CleanupMiddleware) { try @@ -84,8 +80,8 @@ namespace Tapeti.Flow.Default logger.HandlerException(eCleanup); } } - */ } + */ private static MethodInfo GetExpressionMethod(Expression>> methodSelector) diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index d0780ff..dbe7a51 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index 5968966..ae168f1 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -1,27 +1,39 @@ using System; -using ISeriLogger = Serilog.ILogger; +using Tapeti.Config; +using ISerilogLogger = Serilog.ILogger; // ReSharper disable UnusedMember.Global namespace Tapeti.Serilog { + /// + /// + /// Implements the Tapeti ILogger interface for Serilog output. + /// public class TapetiSeriLogger: ILogger { - private readonly ISeriLogger seriLogger; + private readonly ISerilogLogger seriLogger; - public TapetiSeriLogger(ISeriLogger seriLogger) + + /// + public TapetiSeriLogger(ISerilogLogger seriLogger) { this.seriLogger = seriLogger; } - public void Connect(TapetiConnectionParams connectionParams) + + /// + public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) { - seriLogger.Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}", - connectionParams.HostName, - connectionParams.Port, - connectionParams.VirtualHost); + seriLogger + .ForContext("isReconnect", isReconnect) + .Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}", + connectionParams.HostName, + connectionParams.Port, + connectionParams.VirtualHost); } + /// public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) { seriLogger.Error(exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}", @@ -30,17 +42,34 @@ namespace Tapeti.Serilog connectionParams.VirtualHost); } - public void ConnectSuccess(TapetiConnectionParams connectionParams) + /// + public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) { - seriLogger.Information("Tapeti: successfully connected to {host}:{port}/{virtualHost}", - connectionParams.HostName, - connectionParams.Port, - connectionParams.VirtualHost); + seriLogger + .ForContext("isReconnect", isReconnect) + .Information("Tapeti: successfully connected to {host}:{port}/{virtualHost}", + connectionParams.HostName, + connectionParams.Port, + connectionParams.VirtualHost); } - - public void HandlerException(Exception e) + + /// + public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { - seriLogger.Error(e, "Tapeti: exception in message handler"); + var contextLogger = seriLogger + .ForContext("consumeResult", consumeResult) + .ForContext("exchange", messageContext.Exchange) + .ForContext("queue", messageContext.Queue) + .ForContext("routingKey", messageContext.RoutingKey); + + if (messageContext is IControllerMessageContext controllerMessageContext) + { + contextLogger = contextLogger + .ForContext("controller", controllerMessageContext.Binding.Controller.FullName) + .ForContext("method", controllerMessageContext.Binding.Method.Name); + } + + contextLogger.Error(exception, "Tapeti: exception in message handler"); } } } diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index 7daacaf..1cfbaef 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -4,12 +4,17 @@ using SimpleInjector; namespace Tapeti.SimpleInjector { + /// + /// + /// Dependency resolver and container implementation for SimpleInjector. + /// public class SimpleInjectorDependencyResolver : IDependencyContainer { private readonly Container container; private readonly Lifestyle defaultsLifestyle; private readonly Lifestyle controllersLifestyle; + /// public SimpleInjectorDependencyResolver(Container container, Lifestyle defaultsLifestyle = null, Lifestyle controllersLifestyle = null) { this.container = container; @@ -17,17 +22,21 @@ namespace Tapeti.SimpleInjector this.controllersLifestyle = controllersLifestyle; } + + /// public T Resolve() where T : class { return container.GetInstance(); } + /// public object Resolve(Type type) { return container.GetInstance(type); } + /// public void RegisterDefault() where TService : class where TImplementation : class, TService { if (!CanRegisterDefault()) @@ -39,6 +48,7 @@ namespace Tapeti.SimpleInjector container.Register(); } + /// public void RegisterDefault(Func factory) where TService : class { if (!CanRegisterDefault()) @@ -50,24 +60,29 @@ namespace Tapeti.SimpleInjector container.Register(factory); } + + /// public void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService { if (CanRegisterDefault()) container.RegisterSingleton(); } + /// public void RegisterDefaultSingleton(TService instance) where TService : class { if (CanRegisterDefault()) container.RegisterInstance(instance); } + /// public void RegisterDefaultSingleton(Func factory) where TService : class { if (CanRegisterDefault()) container.RegisterSingleton(factory); } + /// public void RegisterController(Type type) { if (controllersLifestyle != null) diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index 6378431..436911d 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.Transient/ConfigExtentions.cs b/Tapeti.Transient/ConfigExtensions.cs similarity index 91% rename from Tapeti.Transient/ConfigExtentions.cs rename to Tapeti.Transient/ConfigExtensions.cs index fd62748..aba7641 100644 --- a/Tapeti.Transient/ConfigExtentions.cs +++ b/Tapeti.Transient/ConfigExtensions.cs @@ -4,7 +4,7 @@ using Tapeti.Config; namespace Tapeti.Transient { /// - /// TapetiConfig extension to register Tapeti.Transient + /// ITapetiConfigBuilder extension to register Tapeti.Transient /// public static class ConfigExtensions { diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs index 3bbd0ee..fc6c686 100644 --- a/Tapeti/Config/IControllerCleanupMiddleware.cs +++ b/Tapeti/Config/IControllerCleanupMiddleware.cs @@ -13,8 +13,8 @@ namespace Tapeti.Config /// Called after the message handler method, even if exceptions occured. /// /// - /// + /// /// Always call to allow the next in the chain to clean up - Task Cleanup(IControllerMessageContext context, HandlingResult handlingResult, Func next); + Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next); } } diff --git a/Tapeti/Config/IExceptionStrategyContext.cs b/Tapeti/Config/IExceptionStrategyContext.cs index 4aae1fd..e418a96 100644 --- a/Tapeti/Config/IExceptionStrategyContext.cs +++ b/Tapeti/Config/IExceptionStrategyContext.cs @@ -4,12 +4,26 @@ namespace Tapeti.Config { + /// + /// Provides access to information about the message being consumed. + /// Allows the strategy to determine how the exception should be handled. + /// public interface IExceptionStrategyContext { + /// + /// Provides access to the message context. + /// IMessageContext MessageContext { get; } + /// + /// Contains the exception being handled. + /// Exception Exception { get; } - HandlingResultBuilder HandlingResult { get; set; } + /// + /// Determines how the message has been handled. Defaults to Error. + /// + /// + void SetConsumeResult(ConsumeResult consumeResult); } } diff --git a/Tapeti/Config/IPublishMiddleware.cs b/Tapeti/Config/IPublishMiddleware.cs index 9a0eccc..c8069e3 100644 --- a/Tapeti/Config/IPublishMiddleware.cs +++ b/Tapeti/Config/IPublishMiddleware.cs @@ -3,8 +3,16 @@ using System.Threading.Tasks; namespace Tapeti.Config { + /// + /// Denotes middleware that processes all published messages. + /// public interface IPublishMiddleware { + /// + /// Called when a message is published. + /// + /// + /// Call to pass the message to the next handler in the chain Task Handle(IPublishContext context, Func next); } } diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 80b30ff..016e022 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -12,11 +12,11 @@ namespace Tapeti.Connection public class TapetiBasicConsumer : DefaultBasicConsumer { private readonly IConsumer consumer; - private readonly Func onRespond; + private readonly Func onRespond; /// - public TapetiBasicConsumer(IConsumer consumer, Func onRespond) + public TapetiBasicConsumer(IConsumer consumer, Func onRespond) { this.consumer = consumer; this.onRespond = onRespond; @@ -35,7 +35,7 @@ namespace Tapeti.Connection } catch { - await onRespond(deliveryTag, ConsumeResponse.Nack); + await onRespond(deliveryTag, ConsumeResult.Error); } }); } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 35ca29f..c8b3bcd 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -5,7 +5,6 @@ using System.Net; using System.Net.Http; using System.Threading; using System.Threading.Tasks; -using System.Web; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -50,7 +49,7 @@ namespace Tapeti.Connection private IModel channelInstance; private ulong lastDeliveryTag; private DateTime connectedDateTime; - private HttpClient managementClient; + private readonly HttpClient managementClient; // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread private readonly object confirmLock = new object(); @@ -186,28 +185,29 @@ namespace Tapeti.Connection } - private async Task Respond(ulong deliveryTag, ConsumeResponse response) + private async Task Respond(ulong deliveryTag, ConsumeResult result) { await taskQueue.Value.Add(() => { // No need for a retryable channel here, if the connection is lost we can't // use the deliveryTag anymore. - switch (response) + switch (result) { - case ConsumeResponse.Ack: + case ConsumeResult.Success: + case ConsumeResult.ExternalRequeue: GetChannel().BasicAck(deliveryTag, false); break; - case ConsumeResponse.Nack: + case ConsumeResult.Error: GetChannel().BasicNack(deliveryTag, false, false); break; - case ConsumeResponse.Requeue: + case ConsumeResult.Requeue: GetChannel().BasicNack(deliveryTag, false, true); break; default: - throw new ArgumentOutOfRangeException(nameof(response), response, null); + throw new ArgumentOutOfRangeException(nameof(result), result, null); } }); @@ -454,7 +454,7 @@ namespace Tapeti.Connection { try { - logger.Connect(connectionParams); + logger.Connect(connectionParams, isReconnect); connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); @@ -510,7 +510,7 @@ namespace Tapeti.Connection else ConnectionEventListener?.Connected(); - logger.ConnectSuccess(connectionParams); + logger.ConnectSuccess(connectionParams, isReconnect); isReconnect = true; break; diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index fff74ef..ff3ea78 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.ExceptionServices; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; @@ -37,106 +38,49 @@ namespace Tapeti.Connection /// - public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body) + public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body) { + object message = null; try { - var message = messageSerializer.Deserialize(body, properties); + message = messageSerializer.Deserialize(body, properties); if (message == null) - throw new ArgumentException($"Message body could not be deserialized into a message object in queue {queueName}", nameof(body)); + throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body)); - await DispatchMessage(message, new MessageContextData + return await DispatchMessage(message, new MessageContextData { Exchange = exchange, RoutingKey = routingKey, Properties = properties }); - - return ConsumeResponse.Ack; } - catch (Exception e) + catch (Exception dispatchException) { - // TODO exception strategy - // TODO logger - return ConsumeResponse.Nack; - } + // TODO check if this is still necessary: + // var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); - - /* - - handlingResult = new HandlingResult - { - ConsumeResponse = ConsumeResponse.Ack, - MessageAction = MessageAction.None - }; - } - catch (Exception eDispatch) + using (var emptyContext = new MessageContext { - var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); - logger.HandlerException(eDispatch); - try - { - var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException); - - exceptionStrategy.HandleException(exceptionStrategyContext); - - handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult(); - } - catch (Exception eStrategy) - { - logger.HandlerException(eStrategy); - } - } - try + Config = config, + Queue = queueName, + Exchange = exchange, + RoutingKey = routingKey, + Message = message, + Properties = properties, + Binding = null + }) { - if (handlingResult == null) - { - handlingResult = new HandlingResult - { - ConsumeResponse = ConsumeResponse.Nack, - MessageAction = MessageAction.None - }; - } - await RunCleanup(context, handlingResult); - } - catch (Exception eCleanup) - { - logger.HandlerException(eCleanup); + var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); + HandleException(exceptionContext); + return exceptionContext.ConsumeResult; } } - finally - { - try - { - if (handlingResult == null) - { - handlingResult = new HandlingResult - { - ConsumeResponse = ConsumeResponse.Nack, - MessageAction = MessageAction.None - }; - } - await client.Respond(deliveryTag, handlingResult.ConsumeResponse); - } - catch (Exception eRespond) - { - logger.HandlerException(eRespond); - } - try - { - context?.Dispose(); - } - catch (Exception eDispose) - { - logger.HandlerException(eDispose); - } - } - */ } - private async Task DispatchMessage(object message, MessageContextData messageContextData) + private async Task DispatchMessage(object message, MessageContextData messageContextData) { + var returnResult = ConsumeResult.Success; var messageType = message.GetType(); var validMessageType = false; @@ -145,18 +89,23 @@ namespace Tapeti.Connection if (!binding.Accept(messageType)) continue; - await InvokeUsingBinding(message, messageContextData, binding); + var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); validMessageType = true; + + if (consumeResult != ConsumeResult.Success) + returnResult = consumeResult; } if (!validMessageType) - throw new ArgumentException($"Unsupported message type in queue {queueName}: {message.GetType().FullName}"); + throw new ArgumentException($"No binding found for message type: {message.GetType().FullName}"); + + return returnResult; } - private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding) + private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding) { - var context = new MessageContext + using (var context = new MessageContext { Config = config, Queue = queueName, @@ -165,21 +114,44 @@ namespace Tapeti.Connection Message = message, Properties = messageContextData.Properties, Binding = binding - }; + }) + { + try + { + await MiddlewareHelper.GoAsync(config.Middleware.Message, + (handler, next) => handler.Handle(context, next), + async () => { await binding.Invoke(context); }); - try - { - await MiddlewareHelper.GoAsync(config.Middleware.Message, - (handler, next) => handler.Handle(context, next), - async () => { await binding.Invoke(context); }); - } - finally - { - context.Dispose(); + return ConsumeResult.Success; + } + catch (Exception invokeException) + { + var exceptionContext = new ExceptionStrategyContext(context, invokeException); + HandleException(exceptionContext); + return exceptionContext.ConsumeResult; + } } } + private void HandleException(ExceptionStrategyContext exceptionContext) + { + try + { + exceptionStrategy.HandleException(exceptionContext); + } + catch (Exception strategyException) + { + // Exception in the exception strategy. Oh dear. + exceptionContext.SetConsumeResult(ConsumeResult.Error); + logger.ConsumeException(strategyException, exceptionContext.MessageContext, ConsumeResult.Error); + } + + logger.ConsumeException(exceptionContext.Exception, exceptionContext.MessageContext, exceptionContext.ConsumeResult); + } + + + private struct MessageContextData { public string Exchange; diff --git a/Tapeti/ConsumeResponse.cs b/Tapeti/ConsumeResponse.cs deleted file mode 100644 index d539170..0000000 --- a/Tapeti/ConsumeResponse.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace Tapeti -{ - /// - /// Determines the response sent back after handling a message. - /// - public enum ConsumeResponse - { - /// - /// Acknowledge the message and remove it from the queue - /// - Ack, - - /// - /// Negatively acknowledge the message and remove it from the queue, send to dead-letter queue if configured on the bus - /// - Nack, - - /// - /// Negatively acknowledge the message and put it back in the queue to try again later - /// - Requeue - } -} diff --git a/Tapeti/ConsumeResult.cs b/Tapeti/ConsumeResult.cs new file mode 100644 index 0000000..e31d274 --- /dev/null +++ b/Tapeti/ConsumeResult.cs @@ -0,0 +1,33 @@ +namespace Tapeti +{ + /// + /// Determines how the message has been handled and the response given to the message bus. + /// + public enum ConsumeResult + { + /// + /// Acknowledge the message and remove it from the queue. + /// + Success, + + /// + /// Negatively acknowledge the message and remove it from the queue, send to dead-letter queue if configured on the bus. + /// + Error, + + /// + /// Negatively acknowledge the message and put it back in the queue to try again later. + /// + Requeue, + + /// + /// The message has been stored for republishing and will be delivered again by some other means. + /// It will be acknowledged and removed from the queue as if succesful. + /// + /// + /// This option is for compatibility with external scheduler services. The exception strategy must guarantee that the + /// message will eventually be republished. + /// + ExternalRequeue + } +} diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index a00f8d2..d42f46c 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -1,27 +1,49 @@ using System; +using Tapeti.Config; namespace Tapeti.Default { + /// + /// + /// Default ILogger implementation for console applications. + /// public class ConsoleLogger : ILogger { - public void Connect(TapetiConnectionParams connectionParams) + /// + public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) { - Console.WriteLine($"[Tapeti] Connecting to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}"); + Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnecting" : "Connecting")} to {connectionParams.HostName}:{connectionParams.Port}{connectionParams.VirtualHost}"); } + /// public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) { Console.WriteLine($"[Tapeti] Connection failed: {exception}"); } - public void ConnectSuccess(TapetiConnectionParams connectionParams) + /// + public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) { - Console.WriteLine("[Tapeti] Connected"); + Console.WriteLine($"[Tapeti] {(isReconnect ? "Reconnected" : "Connected")}"); } - public void HandlerException(Exception e) + /// + public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { - Console.WriteLine(e.ToString()); + Console.WriteLine("[Tapeti] Exception while handling message"); + Console.WriteLine($" Result : {consumeResult}"); + Console.WriteLine($" Exchange : {messageContext.Exchange}"); + Console.WriteLine($" Queue : {messageContext.Queue}"); + Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}"); + + if (messageContext is IControllerMessageContext controllerMessageContext) + { + Console.WriteLine($" Controller : {controllerMessageContext.Binding.Controller.FullName}"); + Console.WriteLine($" Method : {controllerMessageContext.Binding.Method.Name}"); + } + + Console.WriteLine(); + Console.WriteLine(exception); } } } diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs index 4b29a07..4fc410a 100644 --- a/Tapeti/Default/ControllerMessageContext.cs +++ b/Tapeti/Default/ControllerMessageContext.cs @@ -7,7 +7,7 @@ namespace Tapeti.Default /// public class ControllerMessageContext : MessageContext, IControllerMessageContext { - private Dictionary items = new Dictionary(); + private readonly Dictionary items = new Dictionary(); /// diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs index df7952f..5e247ea 100644 --- a/Tapeti/Default/DevNullLogger.cs +++ b/Tapeti/Default/DevNullLogger.cs @@ -1,22 +1,31 @@ using System; +using Tapeti.Config; namespace Tapeti.Default { + /// + /// + /// Default ILogger implementation which does not log anything. + /// public class DevNullLogger : ILogger { - public void Connect(TapetiConnectionParams connectionParams) + /// + public void Connect(TapetiConnectionParams connectionParams, bool isReconnect) { } + /// public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception) { } - public void ConnectSuccess(TapetiConnectionParams connectionParams) + /// + public void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect) { } - public void HandlerException(Exception e) + /// + public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { } } diff --git a/Tapeti/Default/ExceptionStrategyContext.cs b/Tapeti/Default/ExceptionStrategyContext.cs index 89280ee..bcd163e 100644 --- a/Tapeti/Default/ExceptionStrategyContext.cs +++ b/Tapeti/Default/ExceptionStrategyContext.cs @@ -3,23 +3,37 @@ using Tapeti.Config; namespace Tapeti.Default { + /// + /// + /// Default implementation of IExceptionStrategyContext. + /// public class ExceptionStrategyContext : IExceptionStrategyContext { - internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception) + /// + /// The ConsumeResult as set by the exception strategy. Defaults to Error. + /// + public ConsumeResult ConsumeResult { get; set; } = ConsumeResult.Error; + + + /// + public IMessageContext MessageContext { get; } + + /// + public Exception Exception { get; } + + + /// + public ExceptionStrategyContext(IMessageContext messageContext, Exception exception) { MessageContext = messageContext; Exception = exception; } - public IMessageContext MessageContext { get; } - public Exception Exception { get; } - - private HandlingResultBuilder handlingResult; - public HandlingResultBuilder HandlingResult + /// + public void SetConsumeResult(ConsumeResult consumeResult) { - get => handlingResult ?? (handlingResult = new HandlingResultBuilder()); - set => handlingResult = value; + ConsumeResult = consumeResult; } } } diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs index 3bbb2d5..06510f2 100644 --- a/Tapeti/Default/NackExceptionStrategy.cs +++ b/Tapeti/Default/NackExceptionStrategy.cs @@ -2,11 +2,16 @@ namespace Tapeti.Default { + /// + /// + /// Default implementation of an exception strategy which marks the messages as Error. + /// public class NackExceptionStrategy : IExceptionStrategy { + /// public void HandleException(IExceptionStrategyContext context) { - context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack; + context.SetConsumeResult(ConsumeResult.Error); } } } diff --git a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs index 01ee0a6..3c4465e 100644 --- a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs +++ b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs @@ -3,13 +3,20 @@ using System.Text.RegularExpressions; namespace Tapeti.Default { + /// + /// + /// IExchangeStrategy implementation which uses the first identifier in the namespace in lower case, + /// skipping the first identifier if it is 'Messaging'. + /// + /// + /// Messaging.Service.Optional.Further.Parts will result in the exchange name 'service'. + /// public class NamespaceMatchExchangeStrategy : IExchangeStrategy { - // If the namespace starts with "Messaging.Service[.Optional.Further.Parts]", the exchange will be "Service". - // If no Messaging prefix is present, the first part of the namespace will be used instead. private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); + /// public string GetExchange(Type messageType) { if (messageType.Namespace == null) diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs index 6de9719..c560e56 100644 --- a/Tapeti/Default/RabbitMQMessageProperties.cs +++ b/Tapeti/Default/RabbitMQMessageProperties.cs @@ -12,6 +12,9 @@ namespace Tapeti.Default /// public class RabbitMQMessageProperties : IMessageProperties { + /// + /// Provides access to the wrapped IBasicProperties + /// public IBasicProperties BasicProperties { get; } diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs index 6c014f6..87fa8a2 100644 --- a/Tapeti/Default/RequeueExceptionStrategy.cs +++ b/Tapeti/Default/RequeueExceptionStrategy.cs @@ -4,11 +4,25 @@ namespace Tapeti.Default { + /// + /// + /// Example exception strategy which requeues all messages that result in an error. + /// + /// + /// You probably do not want to use this strategy as-is in production code, unless + /// you are sure that all your exceptions are transient. A better way would be to + /// check for exceptions you know are transient. An even better way would be to + /// never requeue but retry transient errors internally. See the Tapeti documentation + /// for an example of this pattern: + /// + /// https://tapeti.readthedocs.io/en/latest/ + /// public class RequeueExceptionStrategy : IExceptionStrategy { + /// public void HandleException(IExceptionStrategyContext context) { - context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue; + context.SetConsumeResult(ConsumeResult.Requeue); } } } diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs index 99bf6b9..de51fc2 100644 --- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs +++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs @@ -6,6 +6,13 @@ using System.Text.RegularExpressions; namespace Tapeti.Default { + /// + /// IRoutingKeyStrategy implementation which transforms the class name into a dot-separated routing key based + /// on the casing. Accounts for acronyms. If the class name ends with 'Message' it is not included in the routing key. + /// + /// + /// ExampleClassNameMessage will result in example.class.name + /// public class TypeNameRoutingKeyStrategy : IRoutingKeyStrategy { private const string SeparatorPattern = @" @@ -24,12 +31,17 @@ namespace Tapeti.Default private static readonly ConcurrentDictionary RoutingKeyCache = new ConcurrentDictionary(); + /// public string GetRoutingKey(Type messageType) { return RoutingKeyCache.GetOrAdd(messageType, BuildRoutingKey); } + /// + /// Actual implementation of GetRoutingKey, called only when the type has not been cached yet. + /// + /// protected virtual string BuildRoutingKey(Type messageType) { // Split PascalCase into dot-separated parts. If the class name ends in "Message" leave that out. @@ -43,6 +55,7 @@ namespace Tapeti.Default return string.Join(".", words.Select(s => s.ToLower())); } + private static List SplitPascalCase(string value) { var split = SeparatorRegex.Split(value); diff --git a/Tapeti/Exceptions/NackException.cs b/Tapeti/Exceptions/NackException.cs index 408dd71..a2fb7fa 100644 --- a/Tapeti/Exceptions/NackException.cs +++ b/Tapeti/Exceptions/NackException.cs @@ -2,8 +2,13 @@ namespace Tapeti.Exceptions { + /// + /// + /// Raised when a message is nacked by the message bus. + /// public class NackException : Exception { + /// public NackException(string message) : base(message) { } } } diff --git a/Tapeti/Exceptions/NoRouteException.cs b/Tapeti/Exceptions/NoRouteException.cs index 2dcd591..3f1ac64 100644 --- a/Tapeti/Exceptions/NoRouteException.cs +++ b/Tapeti/Exceptions/NoRouteException.cs @@ -2,8 +2,13 @@ namespace Tapeti.Exceptions { + /// + /// + /// Raised when a mandatory message has no route. + /// public class NoRouteException : Exception { + /// public NoRouteException(string message) : base(message) { } } } diff --git a/Tapeti/HandlingResult.cs b/Tapeti/HandlingResult.cs deleted file mode 100644 index 107c206..0000000 --- a/Tapeti/HandlingResult.cs +++ /dev/null @@ -1,63 +0,0 @@ -// ReSharper disable UnusedMember.Global - -namespace Tapeti -{ - public class HandlingResult - { - public HandlingResult() - { - ConsumeResponse = ConsumeResponse.Nack; - MessageAction = MessageAction.None; - } - - /// - /// Determines which response will be given to the message bus from where the message originates. - /// - public ConsumeResponse ConsumeResponse { get; internal set; } - - /// - /// Registers which action the Exception strategy has taken or will take to handle the error condition - /// on the message. This is important to know for cleanup handlers registered by middleware. - /// - public MessageAction MessageAction { get; internal set; } - - } - - public class HandlingResultBuilder - { - private static readonly HandlingResult Default = new HandlingResult(); - - private HandlingResult data = Default; - - public ConsumeResponse ConsumeResponse { - get => data.ConsumeResponse; - set => GetWritableData().ConsumeResponse = value; - } - - public MessageAction MessageAction - { - get => data.MessageAction; - set => GetWritableData().MessageAction = value; - } - - public HandlingResult ToHandlingResult() - { - if (data == Default) - { - return new HandlingResult(); - } - var result = GetWritableData(); - data = Default; - return result; - } - - private HandlingResult GetWritableData() - { - if (data == Default) - { - data = new HandlingResult(); - } - return data; - } - } -} diff --git a/Tapeti/Helpers/ConsoleHelper.cs b/Tapeti/Helpers/ConsoleHelper.cs index 0769de8..350947e 100644 --- a/Tapeti/Helpers/ConsoleHelper.cs +++ b/Tapeti/Helpers/ConsoleHelper.cs @@ -2,9 +2,17 @@ namespace Tapeti.Helpers { + /// + /// Helper class for console applications. + /// public static class ConsoleHelper { - // Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + /// + /// Determines if the application is running in a console. + /// + /// + /// Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + /// public static bool IsAvailable() { try diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs index a40e2bd..ba1158e 100644 --- a/Tapeti/Helpers/MiddlewareHelper.cs +++ b/Tapeti/Helpers/MiddlewareHelper.cs @@ -4,8 +4,18 @@ using System.Threading.Tasks; namespace Tapeti.Helpers { + /// + /// Helper class for executing the middleware pattern. + /// public static class MiddlewareHelper { + /// + /// Executes the chain of middleware synchronously, starting with the last item in the list. + /// + /// The list of middleware to run + /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware. + /// The action to execute when the innermost middleware calls next. + /// public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler) { var handlerIndex = middleware?.Count - 1 ?? -1; @@ -28,6 +38,13 @@ namespace Tapeti.Helpers } + /// + /// Executes the chain of middleware asynchronously, starting with the last item in the list. + /// + /// The list of middleware to run + /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware. + /// The action to execute when the innermost middleware calls next. + /// public static async Task GoAsync(IReadOnlyList middleware, Func, Task> handle, Func lastHandler) { var handlerIndex = middleware?.Count - 1 ?? -1; diff --git a/Tapeti/Helpers/TaskTypeHelper.cs b/Tapeti/Helpers/TaskTypeHelper.cs index 416a7ba..44e0c99 100644 --- a/Tapeti/Helpers/TaskTypeHelper.cs +++ b/Tapeti/Helpers/TaskTypeHelper.cs @@ -3,8 +3,18 @@ using System.Threading.Tasks; namespace Tapeti.Helpers { + /// + /// Helper methods for working with synchronous and asynchronous versions of methods. + /// public static class TaskTypeHelper { + /// + /// Determines if the given type matches the predicate, taking Task types into account. + /// + /// + /// + /// + /// public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTaskOf, out Type actualType) { if (type == typeof(Task)) @@ -32,11 +42,24 @@ namespace Tapeti.Helpers } + /// + /// Determines if the given type matches the predicate, taking Task types into account. + /// + /// + /// + /// public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTaskOf) { return IsTypeOrTaskOf(type, predicate, out isTaskOf, out _); } + + /// + /// Determines if the given type matches the compareTo type, taking Task types into account. + /// + /// + /// + /// public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf) { return IsTypeOrTaskOf(type, t => t == compareTo, out isTaskOf); diff --git a/Tapeti/IConsumer.cs b/Tapeti/IConsumer.cs index f8be17a..7204bc5 100644 --- a/Tapeti/IConsumer.cs +++ b/Tapeti/IConsumer.cs @@ -16,6 +16,6 @@ namespace Tapeti /// Metadata included in the message /// The raw body of the message /// - Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body); + Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body); } } diff --git a/Tapeti/IDependencyResolver.cs b/Tapeti/IDependencyResolver.cs index 1862aa4..12870bf 100644 --- a/Tapeti/IDependencyResolver.cs +++ b/Tapeti/IDependencyResolver.cs @@ -7,24 +7,79 @@ namespace Tapeti /// public interface IDependencyResolver { + /// + /// Resolve an instance of T + /// + /// The type to instantiate + /// A new or singleton instance, depending on the registration T Resolve() where T : class; + + /// + /// Resolve an instance of T + /// + /// The type to instantiate + /// A new or singleton instance, depending on the registration object Resolve(Type type); } + /// /// /// Allows registering controller classes into the IoC container. Also registers default implementations, /// so that the calling application may override these. /// + /// + /// All implementations of IDependencyResolver should implement IDependencyContainer as well, + /// otherwise all registrations of Tapeti components will have to be done manually by the application. + /// public interface IDependencyContainer : IDependencyResolver { + /// + /// Registers a default implementation in the IoC container. If an alternative implementation + /// was registered before, it is not replaced. + /// + /// + /// void RegisterDefault() where TService : class where TImplementation : class, TService; + + /// + /// Registers a default implementation in the IoC container. If an alternative implementation + /// was registered before, it is not replaced. + /// + /// + /// void RegisterDefault(Func factory) where TService : class; + + /// + /// Registers a default singleton implementation in the IoC container. If an alternative implementation + /// was registered before, it is not replaced. + /// + /// + /// void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService; + + /// + /// Registers a default singleton implementation in the IoC container. If an alternative implementation + /// was registered before, it is not replaced. + /// + /// + /// void RegisterDefaultSingleton(TService instance) where TService : class; + + /// + /// Registers a default singleton implementation in the IoC container. If an alternative implementation + /// was registered before, it is not replaced. + /// + /// + /// void RegisterDefaultSingleton(Func factory) where TService : class; + + /// + /// Registers a concrete controller class in the IoC container. + /// + /// void RegisterController(Type type); } } diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs index 979f454..5aeb5e1 100644 --- a/Tapeti/IExceptionStrategy.cs +++ b/Tapeti/IExceptionStrategy.cs @@ -2,14 +2,16 @@ namespace Tapeti { + /// + /// Called when an exception occurs while handling a message. Determines how it should be handled. + /// public interface IExceptionStrategy { /// /// Called when an exception occurs while handling a message. /// /// The exception strategy context containing the necessary data including the message context and the thrown exception. - /// Also the response to the message can be set. - /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. + /// Also proivdes methods for the exception strategy to indicate how the message should be handled. void HandleException(IExceptionStrategyContext context); } } diff --git a/Tapeti/IExchangeStrategy.cs b/Tapeti/IExchangeStrategy.cs index e7aaa7e..2878e71 100644 --- a/Tapeti/IExchangeStrategy.cs +++ b/Tapeti/IExchangeStrategy.cs @@ -2,8 +2,16 @@ namespace Tapeti { + /// + /// Translates message classes into their target exchange. + /// public interface IExchangeStrategy { + /// + /// Determines the exchange belonging to the given message class. + /// + /// + /// string GetExchange(Type messageType); } } diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index 8ec857d..cbba1c7 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -1,16 +1,46 @@ using System; +using Tapeti.Config; // ReSharper disable UnusedMember.Global namespace Tapeti { - // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) - // instead of only string-based logging without control over the output. + /// + /// Handles the logging of various events in Tapeti + /// + /// + /// This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) + /// instead of only string-based logging without control over the output. + /// public interface ILogger { - void Connect(TapetiConnectionParams connectionParams); + /// + /// Called before a connection to RabbitMQ is attempted. + /// + /// + /// Indicates whether this is the initial connection or a reconnect + void Connect(TapetiConnectionParams connectionParams, bool isReconnect); + + /// + /// Called when the connection has failed or is lost. + /// + /// + /// void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception); - void ConnectSuccess(TapetiConnectionParams connectionParams); - void HandlerException(Exception e); + + /// + /// Called when a connection to RabbitMQ has been succesfully established. + /// + /// + /// Indicates whether this is the initial connection or a reconnect + void ConnectSuccess(TapetiConnectionParams connectionParams, bool isReconnect); + + /// + /// Called when an exception occurs in a consumer. + /// + /// + /// + /// Indicates the action taken by the exception handler + void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult); } } diff --git a/Tapeti/IRoutingKeyStrategy.cs b/Tapeti/IRoutingKeyStrategy.cs index e13f287..db4ff14 100644 --- a/Tapeti/IRoutingKeyStrategy.cs +++ b/Tapeti/IRoutingKeyStrategy.cs @@ -2,8 +2,16 @@ namespace Tapeti { + /// + /// Translates message classes into routing keys. + /// public interface IRoutingKeyStrategy { + /// + /// Determines the routing key for the given message class. + /// + /// + /// string GetRoutingKey(Type messageType); } } diff --git a/Tapeti/MessageAction.cs b/Tapeti/MessageAction.cs new file mode 100644 index 0000000..a78793a --- /dev/null +++ b/Tapeti/MessageAction.cs @@ -0,0 +1,29 @@ +// ReSharper disable UnusedMember.Global + +namespace Tapeti +{ + /// + /// Indicates how the message was handled. + /// + public enum MessageAction + { + /// + /// The message was handled succesfully. + /// + Success, + + /// + /// There was an error while processing the message. + /// + Error, + + /// + /// The message has been stored for republishing and will be delivered again + /// even if the current messages has been Acked or Nacked. + /// + /// + /// This option is for compatibility with external scheduler services that do not immediately requeue a message. + /// + ExternalRetry + } +} diff --git a/Tapeti/MessageFutureAction.cs b/Tapeti/MessageFutureAction.cs deleted file mode 100644 index bc48049..0000000 --- a/Tapeti/MessageFutureAction.cs +++ /dev/null @@ -1,11 +0,0 @@ -// ReSharper disable UnusedMember.Global - -namespace Tapeti -{ - public enum MessageAction - { - None = 1, - ErrorLog = 2, - Retry = 3, - } -} diff --git a/Tapeti/TapetiAppSettingsConnectionParams.cs b/Tapeti/TapetiAppSettingsConnectionParams.cs index e6312e5..9140f4a 100644 --- a/Tapeti/TapetiAppSettingsConnectionParams.cs +++ b/Tapeti/TapetiAppSettingsConnectionParams.cs @@ -4,17 +4,35 @@ using System.Linq; namespace Tapeti { + /// + /// + /// Implementation of TapetiConnectionParams which reads the values from the AppSettings. + /// + /// + /// + /// AppSettings keys + /// + /// rabbitmq:hostname + /// rabbitmq:port + /// rabbitmq:virtualhost + /// rabbitmq:username + /// rabbitmq:password + /// rabbitmq:prefetchcount + /// public class TapetiAppSettingsConnectionParams : TapetiConnectionParams { - public const string DefaultPrefix = "rabbitmq:"; - public const string KeyHostname = "hostname"; - public const string KeyPort = "port"; - public const string KeyVirtualHost = "virtualhost"; - public const string KeyUsername = "username"; - public const string KeyPassword = "password"; - public const string KeyPrefetchCount = "prefetchcount"; + private const string DefaultPrefix = "rabbitmq:"; + private const string KeyHostname = "hostname"; + private const string KeyPort = "port"; + private const string KeyVirtualHost = "virtualhost"; + private const string KeyUsername = "username"; + private const string KeyPassword = "password"; + private const string KeyPrefetchCount = "prefetchcount"; + /// + /// + /// The prefix to apply to the keys. Defaults to "rabbitmq:" public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix) { var keys = ConfigurationManager.AppSettings.AllKeys;