diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj index be5c9ef..f584c07 100644 --- a/Tapeti.Annotations/Tapeti.Annotations.csproj +++ b/Tapeti.Annotations/Tapeti.Annotations.csproj @@ -5,4 +5,8 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj index 56cdff2..6ad9eab 100644 --- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj +++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj @@ -4,6 +4,10 @@ netstandard2.0 + + 1701;1702;1591 + + diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 52e0d73..2237cd1 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -5,6 +5,10 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index eaa2e91..51ce58a 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -5,6 +5,10 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index fd796c4..7a65642 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -8,15 +8,15 @@ using Tapeti.Helpers; namespace Tapeti.Flow.Default { - internal class FlowBindingMiddleware : IBindingMiddleware + internal class FlowBindingMiddleware : IControllerBindingMiddleware { - public void Handle(IBindingContext context, Action next) + public void Handle(IControllerBindingContext context, Action next) { if (context.Method.GetCustomAttribute() != null) return; if (context.Method.GetCustomAttribute() != null) - context.QueueBindingMode = QueueBindingMode.DirectToQueue; + context.SetBindingTargetMode(BindingTargetMode.Direct); RegisterYieldPointResult(context); RegisterContinuationFilter(context); @@ -27,14 +27,13 @@ namespace Tapeti.Flow.Default } - private static void RegisterContinuationFilter(IBindingContext context) + private static void RegisterContinuationFilter(IControllerBindingContext context) { var continuationAttribute = context.Method.GetCustomAttribute(); if (continuationAttribute == null) return; - context.Use(new FlowMessageFilterMiddleware()); - context.Use(new FlowMessageMiddleware()); + context.Use(new FlowMiddleware()); if (context.Result.HasHandler) return; @@ -58,7 +57,7 @@ namespace Tapeti.Flow.Default } - private static void RegisterYieldPointResult(IBindingContext context) + private static void RegisterYieldPointResult(IControllerBindingContext context) { if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf)) return; @@ -77,16 +76,16 @@ namespace Tapeti.Flow.Default } - private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) + private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint) { - var flowHandler = context.DependencyResolver.Resolve(); + var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(context, yieldPoint); } - private static Task HandleParallelResponse(IMessageContext context) + private static Task HandleParallelResponse(IControllerMessageContext context) { - var flowHandler = context.DependencyResolver.Resolve(); + var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext => { await flowContext.Store(); @@ -94,7 +93,7 @@ namespace Tapeti.Flow.Default } - private static void ValidateRequestResponse(IBindingContext context) + private static void ValidateRequestResponse(IControllerBindingContext context) { var request = context.MessageClass?.GetCustomAttribute(); if (request?.Response == null) diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs deleted file mode 100644 index 12673ad..0000000 --- a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Threading.Tasks; -using Tapeti.Config; - -namespace Tapeti.Flow.Default -{ - public class FlowCleanupMiddleware : ICleanupMiddleware - { - public async Task Handle(IMessageContext context, HandlingResult handlingResult) - { - if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextObj)) - return; - var flowContext = (FlowContext)flowContextObj; - - if (flowContext?.FlowStateLock != null) - { - if (handlingResult.ConsumeResponse == ConsumeResponse.Nack - || handlingResult.MessageAction == MessageAction.ErrorLog) - { - await flowContext.FlowStateLock.DeleteFlowState(); - } - flowContext.FlowStateLock.Dispose(); - } - } - } -} diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index dbadf08..0746a31 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -6,7 +6,7 @@ namespace Tapeti.Flow.Default { internal class FlowContext : IDisposable { - public IMessageContext MessageContext { get; set; } + public IControllerMessageContext MessageContext { get; set; } public IFlowStateLock FlowStateLock { get; set; } public FlowState FlowState { get; set; } diff --git a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs deleted file mode 100644 index 8df46e8..0000000 --- a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System; -using System.Threading.Tasks; -using Tapeti.Config; -using Tapeti.Flow.FlowHelpers; - -namespace Tapeti.Flow.Default -{ - public class FlowMessageFilterMiddleware : IMessageFilterMiddleware - { - public async Task Handle(IMessageContext context, Func next) - { - var flowContext = await GetFlowContext(context); - if (flowContext?.ContinuationMetadata == null) - return; - - if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) - return; - - await next(); - } - - - private static async Task GetFlowContext(IMessageContext context) - { - if (context.Items.ContainsKey(ContextItems.FlowContext)) - return (FlowContext)context.Items[ContextItems.FlowContext]; - - if (context.Properties.CorrelationId == null) - return null; - - if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) - return null; - - var flowStore = context.DependencyResolver.Resolve(); - - var flowID = await flowStore.FindFlowID(continuationID); - if (!flowID.HasValue) - return null; - - var flowStateLock = await flowStore.LockFlowState(flowID.Value); - - var flowState = await flowStateLock.GetFlowState(); - if (flowState == null) - return null; - - var flowContext = new FlowContext - { - MessageContext = context, - - FlowStateLock = flowStateLock, - FlowState = flowState, - - ContinuationID = continuationID, - ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null - }; - - // IDisposable items in the IMessageContext are automatically disposed - context.Items.Add(ContextItems.FlowContext, flowContext); - return flowContext; - } - } -} diff --git a/Tapeti.Flow/Default/FlowMessageMiddleware.cs b/Tapeti.Flow/Default/FlowMessageMiddleware.cs deleted file mode 100644 index 394ae0b..0000000 --- a/Tapeti.Flow/Default/FlowMessageMiddleware.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System; -using System.Reflection; -using System.Threading.Tasks; -using Tapeti.Config; - -namespace Tapeti.Flow.Default -{ - public class FlowMessageMiddleware : IMessageMiddleware - { - public async Task Handle(IMessageContext context, Func next) - { - var flowContext = (FlowContext)context.Items[ContextItems.FlowContext]; - if (flowContext != null) - { - Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); - - // Remove Continuation now because the IYieldPoint result handler will store the new state - flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); - var converge = flowContext.FlowState.Continuations.Count == 0 && - flowContext.ContinuationMetadata.ConvergeMethodName != null; - - await next(); - - if (converge) - await CallConvergeMethod(context, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync); - } - else - await next(); - } - - - private static async Task CallConvergeMethod(IMessageContext context, string methodName, bool sync) - { - IYieldPoint yieldPoint; - - var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); - if (method == null) - throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}"); - - if (sync) - yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {}); - else - yieldPoint = await (Task)method.Invoke(context.Controller, new object[] { }); - - if (yieldPoint == null) - throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}"); - - var flowHandler = context.DependencyResolver.Resolve(); - await flowHandler.Execute(context, yieldPoint); - } - } -} diff --git a/Tapeti.Flow/Default/FlowMiddleware.cs b/Tapeti.Flow/Default/FlowMiddleware.cs new file mode 100644 index 0000000..c90332a --- /dev/null +++ b/Tapeti.Flow/Default/FlowMiddleware.cs @@ -0,0 +1,128 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware + { + public async Task Filter(IControllerMessageContext context, Func next) + { + var flowContext = await CreateFlowContext(context); + if (flowContext?.ContinuationMetadata == null) + return; + + if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) + return; + + await next(); + } + + + public async Task Handle(IControllerMessageContext context, Func next) + { + if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) + { + Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); + + // Remove Continuation now because the IYieldPoint result handler will store the new state + flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); + var converge = flowContext.FlowState.Continuations.Count == 0 && + flowContext.ContinuationMetadata.ConvergeMethodName != null; + + await next(); + + if (converge) + await CallConvergeMethod(context, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync); + } + else + await next(); + } + + + public async Task Cleanup(IControllerMessageContext context, HandlingResult handlingResult, Func next) + { + await next(); + + if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) + return; + + if (flowContext?.FlowStateLock != null) + { + if (handlingResult.ConsumeResponse == ConsumeResponse.Nack + || handlingResult.MessageAction == MessageAction.ErrorLog) + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + flowContext.FlowStateLock.Dispose(); + } + } + + + + private static async Task CreateFlowContext(IControllerMessageContext context) + { + if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) + return flowContext; + + + if (context.Properties.CorrelationId == null) + return null; + + if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) + return null; + + var flowStore = context.Config.DependencyResolver.Resolve(); + + var flowID = await flowStore.FindFlowID(continuationID); + if (!flowID.HasValue) + return null; + + var flowStateLock = await flowStore.LockFlowState(flowID.Value); + + var flowState = await flowStateLock.GetFlowState(); + if (flowState == null) + return null; + + flowContext = new FlowContext + { + MessageContext = context, + + FlowStateLock = flowStateLock, + FlowState = flowState, + + ContinuationID = continuationID, + ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null + }; + + // IDisposable items in the IMessageContext are automatically disposed + context.Store(ContextItems.FlowContext, flowContext); + return flowContext; + } + + + private static async Task CallConvergeMethod(IControllerMessageContext context, string methodName, bool sync) + { + IYieldPoint yieldPoint; + + var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); + if (method == null) + throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}"); + + if (sync) + yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {}); + else + yieldPoint = await (Task)method.Invoke(context.Controller, new object[] { }); + + if (yieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}"); + + var flowHandler = context.Config.DependencyResolver.Resolve(); + await flowHandler.Execute(context, yieldPoint); + } + } +} diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 5c6d9d9..9da8a7d 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -4,9 +4,9 @@ using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading.Tasks; -using RabbitMQ.Client.Framing; using Tapeti.Annotations; using Tapeti.Config; +using Tapeti.Default; using Tapeti.Flow.Annotations; using Tapeti.Flow.FlowHelpers; @@ -14,11 +14,11 @@ namespace Tapeti.Flow.Default { public class FlowProvider : IFlowProvider, IFlowHandler { - private readonly IConfig config; + private readonly ITapetiConfig config; private readonly IInternalPublisher publisher; - public FlowProvider(IConfig config, IPublisher publisher) + public FlowProvider(ITapetiConfig config, IPublisher publisher) { this.config = config; this.publisher = (IInternalPublisher)publisher; @@ -72,7 +72,7 @@ namespace Tapeti.Flow.Default ConvergeMethodSync = convergeMethodTaskSync }); - var properties = new BasicProperties + var properties = new MessageProperties { CorrelationId = continuationID.ToString(), ReplyTo = responseHandlerInfo.ReplyToQueue @@ -96,12 +96,10 @@ namespace Tapeti.Flow.Default if (message.GetType().FullName != reply.ResponseTypeName) throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); - var properties = new BasicProperties(); - - // Only set the property if it's not null, otherwise a string reference exception can occur: - // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html - if (reply.CorrelationId != null) - properties.CorrelationId = reply.CorrelationId; + var properties = new MessageProperties + { + CorrelationId = reply.CorrelationId + }; // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) @@ -122,9 +120,9 @@ namespace Tapeti.Flow.Default } - private static ResponseHandlerInfo GetResponseHandlerInfo(IConfig config, object request, Delegate responseHandler) + private static ResponseHandlerInfo GetResponseHandlerInfo(ITapetiConfig config, object request, Delegate responseHandler) { - var binding = config.GetBinding(responseHandler); + var binding = config.Bindings.ForMethod(responseHandler); if (binding == null) throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); @@ -158,13 +156,13 @@ namespace Tapeti.Flow.Default CorrelationId = context.Properties.CorrelationId, ReplyTo = context.Properties.ReplyTo, ResponseTypeName = requestAttribute.Response.FullName, - Mandatory = context.Properties.Persistent + Mandatory = context.Properties.Persistent.GetValueOrDefault(true) }; } - private async Task CreateNewFlowState(FlowContext flowContext) + private static async Task CreateNewFlowState(FlowContext flowContext) { - var flowStore = flowContext.MessageContext.DependencyResolver.Resolve(); + var flowStore = flowContext.MessageContext.Config.DependencyResolver.Resolve(); var flowID = Guid.NewGuid(); flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); @@ -181,25 +179,20 @@ namespace Tapeti.Flow.Default }; } - public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) + public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint) { if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); - FlowContext flowContext; - - if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextItem)) + if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) { flowContext = new FlowContext { MessageContext = context }; - context.Items.Add(ContextItems.FlowContext, flowContext); + context.Store(ContextItems.FlowContext, flowContext); } - else - flowContext = (FlowContext)flowContextItem; - try { @@ -234,12 +227,12 @@ namespace Tapeti.Flow.Default } - private readonly IConfig config; + private readonly ITapetiConfig config; private readonly SendRequestFunc sendRequest; private readonly List requests = new List(); - public ParallelRequestBuilder(IConfig config, SendRequestFunc sendRequest) + public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) { this.config = config; this.sendRequest = sendRequest; diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index 306f034..ab8a152 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -9,11 +9,11 @@ namespace Tapeti.Flow.Default { public class FlowStarter : IFlowStarter { - private readonly IConfig config; + private readonly ITapetiConfig config; private readonly ILogger logger; - public FlowStarter(IConfig config, ILogger logger) + public FlowStarter(ITapetiConfig config, ILogger logger) { this.config = config; this.logger = logger; @@ -47,9 +47,9 @@ namespace Tapeti.Flow.Default var controller = config.DependencyResolver.Resolve(); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); - var context = new MessageContext + var context = new ControllerMessageContext { - DependencyResolver = config.DependencyResolver, + Config = config, Controller = controller }; @@ -72,6 +72,7 @@ namespace Tapeti.Flow.Default private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) { + /* foreach (var handler in config.CleanupMiddleware) { try @@ -83,6 +84,7 @@ namespace Tapeti.Flow.Default logger.HandlerException(eCleanup); } } + */ } diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index ddc1d61..a4b9cdb 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -25,7 +25,6 @@ namespace Tapeti.Flow public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { yield return new FlowBindingMiddleware(); - yield return new FlowCleanupMiddleware(); } } } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index c8f6982..b5fd107 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -38,7 +38,7 @@ namespace Tapeti.Flow /// public interface IFlowHandler { - Task Execute(IMessageContext context, IYieldPoint yieldPoint); + Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint); } public interface IFlowParallelRequestBuilder diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 105aa14..a2c4094 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -5,6 +5,10 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index b33e71b..d0780ff 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -5,6 +5,10 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index ed72a19..6378431 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -5,6 +5,10 @@ true + + 1701;1702;1591 + + diff --git a/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs b/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs similarity index 98% rename from Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs rename to Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs index d42ca26..d61f0b2 100644 --- a/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs +++ b/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs @@ -2,7 +2,7 @@ using Tapeti.Default; using Xunit; -namespace Tapeti.Tests +namespace Tapeti.Tests.Default { // ReSharper disable InconsistentNaming public class TypeNameRoutingKeyStrategyTests diff --git a/Tapeti.Tests/ConnectionStringParser.cs b/Tapeti.Tests/Helpers/ConnectionStringParser.cs similarity index 98% rename from Tapeti.Tests/ConnectionStringParser.cs rename to Tapeti.Tests/Helpers/ConnectionStringParser.cs index d32240b..b0084de 100644 --- a/Tapeti.Tests/ConnectionStringParser.cs +++ b/Tapeti.Tests/Helpers/ConnectionStringParser.cs @@ -1,9 +1,8 @@ using Tapeti.Helpers; using Xunit; -namespace Tapeti.Tests +namespace Tapeti.Tests.Helpers { - // ReSharper disable InconsistentNaming public class ConnectionStringParserTest { [Fact] diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 674cbd2..41ef4bd 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -4,6 +4,10 @@ netcoreapp2.1 + + 1701;1702;1591 + + @@ -14,4 +18,8 @@ + + + + diff --git a/Tapeti.Tests/TransientFilterMiddleware.cs b/Tapeti.Tests/TransientFilterMiddleware.cs deleted file mode 100644 index d311f03..0000000 --- a/Tapeti.Tests/TransientFilterMiddleware.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Threading.Tasks; -using Tapeti.Config; - -namespace Tapeti.Tests -{ - public class TransientFilterMiddleware : IMessageFilterMiddleware - { - public Task Handle(IMessageContext context, Func next) - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file diff --git a/Tapeti.Transient/ConfigExtentions.cs b/Tapeti.Transient/ConfigExtentions.cs index 7401578..fd62748 100644 --- a/Tapeti.Transient/ConfigExtentions.cs +++ b/Tapeti.Transient/ConfigExtentions.cs @@ -1,12 +1,23 @@ using System; +using Tapeti.Config; namespace Tapeti.Transient { + /// + /// TapetiConfig extension to register Tapeti.Transient + /// public static class ConfigExtensions { - public static TapetiConfig WithTransient(this TapetiConfig config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient") + /// + /// Registers the transient publisher and required middleware + /// + /// + /// + /// + /// + public static ITapetiConfigBuilder WithTransient(this ITapetiConfigBuilder config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient") { - config.Use(new TransientMiddleware(defaultTimeout, dynamicQueuePrefix)); + config.Use(new TransientExtension(defaultTimeout, dynamicQueuePrefix)); return config; } } diff --git a/Tapeti.Transient/ITransientPublisher.cs b/Tapeti.Transient/ITransientPublisher.cs index 2765259..494a9b4 100644 --- a/Tapeti.Transient/ITransientPublisher.cs +++ b/Tapeti.Transient/ITransientPublisher.cs @@ -2,8 +2,17 @@ namespace Tapeti.Transient { + /// + /// Provides a publisher which can send request messages, and await the response on a dynamic queue. + /// public interface ITransientPublisher { + /// + /// Sends a request and waits for the response with the timeout specified in the WithTransient config call. + /// + /// + /// + /// Task RequestResponse(TRequest request); } } \ No newline at end of file diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj index f3cca6f..dbe9e14 100644 --- a/Tapeti.Transient/Tapeti.Transient.csproj +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -5,6 +5,10 @@ true + + 1701;1702 + + diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientExtension.cs similarity index 56% rename from Tapeti.Transient/TransientMiddleware.cs rename to Tapeti.Transient/TransientExtension.cs index 5077fa5..2ce8477 100644 --- a/Tapeti.Transient/TransientMiddleware.cs +++ b/Tapeti.Transient/TransientExtension.cs @@ -4,29 +4,38 @@ using Tapeti.Config; namespace Tapeti.Transient { - public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding + /// + public class TransientExtension : ITapetiExtensionBinding { - private string dynamicQueuePrefix; + private readonly string dynamicQueuePrefix; private readonly TransientRouter router; - public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix) + + /// + public TransientExtension(TimeSpan defaultTimeout, string dynamicQueuePrefix) { this.dynamicQueuePrefix = dynamicQueuePrefix; - this.router = new TransientRouter(defaultTimeout); + router = new TransientRouter(defaultTimeout); } + + /// public void RegisterDefaults(IDependencyContainer container) { container.RegisterDefaultSingleton(router); container.RegisterDefault(); } + + /// public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return new object[0]; + return null; } - public IEnumerable GetBindings(IDependencyResolver dependencyResolver) + + /// + public IEnumerable GetBindings(IDependencyResolver dependencyResolver) { yield return new TransientGenericBinding(router, dynamicQueuePrefix); } diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index f28643d..f2f3e5e 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -1,52 +1,51 @@ using System; -using System.Reflection; using System.Threading.Tasks; using Tapeti.Config; namespace Tapeti.Transient { - public class TransientGenericBinding : ICustomBinding + /// + /// + /// Implements a binding for transient request response messages. + /// Register this binding using the WithTransient config extension method. + /// + public class TransientGenericBinding : IBinding { private readonly TransientRouter router; + private readonly string dynamicQueuePrefix; + /// + public string QueueName { get; private set; } + + + /// public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix) { this.router = router; - DynamicQueuePrefix = dynamicQueuePrefix; - Method = typeof(TransientRouter).GetMethod("GenericHandleResponse"); + this.dynamicQueuePrefix = dynamicQueuePrefix; } - public Type Controller => typeof(TransientRouter); - public MethodInfo Method { get; } + /// + public async Task Apply(IBindingTarget target) + { + QueueName = await target.BindDirectDynamic(dynamicQueuePrefix); + router.TransientResponseQueueName = QueueName; + } - public QueueBindingMode QueueBindingMode => QueueBindingMode.DirectToQueue; - - public string StaticQueueName => null; - - public string DynamicQueuePrefix { get; } - - public Type MessageClass => null; + /// public bool Accept(Type messageClass) { return true; } - public bool Accept(IMessageContext context, object message) - { - return true; - } - public Task Invoke(IMessageContext context, object message) + /// + public Task Invoke(IMessageContext context) { - router.GenericHandleResponse(message, context); + router.HandleMessage(context); return Task.CompletedTask; } - - public void SetQueueName(string queueName) - { - router.TransientResponseQueueName = queueName; - } } } \ No newline at end of file diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs index 62715e7..16343db 100644 --- a/Tapeti.Transient/TransientPublisher.cs +++ b/Tapeti.Transient/TransientPublisher.cs @@ -1,21 +1,26 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; +using System.Threading.Tasks; namespace Tapeti.Transient { + /// + /// + /// Default implementation of ITransientPublisher + /// public class TransientPublisher : ITransientPublisher { private readonly TransientRouter router; private readonly IPublisher publisher; + + /// public TransientPublisher(TransientRouter router, IPublisher publisher) { this.router = router; this.publisher = publisher; } + + /// public async Task RequestResponse(TRequest request) { return (TResponse)(await router.RequestResponse(publisher, request)); diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index 775576b..7eb11e5 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -2,26 +2,37 @@ using System.Collections.Concurrent; using System.Threading.Tasks; using System.Threading; -using RabbitMQ.Client.Framing; using Tapeti.Config; +using Tapeti.Default; namespace Tapeti.Transient { + /// + /// Manages active requests and responses. For internal use. + /// public class TransientRouter { private readonly int defaultTimeoutMs; - private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); - + /// + /// The generated name of the dynamic queue to which responses should be sent. + /// public string TransientResponseQueueName { get; set; } + + /// public TransientRouter(TimeSpan defaultTimeout) { defaultTimeoutMs = (int)defaultTimeout.TotalMilliseconds; } - public void GenericHandleResponse(object response, IMessageContext context) + + /// + /// Processes incoming messages to complete the corresponding request task. + /// + /// + public void HandleMessage(IMessageContext context) { if (context.Properties.CorrelationId == null) return; @@ -30,9 +41,16 @@ namespace Tapeti.Transient return; if (map.TryRemove(continuationID, out var tcs)) - tcs.SetResult(response); + tcs.SetResult(context.Message); } + + /// + /// Sends a request and waits for the response. Do not call directly, instead use ITransientPublisher.RequestResponse. + /// + /// + /// + /// public async Task RequestResponse(IPublisher publisher, object request) { var correlation = Guid.NewGuid(); @@ -40,7 +58,7 @@ namespace Tapeti.Transient try { - var properties = new BasicProperties + var properties = new MessageProperties { CorrelationId = correlation.ToString(), ReplyTo = TransientResponseQueueName, @@ -64,6 +82,7 @@ namespace Tapeti.Transient } } + private void TimeoutResponse(object tcs) { ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs)); diff --git a/Tapeti.png b/Tapeti.png new file mode 100644 index 0000000..b4b8350 Binary files /dev/null and b/Tapeti.png differ diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index 06fcad6..a951efb 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -1,4 +1,9 @@  + False ID KV - <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> \ No newline at end of file + <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> + True + True + True + True \ No newline at end of file diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs new file mode 100644 index 0000000..e717a62 --- /dev/null +++ b/Tapeti/Config/IBinding.cs @@ -0,0 +1,89 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + /// + /// Represents a registered binding to handle incoming messages. + /// + public interface IBinding + { + /// + /// The name of the queue the binding is consuming. May change after a reconnect for dynamic queues. + /// + string QueueName { get; } + + + /// + /// Called after a connection is established to set up the binding. + /// + /// + Task Apply(IBindingTarget target); + + + /// + /// Determines if the message as specified by the message class can be handled by this binding. + /// + /// + bool Accept(Type messageClass); + + + /// + /// Invokes the handler for the message as specified by the context. + /// + /// + Task Invoke(IMessageContext context); + } + + + + /// + /// Allows the binding to specify to which queue it should bind to and how. + /// At most one of these methods can be called, calling a second method will result in an exception. + /// + public interface IBindingTarget + { + /// + /// Binds the messageClass to the specified durable queue. + /// + /// The message class to be bound to the queue + /// The name of the durable queue + Task BindDurable(Type messageClass, string queueName); + + /// + /// Binds the messageClass to a dynamic auto-delete queue. + /// + /// + /// Dynamic bindings for different messageClasses will be bundled into a single dynamic queue. + /// Specifying a different queuePrefix is a way to force bindings into separate queues. + /// + /// The message class to be bound to the queue + /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// The generated name of the dynamic queue + Task BindDynamic(Type messageClass, string queuePrefix = null); + + /// + /// Declares a durable queue but does not add a binding for a messageClass' routing key. + /// Used for direct-to-queue messages. + /// + /// The name of the durable queue + Task BindDirectDurable(string queueName); + + /// + /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. + /// Used for direct-to-queue messages. The messageClass is used to ensure each queue only handles unique message types. + /// + /// The message class which will be handled on the queue. It is not actually bound to the queue. + /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// The generated name of the dynamic queue + Task BindDirectDynamic(Type messageClass = null, string queuePrefix = null); + + /// + /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. + /// Used for direct-to-queue messages. Guarantees a unique queue. + /// + /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + /// The generated name of the dynamic queue + Task BindDirectDynamic(string queuePrefix = null); + } +} diff --git a/Tapeti/Config/IBindingContext.cs b/Tapeti/Config/IBindingContext.cs deleted file mode 100644 index b5cc3b7..0000000 --- a/Tapeti/Config/IBindingContext.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public delegate object ValueFactory(IMessageContext context); - public delegate Task ResultHandler(IMessageContext context, object value); - - - public enum QueueBindingMode - { - /// - /// Allow binding of the routing key from the message's source exchange to the queue - /// - RoutingKey, - - /// - /// Do not bind, rely on the direct-to-queue exchange - /// - DirectToQueue - } - - - public interface IBindingContext - { - Type MessageClass { get; set; } - - MethodInfo Method { get; } - IReadOnlyList Parameters { get; } - IBindingResult Result { get; } - - QueueBindingMode QueueBindingMode { get; set; } - - void Use(IMessageFilterMiddleware filterMiddleware); - void Use(IMessageMiddleware middleware); - } - - - public interface IBindingParameter - { - ParameterInfo Info { get; } - bool HasBinding { get; } - - void SetBinding(ValueFactory valueFactory); - } - - - public interface IBindingResult - { - ParameterInfo Info { get; } - bool HasHandler { get; } - - void SetHandler(ResultHandler resultHandler); - } -} diff --git a/Tapeti/Config/IBindingMiddleware.cs b/Tapeti/Config/IBindingMiddleware.cs deleted file mode 100644 index e2d977c..0000000 --- a/Tapeti/Config/IBindingMiddleware.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Tapeti.Config -{ - public interface IBindingMiddleware - { - void Handle(IBindingContext context, Action next); - } -} diff --git a/Tapeti/Config/ICleanupMiddleware.cs b/Tapeti/Config/ICleanupMiddleware.cs deleted file mode 100644 index 132991b..0000000 --- a/Tapeti/Config/ICleanupMiddleware.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public interface ICleanupMiddleware - { - Task Handle(IMessageContext context, HandlingResult handlingResult); - } -} diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs deleted file mode 100644 index 1cdaad7..0000000 --- a/Tapeti/Config/IConfig.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public interface IConfig - { - bool UsePublisherConfirms { get; } - - IDependencyResolver DependencyResolver { get; } - IReadOnlyList MessageMiddleware { get; } - IReadOnlyList CleanupMiddleware { get; } - IReadOnlyList PublishMiddleware { get; } - IEnumerable Queues { get; } - - IBinding GetBinding(Delegate method); - } - - - public interface IQueue - { - bool Dynamic { get; } - string Name { get; } - - IEnumerable Bindings { get; } - } - - - public interface IDynamicQueue : IQueue - { - string GetDeclareQueueName(); - void SetName(string name); - } - - - public interface IBinding - { - Type Controller { get; } - MethodInfo Method { get; } - Type MessageClass { get; } - string QueueName { get; } - QueueBindingMode QueueBindingMode { get; set; } - - IReadOnlyList MessageFilterMiddleware { get; } - IReadOnlyList MessageMiddleware { get; } - - bool Accept(Type messageClass); - bool Accept(IMessageContext context, object message); - Task Invoke(IMessageContext context, object message); - } - - - public interface IBuildBinding : IBinding - { - void SetQueueName(string queueName); - } -} diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs new file mode 100644 index 0000000..9877ff0 --- /dev/null +++ b/Tapeti/Config/IControllerBindingContext.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + /// + /// Injects a value for a controller method parameter. + /// + /// + public delegate object ValueFactory(IControllerMessageContext context); + + + /// + /// Handles the return value of a controller method. + /// + /// + /// + public delegate Task ResultHandler(IControllerMessageContext context, object value); + + + /// + /// Determines how the binding target is configured. + /// + public enum BindingTargetMode + { + /// + /// Bind to a queue using the message's routing key + /// + Default, + + /// + /// Bind to a queue without registering the message's routing key + /// + Direct + } + + + /// + /// Provides information about the controller and method being registered. + /// + public interface IControllerBindingContext + { + /// + /// The message class for this method. + /// + Type MessageClass { get; } + + /// + /// The controller class for this binding. + /// + Type Controller { get; } + + /// + /// The method for this binding. + /// + MethodInfo Method { get; } + + /// + /// The list of parameters passed to the method. + /// + IReadOnlyList Parameters { get; } + + /// + /// The return type of the method. + /// + IBindingResult Result { get; } + + + /// + /// Sets the message class for this method. Can only be called once, which is always done first by the default MessageBinding. + /// + /// + void SetMessageClass(Type messageClass); + + + /// + /// Determines how the binding target is configured. Can only be called once. Defaults to 'Default'. + /// + /// + void SetBindingTargetMode(BindingTargetMode mode); + + + /// + /// Add middleware specific to this method. + /// + /// + void Use(IControllerMiddlewareBase handler); + } + + + /// + /// Information about a method parameter and how it gets it's value. + /// + public interface IBindingParameter + { + /// + /// Reference to the reflection info for this parameter. + /// + ParameterInfo Info { get; } + + /// + /// Determines if a binding has been set. + /// + bool HasBinding { get; } + + /// + /// Sets the binding for this parameter. Can only be called once. + /// + /// + void SetBinding(ValueFactory valueFactory); + } + + + /// + /// Information about the return type of a method. + /// + public interface IBindingResult + { + /// + /// Reference to the reflection info for this return value. + /// + ParameterInfo Info { get; } + + /// + /// Determines if a handler has been set. + /// + bool HasHandler { get; } + + /// + /// Sets the handler for this return type. Can only be called once. + /// + /// + void SetHandler(ResultHandler resultHandler); + } +} diff --git a/Tapeti/Config/IControllerBindingMiddleware.cs b/Tapeti/Config/IControllerBindingMiddleware.cs new file mode 100644 index 0000000..d88c951 --- /dev/null +++ b/Tapeti/Config/IControllerBindingMiddleware.cs @@ -0,0 +1,19 @@ +using System; + +namespace Tapeti.Config +{ + /// + /// + /// Called when a Controller method is registered. + /// + public interface IControllerBindingMiddleware : IControllerMiddlewareBase + { + /// + /// Called before a Controller method is registered. Can change the way parameters and return values are handled, + /// and can inject message middleware specific to a method. + /// + /// + /// Must be called to activate the new layer of middleware. + void Handle(IControllerBindingContext context, Action next); + } +} diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs new file mode 100644 index 0000000..3bbd0ee --- /dev/null +++ b/Tapeti/Config/IControllerCleanupMiddleware.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + /// + /// + /// Denotes middleware that runs after controller methods. + /// + public interface IControllerCleanupMiddleware : IControllerMiddlewareBase + { + /// + /// Called after the message handler method, even if exceptions occured. + /// + /// + /// + /// Always call to allow the next in the chain to clean up + Task Cleanup(IControllerMessageContext context, HandlingResult handlingResult, Func next); + } +} diff --git a/Tapeti/Config/IControllerFilterMiddleware.cs b/Tapeti/Config/IControllerFilterMiddleware.cs new file mode 100644 index 0000000..ec8391a --- /dev/null +++ b/Tapeti/Config/IControllerFilterMiddleware.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + /// + /// + /// Denotes middleware that runs before the controller is instantiated. + /// + public interface IControllerFilterMiddleware : IControllerMiddlewareBase + { + /// + /// Called before the + /// + /// + /// + /// + Task Filter(IControllerMessageContext context, Func next); + } +} diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs new file mode 100644 index 0000000..25108c9 --- /dev/null +++ b/Tapeti/Config/IControllerMessageContext.cs @@ -0,0 +1,37 @@ +namespace Tapeti.Config +{ + /// + /// + /// Extends the message context with information about the controller. + /// + public interface IControllerMessageContext : IMessageContext + { + /// + /// An instance of the controller referenced by the binding. + /// + object Controller { get; } + + + /// + /// Provides access to the binding which is currently processing the message. + /// + new IControllerMethodBinding Binding { get; } + + + /// + /// Stores a key-value pair in the context for passing information between the various + /// controller middleware stages (IControllerMiddlewareBase descendants). + /// + /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts + /// Will be disposed if the value implements IDisposable + void Store(string key, object value); + + /// + /// Retrieves a previously stored value. + /// + /// + /// + /// True if the value was found, False otherwise + bool Get(string key, out T value) where T : class; + } +} diff --git a/Tapeti/Config/IControllerMessageMiddleware.cs b/Tapeti/Config/IControllerMessageMiddleware.cs new file mode 100644 index 0000000..a252081 --- /dev/null +++ b/Tapeti/Config/IControllerMessageMiddleware.cs @@ -0,0 +1,19 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + /// + /// Denotes middleware that runs for controller methods. + /// + public interface IControllerMessageMiddleware + { + /// + /// Called after the message has passed any filter middleware and the controller has been instantiated, + /// but before the method has been called. + /// + /// + /// Call to pass the message to the next handler in the chain or call the controller method + Task Handle(IControllerMessageContext context, Func next); + } +} diff --git a/Tapeti/Config/IControllerMethodBinding.cs b/Tapeti/Config/IControllerMethodBinding.cs new file mode 100644 index 0000000..0fb4ce5 --- /dev/null +++ b/Tapeti/Config/IControllerMethodBinding.cs @@ -0,0 +1,22 @@ +using System; +using System.Reflection; + +namespace Tapeti.Config +{ + /// + /// + /// Represents a binding to a method in a controller class to handle incoming messages. + /// + public interface IControllerMethodBinding : IBinding + { + /// + /// The controller class. + /// + Type Controller { get; } + + /// + /// The method on the Controller class to which this binding is bound. + /// + MethodInfo Method { get; } + } +} diff --git a/Tapeti/Config/IControllerMiddlewareBase.cs b/Tapeti/Config/IControllerMiddlewareBase.cs new file mode 100644 index 0000000..acb5de1 --- /dev/null +++ b/Tapeti/Config/IControllerMiddlewareBase.cs @@ -0,0 +1,9 @@ +namespace Tapeti.Config +{ + /// + /// Base interface for all controller middleware. Implement at least one of the descendant interfaces to be able to register. + /// + public interface IControllerMiddlewareBase + { + } +} diff --git a/Tapeti/Config/ICustomBinding.cs b/Tapeti/Config/ICustomBinding.cs deleted file mode 100644 index 8b39247..0000000 --- a/Tapeti/Config/ICustomBinding.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public interface ICustomBinding - { - Type Controller { get; } - - MethodInfo Method { get; } - - QueueBindingMode QueueBindingMode { get; } - - string StaticQueueName { get; } - - string DynamicQueuePrefix { get; } - - Type MessageClass { get; } // Needed to get routing key information when QueueBindingMode = RoutingKey - - bool Accept(Type messageClass); - - bool Accept(IMessageContext context, object message); - - Task Invoke(IMessageContext context, object message); - - void SetQueueName(string queueName); - } -} diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index 658636c..f8a839f 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -1,29 +1,46 @@ using System; -using System.Collections.Generic; -using RabbitMQ.Client; - -// ReSharper disable UnusedMember.Global namespace Tapeti.Config { + /// + /// + /// Provides information about the message currently being handled. + /// public interface IMessageContext : IDisposable { - IDependencyResolver DependencyResolver { get; } + /// + /// Provides access to the Tapeti config. + /// + ITapetiConfig Config { get; } + /// + /// Contains the name of the queue the message was consumed from. + /// string Queue { get; } - string RoutingKey { get; } - object Message { get; } - IBasicProperties Properties { get; } - IDictionary Items { get; } + /// + /// Contains the exchange to which the message was published. + /// + string Exchange { get; } + + /// + /// Contains the routing key as provided when the message was published. + /// + string RoutingKey { get; } + + /// + /// Contains the decoded message instance. + /// + object Message { get; } + + /// + /// Provides access to the message metadata. + /// + IMessageProperties Properties { get; } /// - /// Controller will be null when passed to a IMessageFilterMiddleware + /// Provides access to the binding which is currently processing the message. /// - object Controller { get; } - IBinding Binding { get; } - - IMessageContext SetupNestedContext(); } } diff --git a/Tapeti/Config/IMessageFilterMiddleware.cs b/Tapeti/Config/IMessageFilterMiddleware.cs deleted file mode 100644 index 497909c..0000000 --- a/Tapeti/Config/IMessageFilterMiddleware.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public interface IMessageFilterMiddleware - { - Task Handle(IMessageContext context, Func next); - } -} diff --git a/Tapeti/Config/IMessageMiddleware.cs b/Tapeti/Config/IMessageMiddleware.cs index 38ee22b..134b5de 100644 --- a/Tapeti/Config/IMessageMiddleware.cs +++ b/Tapeti/Config/IMessageMiddleware.cs @@ -3,8 +3,16 @@ using System.Threading.Tasks; namespace Tapeti.Config { + /// + /// Denotes middleware that processes all messages. + /// public interface IMessageMiddleware { + /// + /// Called for all bindings when a message needs to be handled. + /// + /// + /// Call to pass the message to the next handler in the chain Task Handle(IMessageContext context, Func next); } } diff --git a/Tapeti/Config/IMessageProperties.cs b/Tapeti/Config/IMessageProperties.cs new file mode 100644 index 0000000..31c203b --- /dev/null +++ b/Tapeti/Config/IMessageProperties.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; + +namespace Tapeti.Config +{ + /// + /// Metadata properties attached to a message, equivalent to the RabbitMQ Client's IBasicProperties. + /// + public interface IMessageProperties + { + /// + string ContentType { get; set; } + + /// + string CorrelationId { get; set; } + + /// + string ReplyTo { get; set; } + + /// + bool? Persistent { get; set; } + + /// + DateTime? Timestamp { get; set; } + + + /// + /// Writes a custom header. + /// + /// + /// + void SetHeader(string name, string value); + + + /// + /// Retrieves the value of a custom header field. + /// + /// + /// The value if found, null otherwise + string GetHeader(string name); + + + /// + /// Retrieves all custom headers. + /// + IEnumerable> GetHeaders(); + } +} diff --git a/Tapeti/Config/IPublishContext.cs b/Tapeti/Config/IPublishContext.cs index 4bb4ff8..c66e765 100644 --- a/Tapeti/Config/IPublishContext.cs +++ b/Tapeti/Config/IPublishContext.cs @@ -4,13 +4,34 @@ namespace Tapeti.Config { + /// + /// Provides access to information about the message being published. + /// public interface IPublishContext { - IDependencyResolver DependencyResolver { get; } + /// + /// Provides access to the Tapeti config. + /// + ITapetiConfig Config { get; } - string Exchange { get; } + /// + /// The exchange to which the message will be published. + /// + string Exchange { get; set; } + + /// + /// The routing key which will be included with the message. + /// string RoutingKey { get; } + + /// + /// The instance of the message class. + /// object Message { get; } - IBasicProperties Properties { get; } + + /// + /// Provides access to the message metadata. + /// + IMessageProperties Properties { get; } } } diff --git a/Tapeti/Config/ITapetiConfig.cs b/Tapeti/Config/ITapetiConfig.cs new file mode 100644 index 0000000..b1108f7 --- /dev/null +++ b/Tapeti/Config/ITapetiConfig.cs @@ -0,0 +1,112 @@ +using System; +using System.Collections.Generic; + +namespace Tapeti.Config +{ + /// + /// Provides access to the Tapeti configuration. + /// + public interface ITapetiConfig + { + /// + /// Reference to the wrapper for an IoC container, to provide dependency injection to Tapeti. + /// + IDependencyResolver DependencyResolver { get; } + + /// + /// Various Tapeti features which can be turned on or off. + /// + ITapetiConfigFeatues Features { get; } + + /// + /// Provides access to the different kinds of registered middleware. + /// + ITapetiConfigMiddleware Middleware { get; } + + /// + /// A list of all registered bindings. + /// + ITapetiConfigBindings Bindings { get; } + } + + + /// + /// Various Tapeti features which can be turned on or off. + /// + public interface ITapetiConfigFeatues + { + /// + /// Determines whether 'publisher confirms' are used. This RabbitMQ features allows Tapeti to + /// be notified if a message has no route, and guarantees delivery for request-response style + /// messages and those marked with the Mandatory attribute. On by default, can only be turned + /// off by explicitly calling DisablePublisherConfirms, which is not recommended. + /// + bool PublisherConfirms { get; } + + /// + /// If enabled, durable queues will be created at startup and their bindings will be updated + /// with the currently registered message handlers. If not enabled all durable queues must + /// already be present when the connection is made. + /// + bool DeclareDurableQueues { get; } + } + + + /// + /// Provides access to the different kinds of registered middleware. + /// + public interface ITapetiConfigMiddleware + { + /// + /// A list of message middleware which is called when a message is being consumed. + /// + IReadOnlyList Message { get; } + + /// + /// A list of publish middleware which is called when a message is being published. + /// + IReadOnlyList Publish { get; } + } + + + /// + /// + /// Contains a list of registered bindings, with a few added helpers. + /// + public interface ITapetiConfigBindings : IReadOnlyList + { + /// + /// Searches for a binding linked to the specified method. + /// + /// + /// The binding if found, null otherwise + IControllerMethodBinding ForMethod(Delegate method); + } + + + /* + public interface IBinding + { + Type Controller { get; } + MethodInfo Method { get; } + Type MessageClass { get; } + string QueueName { get; } + QueueBindingMode QueueBindingMode { get; set; } + + IReadOnlyList MessageFilterMiddleware { get; } + IReadOnlyList MessageMiddleware { get; } + + bool Accept(Type messageClass); + bool Accept(IMessageContext context, object message); + Task Invoke(IMessageContext context, object message); + } + */ + + + /* + public interface IBuildBinding : IBinding + { + void SetQueueName(string queueName); + } + */ +} diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs new file mode 100644 index 0000000..dab033d --- /dev/null +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -0,0 +1,116 @@ +using System; + +namespace Tapeti.Config +{ + /// + /// Configures Tapeti. Every method other than Build returns the builder instance + /// for method chaining. + /// + public interface ITapetiConfigBuilder + { + /// + /// Returns a locked version of the configuration which can be used to establish a connection. + /// + ITapetiConfig Build(); + + + /// + /// Registers binding middleware which is called when a binding is created for a controller method. + /// + /// + ITapetiConfigBuilder Use(IControllerBindingMiddleware handler); + + /// + /// Registers message middleware which is called to handle an incoming message. + /// + /// + ITapetiConfigBuilder Use(IMessageMiddleware handler); + + /// + /// Registers publish middleware which is called when a message is published. + /// + /// + ITapetiConfigBuilder Use(IPublishMiddleware handler); + + + /// + /// Registers a Tapeti extension, which is a bundling mechanism for features that require more than one middleware and + /// optionally other dependency injected implementations. + /// + /// + ITapetiConfigBuilder Use(ITapetiExtension extension); + + + /// + /// Registers a binding which can accept messages. In most cases this method should not be called outside + /// of Tapeti. Instead use the RegisterAllControllers extension method to automatically create bindings. + /// + /// + void RegisterBinding(IBinding binding); + + + /// + /// Disables 'publisher confirms'. This RabbitMQ features allows Tapeti to be notified if a message + /// has no route, and guarantees delivery for request-response style messages and those marked with + /// the Mandatory attribute. On by default. + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you can accept those consequences. + /// + ITapetiConfigBuilder DisablePublisherConfirms(); + + + /// + /// Configures 'publisher confirms'. This RabbitMQ features allows Tapeti to be notified if a message + /// has no route, and guarantees delivery for request-response style messages and those marked with + /// the Mandatory attribute. On by default. + /// + /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, + /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may + /// result in never-ending flows. Only disable if you can accept those consequences. + /// + ITapetiConfigBuilder SetPublisherConfirms(bool enabled); + + + /// + /// Enables the automatic creation of durable queues and updating of their bindings. + /// + /// + /// Note that access to the RabbitMQ Management plugin's REST API is required for this + /// feature to work, since AMQP does not provide a way to query existing bindings. + /// + ITapetiConfigBuilder EnableDeclareDurableQueues(); + + + /// + /// Configures the automatic creation of durable queues and updating of their bindings. + /// + /// + /// Note that access to the RabbitMQ Management plugin's REST API is required for this + /// feature to work, since AMQP does not provide a way to query existing bindings. + /// + ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); + } + + + /// + /// Access interface for ITapetiConfigBuilder extension methods. Allows access to the registered middleware + /// before the configuration is built. Implementations of ITapetiConfigBuilder should also implement this interface. + /// Should not be used outside of Tapeti packages. + /// + public interface ITapetiConfigBuilderAccess + { + /// + /// Provides access to the dependency resolver. + /// + IDependencyResolver DependencyResolver { get; } + + /// + /// Applies the currently registered binding middleware to + /// + /// + /// + void ApplyBindingMiddleware(IControllerBindingContext context, Action lastHandler); + } +} diff --git a/Tapeti/Config/ITapetiExtension.cs b/Tapeti/Config/ITapetiExtension.cs index 6bc6f6c..24ffc06 100644 --- a/Tapeti/Config/ITapetiExtension.cs +++ b/Tapeti/Config/ITapetiExtension.cs @@ -2,10 +2,23 @@ namespace Tapeti.Config { + /// + /// A bundling mechanism for Tapeti extension packages. Allows the calling application to + /// pass all the necessary components to TapetiConfig.Use in one call. + /// public interface ITapetiExtension { + /// + /// Allows the extension to register default implementations into the IoC container. + /// + /// void RegisterDefaults(IDependencyContainer container); + /// + /// Produces a list of middleware implementations to be passed to the TapetiConfig.Use method. + /// + /// + /// A list of middleware implementations or null if no middleware needs to be registered IEnumerable GetMiddleware(IDependencyResolver dependencyResolver); } } diff --git a/Tapeti/Config/ITapetiExtensionBinding.cs b/Tapeti/Config/ITapetiExtensionBinding.cs new file mode 100644 index 0000000..33b064e --- /dev/null +++ b/Tapeti/Config/ITapetiExtensionBinding.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + /// + /// + /// Provides a way for Tapeti extensions to register custom bindings. + /// + public interface ITapetiExtensionBinding : ITapetiExtension + { + /// + /// Produces a list of bindings to be registered. + /// + /// + /// A list of bindings or null if no bindings need to be registered + IEnumerable GetBindings(IDependencyResolver dependencyResolver); + } +} \ No newline at end of file diff --git a/Tapeti/Config/ITapetiExtentionBinding.cs b/Tapeti/Config/ITapetiExtentionBinding.cs deleted file mode 100644 index 5eee3a4..0000000 --- a/Tapeti/Config/ITapetiExtentionBinding.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.Collections.Generic; - -namespace Tapeti.Config -{ - public interface ITapetiExtentionBinding - { - IEnumerable GetBindings(IDependencyResolver dependencyResolver); - - } -} \ No newline at end of file diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs index d86feab..395b393 100644 --- a/Tapeti/Connection/IConnectionEventListener.cs +++ b/Tapeti/Connection/IConnectionEventListener.cs @@ -1,16 +1,25 @@ namespace Tapeti.Connection { - public class DisconnectedEventArgs - { - public ushort ReplyCode; - public string ReplyText; - } - - + /// + /// Receives notifications on the state of the connection. + /// public interface IConnectionEventListener { + /// + /// Called when a connection to RabbitMQ has been established. + /// void Connected(); + + + /// + /// Called when the connection to RabbitMQ has been lost. + /// void Reconnected(); + + + /// + /// Called when the connection to RabbitMQ has been recovered after an unexpected disconnect. + /// void Disconnected(DisconnectedEventArgs e); } } diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs new file mode 100644 index 0000000..ce83888 --- /dev/null +++ b/Tapeti/Connection/ITapetiClient.cs @@ -0,0 +1,113 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Connection +{ + /// + /// + /// Defines a queue binding to an exchange using a routing key + /// + public struct QueueBinding : IEquatable + { + /// + public readonly string Exchange; + + /// + public readonly string RoutingKey; + + + /// + /// Initializes a new QueueBinding + /// + /// + /// + public QueueBinding(string exchange, string routingKey) + { + Exchange = exchange; + RoutingKey = routingKey; + } + + + /// + public bool Equals(QueueBinding other) + { + return string.Equals(Exchange, other.Exchange) && string.Equals(RoutingKey, other.RoutingKey); + } + + /// + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + return obj is QueueBinding other && Equals(other); + } + + /// + public override int GetHashCode() + { + unchecked + { + return ((Exchange != null ? Exchange.GetHashCode() : 0) * 397) ^ (RoutingKey != null ? RoutingKey.GetHashCode() : 0); + } + } + } + + + /// + /// Provides a bridge between Tapeti and the actual RabbitMQ client + /// + public interface ITapetiClient + { + /// + /// Publishes a message. The exchange and routing key are determined by the registered strategies. + /// + /// The raw message data to publish + /// Metadata to include in the message + /// The exchange to publish the message to, or empty to send it directly to a queue + /// The routing key for the message, or queue name if exchange is empty + /// If true, an exception will be raised if the message can not be delivered to at least one queue + Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory); + + + /// + /// Starts a consumer for the specified queue, using the provided bindings to handle messages. + /// + /// + /// The consumer implementation which will receive the messages from the queue + Task Consume(string queueName, IConsumer consumer); + + + /// + /// Creates a durable queue if it does not already exist, and updates the bindings. + /// + /// The name of the queue to create + /// A list of bindings. Any bindings already on the queue which are not in this list will be removed + Task DurableQueueDeclare(string queueName, IEnumerable bindings); + + /// + /// Verifies a durable queue exists. Will raise an exception if it does not. + /// + /// The name of the queue to verify + Task DurableQueueVerify(string queueName); + + /// + /// Creates a dynamic queue. + /// + /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. + Task DynamicQueueDeclare(string queuePrefix = null); + + /// + /// Add a binding to a dynamic queue. + /// + /// The name of the dynamic queue previously created using DynamicQueueDeclare + /// The binding to add to the dynamic queue + Task DynamicQueueBind(string queueName, QueueBinding binding); + + + /// + /// Closes the connection to RabbitMQ gracefully. + /// + Task Close(); + } +} \ No newline at end of file diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs new file mode 100644 index 0000000..80b30ff --- /dev/null +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client; +using Tapeti.Default; + +namespace Tapeti.Connection +{ + /// + /// + /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer + /// + public class TapetiBasicConsumer : DefaultBasicConsumer + { + private readonly IConsumer consumer; + private readonly Func onRespond; + + + /// + public TapetiBasicConsumer(IConsumer consumer, Func onRespond) + { + this.consumer = consumer; + this.onRespond = onRespond; + } + + + /// + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) + { + Task.Run(async () => + { + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body); + await onRespond(deliveryTag, response); + } + catch + { + await onRespond(deliveryTag, ConsumeResponse.Nack); + } + }); + } + } +} diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiClient.cs similarity index 50% rename from Tapeti/Connection/TapetiWorker.cs rename to Tapeti/Connection/TapetiClient.cs index aca3a3b..35ca29f 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -1,34 +1,45 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; +using System.Net; +using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using System.Web; +using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; using Tapeti.Config; +using Tapeti.Default; using Tapeti.Exceptions; -using Tapeti.Helpers; using Tapeti.Tasks; namespace Tapeti.Connection { - public class TapetiWorker + /// + /// + /// Implementation of ITapetiClient for the RabbitMQ Client library + /// + public class TapetiClient : ITapetiClient { private const int ReconnectDelay = 5000; private const int MandatoryReturnTimeout = 30000; private const int MinimumConnectedReconnectDelay = 1000; - private readonly IConfig config; + private readonly TapetiConnectionParams connectionParams; + + private readonly ITapetiConfig config; private readonly ILogger logger; - public TapetiConnectionParams ConnectionParams { get; set; } + + + /// + /// Receives events when the connection state changes. + /// public IConnectionEventListener ConnectionEventListener { get; set; } - private readonly IMessageSerializer messageSerializer; - private readonly IRoutingKeyStrategy routingKeyStrategy; - private readonly IExchangeStrategy exchangeStrategy; + private readonly Lazy taskQueue = new Lazy(); @@ -39,6 +50,7 @@ namespace Tapeti.Connection private IModel channelInstance; private ulong lastDeliveryTag; private DateTime connectedDateTime; + private HttpClient managementClient; // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread private readonly object confirmLock = new object(); @@ -60,88 +72,123 @@ namespace Tapeti.Connection } - - public TapetiWorker(IConfig config) + /// + public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams) { this.config = config; + this.connectionParams = connectionParams; logger = config.DependencyResolver.Resolve(); - messageSerializer = config.DependencyResolver.Resolve(); - routingKeyStrategy = config.DependencyResolver.Resolve(); - exchangeStrategy = config.DependencyResolver.Resolve(); - } - public Task Publish(object message, IBasicProperties properties, bool mandatory) - { - return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory); - } - - - public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) - { - return Publish(message, properties, "", queueName, mandatory); - } - - - public Task Consume(string queueName, IEnumerable bindings) - { - if (string.IsNullOrEmpty(queueName)) - throw new ArgumentNullException(nameof(queueName)); - - return taskQueue.Value.Add(() => + var handler = new HttpClientHandler { - WithRetryableChannel(channel => channel.BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware))); + Credentials = new NetworkCredential(connectionParams.Username, connectionParams.Password) + }; + + managementClient = new HttpClient(handler) + { + Timeout = TimeSpan.FromSeconds(30) + }; + + managementClient.DefaultRequestHeaders.Add("Connection", "close"); + } + + + /// + public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory) + { + var publishProperties = new RabbitMQMessageProperties(new BasicProperties(), properties); + + await taskQueue.Value.Add(async () => + { + Task publishResultTask = null; + var messageInfo = new ConfirmMessageInfo + { + ReturnKey = GetReturnKey(exchange, routingKey), + CompletionSource = new TaskCompletionSource() + }; + + + WithRetryableChannel(channel => + { + // The delivery tag is lost after a reconnect, register under the new tag + if (config.Features.PublisherConfirms) + { + lastDeliveryTag++; + + Monitor.Enter(confirmLock); + try + { + confirmMessages.Add(lastDeliveryTag, messageInfo); + } + finally + { + Monitor.Exit(confirmLock); + } + + publishResultTask = messageInfo.CompletionSource.Task; + } + else + mandatory = false; + + channel.BasicPublish(exchange, routingKey, mandatory, publishProperties.BasicProperties, body); + }); + + + if (publishResultTask == null) + return; + + var delayCancellationTokenSource = new CancellationTokenSource(); + var signalledTask = await Task.WhenAny( + publishResultTask, + Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)); + + if (signalledTask != publishResultTask) + throw new TimeoutException( + $"Timeout while waiting for basic.return for message with exchange '{exchange}' and routing key '{routingKey}'"); + + delayCancellationTokenSource.Cancel(); + + if (publishResultTask.IsCanceled) + throw new NackException( + $"Mandatory message with with exchange '{exchange}' and routing key '{routingKey}' was nacked"); + + var replyCode = publishResultTask.Result; + + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + if (replyCode == 312) + throw new NoRouteException( + $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' does not have a route"); + + if (replyCode > 0) + throw new NoRouteException( + $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}"); }); } - public Task Subscribe(IQueue queue) + /// + public async Task Consume(string queueName, IConsumer consumer) { - return taskQueue.Value.Add(() => + if (string.IsNullOrEmpty(queueName)) + throw new ArgumentNullException(nameof(queueName)); + + await taskQueue.Value.Add(() => { - WithRetryableChannel(channel => + WithRetryableChannel(channel => { - if (queue.Dynamic) - { - if (!(queue is IDynamicQueue dynamicQueue)) - throw new NullReferenceException("Queue with Dynamic = true must implement IDynamicQueue"); - - var declaredQueue = channel.QueueDeclare(dynamicQueue.GetDeclareQueueName()); - dynamicQueue.SetName(declaredQueue.QueueName); - - foreach (var binding in queue.Bindings) - { - if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) - { - if (binding.MessageClass == null) - throw new NullReferenceException("Binding with QueueBindingMode = RoutingKey must have a MessageClass"); - - var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - var exchange = exchangeStrategy.GetExchange(binding.MessageClass); - - channel.QueueBind(declaredQueue.QueueName, exchange, routingKey); - } - - (binding as IBuildBinding)?.SetQueueName(declaredQueue.QueueName); - } - } - else - { - channel.QueueDeclarePassive(queue.Name); - foreach (var binding in queue.Bindings) - { - (binding as IBuildBinding)?.SetQueueName(queue.Name); - } - } + var basicConsumer = new TapetiBasicConsumer(consumer, Respond); + channel.BasicConsume(queueName, false, basicConsumer); }); }); } - public Task Respond(ulong deliveryTag, ConsumeResponse response) + private async Task Respond(ulong deliveryTag, ConsumeResponse response) { - return taskQueue.Value.Add(() => + await taskQueue.Value.Add(() => { // No need for a retryable channel here, if the connection is lost we can't // use the deliveryTag anymore. @@ -167,12 +214,82 @@ namespace Tapeti.Connection } - public Task Close() + /// + public async Task DurableQueueDeclare(string queueName, IEnumerable bindings) + { + await taskQueue.Value.Add(async () => + { + var existingBindings = await GetQueueBindings(queueName); + + WithRetryableChannel(channel => + { + channel.QueueDeclare(queueName, true, false, false); + + var currentBindings = bindings.ToList(); + + foreach (var binding in currentBindings) + channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); + + foreach (var deletedBinding in existingBindings.Where(binding => !currentBindings.Any(b => b.Exchange == binding.Exchange && b.RoutingKey == binding.RoutingKey))) + channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); + }); + }); + } + + /// + public async Task DurableQueueVerify(string queueName) + { + await taskQueue.Value.Add(() => + { + WithRetryableChannel(channel => + { + channel.QueueDeclarePassive(queueName); + }); + }); + } + + /// + public async Task DynamicQueueDeclare(string queuePrefix = null) + { + string queueName = null; + + await taskQueue.Value.Add(() => + { + WithRetryableChannel(channel => + { + if (!string.IsNullOrEmpty(queuePrefix)) + { + queueName = queuePrefix + "." + Guid.NewGuid().ToString("N"); + channel.QueueDeclare(queueName); + } + else + queueName = channel.QueueDeclare().QueueName; + }); + }); + + return queueName; + } + + /// + public async Task DynamicQueueBind(string queueName, QueueBinding binding) + { + await taskQueue.Value.Add(() => + { + WithRetryableChannel(channel => + { + channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); + }); + }); + } + + + /// + public async Task Close() { if (!taskQueue.IsValueCreated) - return Task.CompletedTask; + return; - return taskQueue.Value.Add(() => + await taskQueue.Value.Add(() => { isClosing = true; @@ -194,94 +311,95 @@ namespace Tapeti.Connection } - private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory) + private static readonly List TransientStatusCodes = new List { - var context = new PublishContext - { - DependencyResolver = config.DependencyResolver, - Exchange = exchange, - RoutingKey = routingKey, - Message = message, - Properties = properties ?? new BasicProperties() - }; + HttpStatusCode.GatewayTimeout, + HttpStatusCode.RequestTimeout, + HttpStatusCode.ServiceUnavailable + }; - if (!context.Properties.IsTimestampPresent()) - context.Properties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); - - if (!context.Properties.IsDeliveryModePresent()) - context.Properties.DeliveryMode = 2; // Persistent + private static readonly TimeSpan[] ExponentialBackoff = + { + TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(2), + TimeSpan.FromSeconds(3), + TimeSpan.FromSeconds(5), + TimeSpan.FromSeconds(8), + TimeSpan.FromSeconds(13), + TimeSpan.FromSeconds(21), + TimeSpan.FromSeconds(34), + TimeSpan.FromSeconds(55) + }; - // ReSharper disable ImplicitlyCapturedClosure - MiddlewareHelper will not keep a reference to the lambdas - return MiddlewareHelper.GoAsync( - config.PublishMiddleware, - async (handler, next) => await handler.Handle(context, next), - () => taskQueue.Value.Add(async () => + private class ManagementBinding + { + [JsonProperty("source")] + public string Source { get; set; } + + [JsonProperty("vhost")] + public string Vhost { get; set; } + + [JsonProperty("destination")] + public string Destination { get; set; } + + [JsonProperty("destination_type")] + public string DestinationType { get; set; } + + [JsonProperty("routing_key")] + public string RoutingKey { get; set; } + + [JsonProperty("arguments")] + public Dictionary Arguments { get; set; } + + [JsonProperty("properties_key")] + public string PropertiesKey { get; set; } + } + + + private async Task> GetQueueBindings(string queueName) + { + var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost); + var queuePath = Uri.EscapeDataString(queueName); + var requestUri = new Uri($"{connectionParams.HostName}:{connectionParams.Port}/api/queues/{virtualHostPath}/{queuePath}/bindings"); + + using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri)) + { + var retryDelayIndex = 0; + + while (true) { - if (Thread.CurrentThread.ManagedThreadId != 3) - Debug.WriteLine(Thread.CurrentThread.ManagedThreadId); - - var body = messageSerializer.Serialize(context.Message, context.Properties); - - Task publishResultTask = null; - var messageInfo = new ConfirmMessageInfo + try { - ReturnKey = GetReturnKey(context.Exchange, context.RoutingKey), - CompletionSource = new TaskCompletionSource() - }; + var response = await managementClient.SendAsync(request); + response.EnsureSuccessStatusCode(); + var content = await response.Content.ReadAsStringAsync(); + var bindings = JsonConvert.DeserializeObject>(content); - WithRetryableChannel(channel => + // Filter out the binding to an empty source, which is always present for direct-to-queue routing + return bindings + .Where(binding => !string.IsNullOrEmpty(binding.Source)) + .Select(binding => new QueueBinding(binding.Source, binding.RoutingKey)); + } + catch (TimeoutException) { - // The delivery tag is lost after a reconnect, register under the new tag - if (config.UsePublisherConfirms) - { - lastDeliveryTag++; + } + catch (WebException e) + { + if (!(e.Response is HttpWebResponse response)) + throw; - Monitor.Enter(confirmLock); - try - { - confirmMessages.Add(lastDeliveryTag, messageInfo); - } - finally - { - Monitor.Exit(confirmLock); - } + if (!TransientStatusCodes.Contains(response.StatusCode)) + throw; + } - publishResultTask = messageInfo.CompletionSource.Task; - } - else - mandatory = false; + await Task.Delay(ExponentialBackoff[retryDelayIndex]); - channel.BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body); - }); - - - if (publishResultTask == null) - return; - - var delayCancellationTokenSource = new CancellationTokenSource(); - var signalledTask = await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)); - - if (signalledTask != publishResultTask) - throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}"); - - delayCancellationTokenSource.Cancel(); - - if (publishResultTask.IsCanceled) - throw new NackException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} was nacked"); - - var replyCode = publishResultTask.Result; - - // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code - // at the time of writing... - if (replyCode == 312) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route"); - - if (replyCode > 0) - throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}"); - })); - // ReSharper restore ImplicitlyCapturedClosure + if (retryDelayIndex < ExponentialBackoff.Length - 1) + retryDelayIndex++; + } + } } @@ -298,9 +416,8 @@ namespace Tapeti.Connection operation(GetChannel()); break; } - catch (AlreadyClosedException e) + catch (AlreadyClosedException) { - // TODO log? } } } @@ -323,11 +440,11 @@ namespace Tapeti.Connection var connectionFactory = new ConnectionFactory { - HostName = ConnectionParams.HostName, - Port = ConnectionParams.Port, - VirtualHost = ConnectionParams.VirtualHost, - UserName = ConnectionParams.Username, - Password = ConnectionParams.Password, + HostName = connectionParams.HostName, + Port = connectionParams.Port, + VirtualHost = connectionParams.VirtualHost, + UserName = connectionParams.Username, + Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, RequestedHeartbeat = 30 @@ -337,7 +454,7 @@ namespace Tapeti.Connection { try { - logger.Connect(ConnectionParams); + logger.Connect(connectionParams); connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); @@ -345,13 +462,16 @@ namespace Tapeti.Connection if (channelInstance == null) throw new BrokerUnreachableException(null); - if (config.UsePublisherConfirms) + if (config.Features.PublisherConfirms) { lastDeliveryTag = 0; Monitor.Enter(confirmLock); try { + foreach (var pair in confirmMessages) + pair.Value.CompletionSource.SetCanceled(); + confirmMessages.Clear(); } finally @@ -362,8 +482,8 @@ namespace Tapeti.Connection channelInstance.ConfirmSelect(); } - if (ConnectionParams.PrefetchCount > 0) - channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); + if (connectionParams.PrefetchCount > 0) + channelInstance.BasicQos(0, connectionParams.PrefetchCount, false); channelInstance.ModelShutdown += (sender, e) => { @@ -390,14 +510,14 @@ namespace Tapeti.Connection else ConnectionEventListener?.Connected(); - logger.ConnectSuccess(ConnectionParams); + logger.ConnectSuccess(connectionParams); isReconnect = true; break; } catch (BrokerUnreachableException e) { - logger.ConnectFailed(ConnectionParams, e); + logger.ConnectFailed(connectionParams, e); Thread.Sleep(ReconnectDelay); } } @@ -507,15 +627,5 @@ namespace Tapeti.Connection { return exchange + ':' + routingKey; } - - - private class PublishContext : IPublishContext - { - public IDependencyResolver DependencyResolver { get; set; } - public string Exchange { get; set; } - public string RoutingKey { get; set; } - public object Message { get; set; } - public IBasicProperties Properties { get; set; } - } } } diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 06f87aa..fff74ef 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,309 +1,190 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Runtime.ExceptionServices; -using RabbitMQ.Client; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; +using Tapeti.Helpers; namespace Tapeti.Connection { - public class TapetiConsumer : DefaultBasicConsumer + /// + /// + /// Implements a RabbitMQ consumer to pass messages to the Tapeti middleware. + /// + public class TapetiConsumer : IConsumer { - private readonly TapetiWorker worker; + private readonly ITapetiConfig config; private readonly string queueName; - private readonly IDependencyResolver dependencyResolver; - private readonly IReadOnlyList messageMiddleware; - private readonly IReadOnlyList cleanupMiddleware; private readonly List bindings; private readonly ILogger logger; private readonly IExceptionStrategy exceptionStrategy; + private readonly IMessageSerializer messageSerializer; - public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware) + /// + public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable bindings) { - this.worker = worker; + this.config = config; this.queueName = queueName; - this.dependencyResolver = dependencyResolver; - this.messageMiddleware = messageMiddleware; - this.cleanupMiddleware = cleanupMiddleware; this.bindings = bindings.ToList(); - logger = dependencyResolver.Resolve(); - exceptionStrategy = dependencyResolver.Resolve(); + logger = config.DependencyResolver.Resolve(); + exceptionStrategy = config.DependencyResolver.Resolve(); + messageSerializer = config.DependencyResolver.Resolve(); } - public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IBasicProperties properties, byte[] body) + /// + public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body) { - Task.Run(async () => + try { - MessageContext context = null; - HandlingResult handlingResult = null; - try + var message = messageSerializer.Deserialize(body, properties); + if (message == null) + throw new ArgumentException($"Message body could not be deserialized into a message object in queue {queueName}", nameof(body)); + + await DispatchMessage(message, new MessageContextData { + Exchange = exchange, + RoutingKey = routingKey, + Properties = properties + }); + + return ConsumeResponse.Ack; + } + catch (Exception e) + { + // TODO exception strategy + // TODO logger + return ConsumeResponse.Nack; + } + + + /* + + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Ack, + MessageAction = MessageAction.None + }; + } + catch (Exception eDispatch) + { + var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); + logger.HandlerException(eDispatch); try { - context = new MessageContext - { - DependencyResolver = dependencyResolver, - Queue = queueName, - RoutingKey = routingKey, - Properties = properties - }; + var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException); - await DispatchMesage(context, body); + exceptionStrategy.HandleException(exceptionStrategyContext); + handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult(); + } + catch (Exception eStrategy) + { + logger.HandlerException(eStrategy); + } + } + try + { + if (handlingResult == null) + { handlingResult = new HandlingResult { - ConsumeResponse = ConsumeResponse.Ack, + ConsumeResponse = ConsumeResponse.Nack, MessageAction = MessageAction.None }; } - catch (Exception eDispatch) - { - var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); - logger.HandlerException(eDispatch); - try - { - var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException); - - exceptionStrategy.HandleException(exceptionStrategyContext); - - handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult(); - } - catch (Exception eStrategy) - { - logger.HandlerException(eStrategy); - } - } - try - { - if (handlingResult == null) - { - handlingResult = new HandlingResult - { - ConsumeResponse = ConsumeResponse.Nack, - MessageAction = MessageAction.None - }; - } - await RunCleanup(context, handlingResult); - } - catch (Exception eCleanup) - { - logger.HandlerException(eCleanup); - } - } - finally - { - try - { - if (handlingResult == null) - { - handlingResult = new HandlingResult - { - ConsumeResponse = ConsumeResponse.Nack, - MessageAction = MessageAction.None - }; - } - await worker.Respond(deliveryTag, handlingResult.ConsumeResponse); - } - catch (Exception eRespond) - { - logger.HandlerException(eRespond); - } - try - { - context?.Dispose(); - } - catch (Exception eDispose) - { - logger.HandlerException(eDispose); - } - } - }); - } - - private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) - { - foreach(var handler in cleanupMiddleware) - { - try - { - await handler.Handle(context, handlingResult); + await RunCleanup(context, handlingResult); } catch (Exception eCleanup) { logger.HandlerException(eCleanup); } } + finally + { + try + { + if (handlingResult == null) + { + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Nack, + MessageAction = MessageAction.None + }; + } + await client.Respond(deliveryTag, handlingResult.ConsumeResponse); + } + catch (Exception eRespond) + { + logger.HandlerException(eRespond); + } + try + { + context?.Dispose(); + } + catch (Exception eDispose) + { + logger.HandlerException(eDispose); + } + } + */ } - private async Task DispatchMesage(MessageContext context, byte[] body) + + private async Task DispatchMessage(object message, MessageContextData messageContextData) { - var message = dependencyResolver.Resolve().Deserialize(body, context.Properties); - if (message == null) - throw new ArgumentException("Empty message"); - - context.Message = message; - + var messageType = message.GetType(); var validMessageType = false; foreach (var binding in bindings) { - if (binding.Accept(context, message)) - { - await InvokeUsingBinding(context, binding, message); + if (!binding.Accept(messageType)) + continue; - validMessageType = true; - } + await InvokeUsingBinding(message, messageContextData, binding); + validMessageType = true; } if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + throw new ArgumentException($"Unsupported message type in queue {queueName}: {message.GetType().FullName}"); } - private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message) + + private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding) { - context.Binding = binding; - - RecursiveCaller firstCaller = null; - RecursiveCaller currentCaller = null; - - void AddHandler(Handler handle) + var context = new MessageContext { - var caller = new RecursiveCaller(handle); - if (currentCaller == null) - firstCaller = caller; - else - currentCaller.Next = caller; - currentCaller = caller; - } - - if (binding.MessageFilterMiddleware != null) - { - foreach (var m in binding.MessageFilterMiddleware) - { - AddHandler(m.Handle); - } - } - - AddHandler(async (c, next) => - { - c.Controller = dependencyResolver.Resolve(binding.Controller); - await next(); - }); - - foreach (var m in messageMiddleware) - { - AddHandler(m.Handle); - } - - if (binding.MessageMiddleware != null) - { - foreach (var m in binding.MessageMiddleware) - { - AddHandler(m.Handle); - } - } - - AddHandler(async (c, next) => - { - await binding.Invoke(c, message); - }); - - return firstCaller.Call(context); - } - - private static Exception UnwrapException(Exception exception) - { - // In async/await style code this is handled similarly. For synchronous - // code using Tasks we have to unwrap these ourselves to get the proper - // exception directly instead of "Errors occured". We might lose - // some stack traces in the process though. - while (true) - { - var aggregateException = exception as AggregateException; - if (aggregateException == null || aggregateException.InnerExceptions.Count != 1) - return exception; - - exception = aggregateException.InnerExceptions[0]; - } - } - } - - public delegate Task Handler(MessageContext context, Func next); - - public class RecursiveCaller - { - private readonly Handler handle; - private MessageContext currentContext; - private MessageContext nextContext; - - public RecursiveCaller Next; - - public RecursiveCaller(Handler handle) - { - this.handle = handle; - } - - internal async Task Call(MessageContext context) - { - if (currentContext != null) - throw new InvalidOperationException("Cannot simultaneously call 'next' in Middleware."); + Config = config, + Queue = queueName, + Exchange = messageContextData.Exchange, + RoutingKey = messageContextData.RoutingKey, + Message = message, + Properties = messageContextData.Properties, + Binding = binding + }; try { - currentContext = context; - - context.UseNestedContext = Next == null ? (Action)null : UseNestedContext; - - await handle(context, CallNext); + await MiddlewareHelper.GoAsync(config.Middleware.Message, + (handler, next) => handler.Handle(context, next), + async () => { await binding.Invoke(context); }); } finally { - currentContext = null; + context.Dispose(); } } - private async Task CallNext() - { - if (Next == null) - return; - if (nextContext != null) - { - await Next.Call(nextContext); - }else - { - try - { - await Next.Call(currentContext); - } - finally - { - currentContext.UseNestedContext = UseNestedContext; - } - } - } - void UseNestedContext(MessageContext context) + private struct MessageContextData { - if (nextContext != null) - throw new InvalidOperationException("Previous nested context was not yet disposed."); - - context.OnContextDisposed = OnContextDisposed; - nextContext = context; - } - - void OnContextDisposed(MessageContext context) - { - context.OnContextDisposed = null; - if (nextContext == context) - nextContext = null; + public string Exchange; + public string RoutingKey; + public IMessageProperties Properties; } } - } diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8887b85..19bea75 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -1,37 +1,92 @@ using System; +using System.Diagnostics; using System.Reflection; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using Tapeti.Annotations; +using Tapeti.Config; +using Tapeti.Default; +using Tapeti.Exceptions; +using Tapeti.Helpers; namespace Tapeti.Connection { + /// public class TapetiPublisher : IInternalPublisher { - private readonly Func workerFactory; + private readonly ITapetiConfig config; + private readonly Func clientFactory; + private readonly IExchangeStrategy exchangeStrategy; + private readonly IRoutingKeyStrategy routingKeyStrategy; + private readonly IMessageSerializer messageSerializer; - public TapetiPublisher(Func workerFactory) + /// + public TapetiPublisher(ITapetiConfig config, Func clientFactory) { - this.workerFactory = workerFactory; + this.config = config; + this.clientFactory = clientFactory; + + exchangeStrategy = config.DependencyResolver.Resolve(); + routingKeyStrategy = config.DependencyResolver.Resolve(); + messageSerializer = config.DependencyResolver.Resolve(); } - public Task Publish(object message) + /// + public async Task Publish(object message) { - return workerFactory().Publish(message, null, IsMandatory(message)); + await Publish(message, null, IsMandatory(message)); } - public Task Publish(object message, IBasicProperties properties, bool mandatory) + /// + public async Task Publish(object message, IMessageProperties properties, bool mandatory) { - return workerFactory().Publish(message, properties, mandatory); + var messageClass = message.GetType(); + var exchange = exchangeStrategy.GetExchange(messageClass); + var routingKey = routingKeyStrategy.GetRoutingKey(messageClass); + + await Publish(message, properties, exchange, routingKey, mandatory); } - public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory) + /// + public async Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory) { - return workerFactory().PublishDirect(message, queueName, properties, mandatory); + await Publish(message, properties, null, queueName, mandatory); + } + + + private async Task Publish(object message, IMessageProperties properties, string exchange, string routingKey, bool mandatory) + { + var writableProperties = new MessageProperties(properties); + + if (!writableProperties.Timestamp.HasValue) + writableProperties.Timestamp = DateTime.UtcNow; + + writableProperties.Persistent = true; + + + var context = new PublishContext + { + Config = config, + Exchange = exchange, + RoutingKey = routingKey, + Message = message, + Properties = writableProperties + }; + + + await MiddlewareHelper.GoAsync( + config.Middleware.Publish, + async (handler, next) => await handler.Handle(context, next), + async () => + { + var body = messageSerializer.Serialize(message, writableProperties); + await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory); + }); } @@ -39,5 +94,15 @@ namespace Tapeti.Connection { return message.GetType().GetCustomAttribute() != null; } + + + private class PublishContext : IPublishContext + { + public ITapetiConfig Config { get; set; } + public string Exchange { get; set; } + public string RoutingKey { get; set; } + public object Message { get; set; } + public IMessageProperties Properties { get; set; } + } } } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index ce309b2..6013b61 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -6,39 +6,273 @@ using Tapeti.Config; namespace Tapeti.Connection { + /// public class TapetiSubscriber : ISubscriber { - private readonly Func workerFactory; - private readonly List queues; + private readonly Func clientFactory; + private readonly ITapetiConfig config; private bool consuming; - public TapetiSubscriber(Func workerFactory, IEnumerable queues) + /// + public TapetiSubscriber(Func clientFactory, ITapetiConfig config) { - this.workerFactory = workerFactory; - this.queues = queues.ToList(); + this.clientFactory = clientFactory; + this.config = config; } - public Task BindQueues() - { - return Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); - } - - - public Task RebindQueues() + /// + /// Applies the configured bindings and declares the queues in RabbitMQ. For internal use only. + /// + /// + public async Task ApplyBindings() { - return BindQueues(); + var routingKeyStrategy = config.DependencyResolver.Resolve(); + var exchangeStrategy = config.DependencyResolver.Resolve(); + + var bindingTarget = config.Features.DeclareDurableQueues + ? (CustomBindingTarget)new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy) + : new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy); + + await Task.WhenAll(config.Bindings.Select(binding => binding.Apply(bindingTarget))); + await bindingTarget.Apply(); } - public Task Resume() + /// + public async Task Resume() { if (consuming) - return Task.CompletedTask; + return; consuming = true; - return Task.WhenAll(queues.Select(queue => workerFactory().Consume(queue.Name, queue.Bindings)).ToList()); + + var queues = config.Bindings.GroupBy(binding => binding.QueueName); + + await Task.WhenAll(queues.Select(async group => + { + var queueName = group.Key; + var consumer = new TapetiConsumer(config, queueName, group); + + await clientFactory().Consume(queueName, consumer); + })); + } + + + private async Task ApplyBinding(IBinding binding, IBindingTarget bindingTarget) + { + await binding.Apply(bindingTarget); + } + + + private abstract class CustomBindingTarget : IBindingTarget + { + protected readonly Func ClientFactory; + protected readonly IRoutingKeyStrategy RoutingKeyStrategy; + protected readonly IExchangeStrategy ExchangeStrategy; + + private struct DynamicQueueInfo + { + public string QueueName; + public List MessageClasses; + } + + private readonly Dictionary> dynamicQueues = new Dictionary>(); + + + protected CustomBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) + { + ClientFactory = clientFactory; + RoutingKeyStrategy = routingKeyStrategy; + ExchangeStrategy = exchangeStrategy; + } + + + public virtual Task Apply() + { + return Task.CompletedTask; + } + + + public abstract Task BindDurable(Type messageClass, string queueName); + public abstract Task BindDirectDurable(string queueName); + + + public async Task BindDynamic(Type messageClass, string queuePrefix = null) + { + var result = await DeclareDynamicQueue(messageClass, queuePrefix); + + if (result.IsNewMessageClass) + { + var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); + var exchange = ExchangeStrategy.GetExchange(messageClass); + + await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey)); + } + + return result.QueueName; + } + + + public async Task BindDirectDynamic(Type messageClass, string queuePrefix = null) + { + var result = await DeclareDynamicQueue(messageClass, queuePrefix); + return result.QueueName; + } + + + public async Task BindDirectDynamic(string queuePrefix = null) + { + // If we don't know the routing key, always create a new queue to ensure there is no overlap. + // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. + return await ClientFactory().DynamicQueueDeclare(queuePrefix); + } + + + private struct DeclareDynamicQueueResult + { + public string QueueName; + public bool IsNewMessageClass; + } + + private async Task DeclareDynamicQueue(Type messageClass, string queuePrefix) + { + // Group by prefix + var key = queuePrefix ?? ""; + if (!dynamicQueues.TryGetValue(key, out var prefixQueues)) + { + prefixQueues = new List(); + dynamicQueues.Add(key, prefixQueues); + } + + // Ensure routing keys are unique per dynamic queue, so that a requeue + // will not cause the side-effect of calling another handler again as well. + foreach (var existingQueueInfo in prefixQueues) + { + // ReSharper disable once InvertIf + if (!existingQueueInfo.MessageClasses.Contains(messageClass)) + { + // Allow this routing key in the existing dynamic queue + var result = new DeclareDynamicQueueResult + { + QueueName = existingQueueInfo.QueueName, + IsNewMessageClass = !existingQueueInfo.MessageClasses.Contains(messageClass) + }; + + if (result.IsNewMessageClass) + existingQueueInfo.MessageClasses.Add(messageClass); + + return result; + } + } + + // Declare a new queue + var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix); + var queueInfo = new DynamicQueueInfo + { + QueueName = queueName, + MessageClasses = new List { messageClass } + }; + + prefixQueues.Add(queueInfo); + + return new DeclareDynamicQueueResult + { + QueueName = queueName, + IsNewMessageClass = true + }; + } + } + + + private class DeclareDurableQueuesBindingTarget : CustomBindingTarget + { + private readonly Dictionary> durableQueues = new Dictionary>(); + + + public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy) + { + } + + + public override Task BindDurable(Type messageClass, string queueName) + { + // Collect the message classes per queue so we can determine afterwards + // if any of the bindings currently set on the durable queue are no + // longer valid and should be removed. + if (!durableQueues.TryGetValue(queueName, out var messageClasses)) + { + durableQueues.Add(queueName, new List + { + messageClass + }); + } + else if (!messageClasses.Contains(messageClass)) + messageClasses.Add(messageClass); + + return Task.CompletedTask; + } + + + public override Task BindDirectDurable(string queueName) + { + if (!durableQueues.ContainsKey(queueName)) + durableQueues.Add(queueName, new List()); + + return Task.CompletedTask; + } + + + public override async Task Apply() + { + var worker = ClientFactory(); + + await Task.WhenAll(durableQueues.Select(async queue => + { + var bindings = queue.Value.Select(messageClass => + { + var exchange = ExchangeStrategy.GetExchange(messageClass); + var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); + + return new QueueBinding(exchange, routingKey); + }); + + await worker.DurableQueueDeclare(queue.Key, bindings); + })); + } + } + + + private class PassiveDurableQueuesBindingTarget : CustomBindingTarget + { + private readonly List durableQueues = new List(); + + + public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy) : base(clientFactory, routingKeyStrategy, exchangeStrategy) + { + } + + + public override async Task BindDurable(Type messageClass, string queueName) + { + await VerifyDurableQueue(queueName); + } + + public override async Task BindDirectDurable(string queueName) + { + await VerifyDurableQueue(queueName); + } + + + private async Task VerifyDurableQueue(string queueName) + { + if (!durableQueues.Contains(queueName)) + { + await ClientFactory().DurableQueueVerify(queueName); + durableQueues.Add(queueName); + } + } } } } diff --git a/Tapeti/ConsumeResponse.cs b/Tapeti/ConsumeResponse.cs index 2997930..d539170 100644 --- a/Tapeti/ConsumeResponse.cs +++ b/Tapeti/ConsumeResponse.cs @@ -1,9 +1,23 @@ namespace Tapeti { + /// + /// Determines the response sent back after handling a message. + /// public enum ConsumeResponse { + /// + /// Acknowledge the message and remove it from the queue + /// Ack, + + /// + /// Negatively acknowledge the message and remove it from the queue, send to dead-letter queue if configured on the bus + /// Nack, + + /// + /// Negatively acknowledge the message and put it back in the queue to try again later + /// Requeue } } diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs new file mode 100644 index 0000000..4b29a07 --- /dev/null +++ b/Tapeti/Default/ControllerMessageContext.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + public class ControllerMessageContext : MessageContext, IControllerMessageContext + { + private Dictionary items = new Dictionary(); + + + /// + public object Controller { get; set; } + + /// + public new IControllerMethodBinding Binding { get; set; } + + + /// + public override void Dispose() + { + foreach (var item in items.Values) + (item as IDisposable)?.Dispose(); + + base.Dispose(); + } + + + /// + public void Store(string key, object value) + { + items.Add(key, value); + } + + + /// + public bool Get(string key, out T value) where T : class + { + if (!items.TryGetValue(key, out var objectValue)) + { + value = default(T); + return false; + } + + value = (T)objectValue; + return true; + } + } +} diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs new file mode 100644 index 0000000..b2b8986 --- /dev/null +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -0,0 +1,77 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + /// + /// Binding implementation for controller methods. Do not instantiate this class yourself, + /// instead use the ITapetiConfigBuilder RegisterController / RegisterAllControllers extension + /// methods. + /// + public class ControllerMethodBinding : IBinding + { + private readonly Type controller; + private readonly MethodInfo method; + private readonly QueueInfo queueInfo; + + + /// + public string QueueName { get; private set; } + + + /// + public ControllerMethodBinding(Type controller, MethodInfo method, QueueInfo queueInfo) + { + this.controller = controller; + this.method = method; + this.queueInfo = queueInfo; + } + + + /// + public Task Apply(IBindingTarget target) + { + // TODO ControllerMethodBinding + throw new NotImplementedException(); + } + + + /// + public bool Accept(Type messageClass) + { + throw new NotImplementedException(); + } + + /// + public Task Invoke(IMessageContext context) + { + throw new NotImplementedException(); + } + + + /// + /// + /// + public class QueueInfo + { + /// + /// Whether the queue is dynamic or durable. + /// + public bool Dynamic { get; set; } + + /// + /// The name of the durable queue, or optional prefix of the dynamic queue. + /// + public string Name { get; set; } + + + /// + /// Determines if the QueueInfo properties contain a valid combination. + /// + public bool IsValid => Dynamic|| !string.IsNullOrEmpty(Name); + } + } +} diff --git a/Tapeti/Default/DependencyResolverBinding.cs b/Tapeti/Default/DependencyResolverBinding.cs index f1d61bb..8eb3b9a 100644 --- a/Tapeti/Default/DependencyResolverBinding.cs +++ b/Tapeti/Default/DependencyResolverBinding.cs @@ -4,14 +4,20 @@ using Tapeti.Config; namespace Tapeti.Default { - public class DependencyResolverBinding : IBindingMiddleware + /// + /// + /// Attempts to resolve any unhandled parameters to Controller methods using the IoC container. + /// This middleware is included by default in the standard TapetiConfig. + /// + public class DependencyResolverBinding : IControllerBindingMiddleware { - public void Handle(IBindingContext context, Action next) + /// + public void Handle(IControllerBindingContext context, Action next) { next(); foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) - parameter.SetBinding(messageContext => messageContext.DependencyResolver.Resolve(parameter.Info.ParameterType)); + parameter.SetBinding(messageContext => messageContext.Config.DependencyResolver.Resolve(parameter.Info.ParameterType)); } } } diff --git a/Tapeti/Default/FallbackStringEnumConverter.cs b/Tapeti/Default/FallbackStringEnumConverter.cs index d4098c3..9801f6c 100644 --- a/Tapeti/Default/FallbackStringEnumConverter.cs +++ b/Tapeti/Default/FallbackStringEnumConverter.cs @@ -4,11 +4,11 @@ using Newtonsoft.Json; namespace Tapeti.Default { + /// /// - /// Converts an to and from its name string value. If an unknown string value is encountered + /// Converts an to and from its name string value. If an unknown string value is encountered /// it will translate to 0xDEADBEEF (-559038737) so it can be gracefully handled. /// If you copy this value as-is to another message and try to send it, this converter will throw an exception. - /// /// This converter is far simpler than the default StringEnumConverter, it assumes both sides use the same /// enum and therefore skips the naming strategy. /// @@ -17,12 +17,14 @@ namespace Tapeti.Default private readonly int invalidEnumValue; + /// public FallbackStringEnumConverter() { unchecked { invalidEnumValue = (int)0xDEADBEEF; } } + /// public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) { if (value == null) @@ -39,6 +41,7 @@ namespace Tapeti.Default } + /// public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) { var isNullable = IsNullableType(objectType); @@ -72,6 +75,7 @@ namespace Tapeti.Default } + /// public override bool CanConvert(Type objectType) { var actualType = IsNullableType(objectType) ? Nullable.GetUnderlyingType(objectType) : objectType; diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index 9cee002..e15a4d3 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -1,22 +1,27 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Text; using Newtonsoft.Json; -using RabbitMQ.Client; +using Tapeti.Config; namespace Tapeti.Default { + /// + /// + /// IMessageSerializer implementation for JSON encoding and decoding using Newtonsoft.Json. + /// public class JsonMessageSerializer : IMessageSerializer { - protected const string ContentType = "application/json"; - protected const string ClassTypeHeader = "classType"; + private const string ContentType = "application/json"; + private const string ClassTypeHeader = "classType"; private readonly ConcurrentDictionary deserializedTypeNames = new ConcurrentDictionary(); private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); private readonly JsonSerializerSettings serializerSettings; + + /// public JsonMessageSerializer() { serializerSettings = new JsonSerializerSettings @@ -28,35 +33,41 @@ namespace Tapeti.Default } - public byte[] Serialize(object message, IBasicProperties properties) + /// + public byte[] Serialize(object message, IMessageProperties properties) { - if (properties.Headers == null) - properties.Headers = new Dictionary(); - var typeName = serializedTypeNames.GetOrAdd(message.GetType(), SerializeTypeName); - properties.Headers.Add(ClassTypeHeader, Encoding.UTF8.GetBytes(typeName)); + properties.SetHeader(ClassTypeHeader, typeName); properties.ContentType = ContentType; return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message, serializerSettings)); } - public object Deserialize(byte[] body, IBasicProperties properties) + /// + public object Deserialize(byte[] body, IMessageProperties properties) { if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) throw new ArgumentException($"content_type must be {ContentType}"); - if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out var typeName)) + var typeName = properties.GetHeader(ClassTypeHeader); + if (string.IsNullOrEmpty(typeName)) throw new ArgumentException($"{ClassTypeHeader} header not present"); - var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName); + var messageType = deserializedTypeNames.GetOrAdd(typeName, DeserializeTypeName); return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType, serializerSettings); } - public virtual Type DeserializeTypeName(string typeName) + /// + /// Resolves a Type based on the serialized type name. + /// + /// The type name in the format FullNamespace.ClassName:AssemblyName + /// The resolved Type + /// If the format is unrecognized or the Type could not be resolved + protected virtual Type DeserializeTypeName(string typeName) { var parts = typeName.Split(':'); if (parts.Length != 2) @@ -69,7 +80,14 @@ namespace Tapeti.Default return type; } - public virtual string SerializeTypeName(Type type) + + /// + /// Serializes a Type into a string representation. + /// + /// The type to serialize + /// The type name in the format FullNamespace.ClassName:AssemblyName + /// If the serialized type name results in the AMQP limit of 255 characters to be exceeded + protected virtual string SerializeTypeName(Type type) { var typeName = type.FullName + ":" + type.Assembly.GetName().Name; if (typeName.Length > 255) diff --git a/Tapeti/Default/MessageBinding.cs b/Tapeti/Default/MessageBinding.cs index 3f86b21..34ad212 100644 --- a/Tapeti/Default/MessageBinding.cs +++ b/Tapeti/Default/MessageBinding.cs @@ -3,9 +3,15 @@ using Tapeti.Config; namespace Tapeti.Default { - public class MessageBinding : IBindingMiddleware + /// + /// + /// Gets the message class from the first parameter of a controller method. + /// This middleware is included by default in the standard TapetiConfig. + /// + public class MessageBinding : IControllerBindingMiddleware { - public void Handle(IBindingContext context, Action next) + /// + public void Handle(IControllerBindingContext context, Action next) { if (context.Parameters.Count == 0) throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); @@ -15,7 +21,7 @@ namespace Tapeti.Default throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); parameter.SetBinding(messageContext => messageContext.Message); - context.MessageClass = parameter.Info.ParameterType; + context.SetMessageClass(parameter.Info.ParameterType); next(); } diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 77486dc..8f0b8b6 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -1,172 +1,34 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using RabbitMQ.Client; -using Tapeti.Config; -using System.Linq; +using Tapeti.Config; namespace Tapeti.Default { + /// public class MessageContext : IMessageContext { - public IDependencyResolver DependencyResolver { get; set; } + /// + public ITapetiConfig Config { get; set; } - public object Controller { get; set; } + /// + public string Queue { get; set; } + + /// + public string Exchange { get; set; } + + /// + public string RoutingKey { get; set; } + + /// + public object Message { get; set; } + + /// + public IMessageProperties Properties { get; set; } + + /// public IBinding Binding { get; set; } - public string Queue { get; set; } - public string RoutingKey { get; set; } - public object Message { get; set; } - public IBasicProperties Properties { get; set; } - - public IDictionary Items { get; } - - internal Action UseNestedContext; - internal Action OnContextDisposed; - - public MessageContext() + /// + public virtual void Dispose() { - Items = new Dictionary(); - } - - private MessageContext(MessageContext outerContext) - { - DependencyResolver = outerContext.DependencyResolver; - - Controller = outerContext.Controller; - Binding = outerContext.Binding; - - Queue = outerContext.Queue; - RoutingKey = outerContext.RoutingKey; - Message = outerContext.Message; - Properties = outerContext.Properties; - - Items = new DeferingDictionary(outerContext.Items); - } - - public void Dispose() - { - var items = (Items as DeferingDictionary)?.MyState ?? Items; - - foreach (var value in items.Values) - (value as IDisposable)?.Dispose(); - - OnContextDisposed?.Invoke(this); - } - - public IMessageContext SetupNestedContext() - { - if (UseNestedContext == null) - throw new NotSupportedException("This context does not support creating nested contexts"); - - var nested = new MessageContext(this); - - UseNestedContext(nested); - - return nested; - } - - private class DeferingDictionary : IDictionary - { - private readonly IDictionary myState; - private readonly IDictionary deferee; - - public DeferingDictionary(IDictionary deferee) - { - myState = new Dictionary(); - this.deferee = deferee; - } - - public IDictionary MyState => myState; - - object IDictionary.this[string key] - { - get => myState.ContainsKey(key) ? myState[key] : deferee[key]; - - set - { - if (deferee.ContainsKey(key)) - throw new InvalidOperationException("Cannot hide an item set in an outer context."); - - myState[key] = value; - } - } - - int ICollection>.Count => myState.Count + deferee.Count; - bool ICollection>.IsReadOnly => false; - ICollection IDictionary.Keys => myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly(); - ICollection IDictionary.Values => myState.Values.Concat(deferee.Values).ToList().AsReadOnly(); - - void ICollection>.Add(KeyValuePair item) - { - if (deferee.ContainsKey(item.Key)) - throw new InvalidOperationException("Cannot hide an item set in an outer context."); - - myState.Add(item); - } - - void IDictionary.Add(string key, object value) - { - if (deferee.ContainsKey(key)) - throw new InvalidOperationException("Cannot hide an item set in an outer context."); - - myState.Add(key, value); - } - - void ICollection>.Clear() - { - throw new InvalidOperationException("Cannot influence the items in an outer context."); - } - - bool ICollection>.Contains(KeyValuePair item) - { - return myState.Contains(item) || deferee.Contains(item); - } - - bool IDictionary.ContainsKey(string key) - { - return myState.ContainsKey(key) || deferee.ContainsKey(key); - } - - void ICollection>.CopyTo(KeyValuePair[] array, int arrayIndex) - { - foreach(var item in myState.Concat(deferee)) - { - array[arrayIndex++] = item; - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - return (IEnumerator)myState.Concat(deferee); - } - - IEnumerator> IEnumerable>.GetEnumerator() - { - return (IEnumerator < KeyValuePair < string, object>> )myState.Concat(deferee); - } - - bool ICollection>.Remove(KeyValuePair item) - { - if (deferee.ContainsKey(item.Key)) - throw new InvalidOperationException("Cannot remove an item set in an outer context."); - - return myState.Remove(item); - } - - bool IDictionary.Remove(string key) - { - if (deferee.ContainsKey(key)) - throw new InvalidOperationException("Cannot remove an item set in an outer context."); - - return myState.Remove(key); - } - - bool IDictionary.TryGetValue(string key, out object value) - { - return myState.TryGetValue(key, out value) - || deferee.TryGetValue(key, out value); - } } } } diff --git a/Tapeti/Default/MessageProperties.cs b/Tapeti/Default/MessageProperties.cs new file mode 100644 index 0000000..64b2eb2 --- /dev/null +++ b/Tapeti/Default/MessageProperties.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + /// + /// IMessagePropertiesReader implementation for providing properties manually + /// + public class MessageProperties : IMessageProperties + { + private readonly Dictionary headers = new Dictionary(); + + + /// + public string ContentType { get; set; } + + /// + public string CorrelationId { get; set; } + + /// + public string ReplyTo { get; set; } + + /// + public bool? Persistent { get; set; } + + /// + public DateTime? Timestamp { get; set; } + + + /// + public MessageProperties() + { + } + + + /// + public MessageProperties(IMessageProperties source) + { + if (source == null) + return; + + ContentType = source.ContentType; + CorrelationId = source.CorrelationId; + ReplyTo = source.ReplyTo; + Persistent = source.Persistent; + Timestamp = source.Timestamp; + + headers.Clear(); + foreach (var pair in source.GetHeaders()) + SetHeader(pair.Key, pair.Value); + } + + + /// + public void SetHeader(string name, string value) + { + if (headers.ContainsKey(name)) + headers[name] = value; + else + headers.Add(name, value); + } + + /// + public string GetHeader(string name) + { + return headers.TryGetValue(name, out var value) ? value : null; + } + + /// + public IEnumerable> GetHeaders() + { + return headers; + } + } +} diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index 76542c3..933302a 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -2,16 +2,20 @@ using System.Diagnostics; using System.Reflection; using System.Threading.Tasks; -using RabbitMQ.Client.Framing; using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Helpers; namespace Tapeti.Default { - public class PublishResultBinding : IBindingMiddleware + /// + /// + /// Attempts to publish a return value for Controller methods as a response to the incoming message. + /// + public class PublishResultBinding : IControllerBindingMiddleware { - public void Handle(IBindingContext context, Action next) + /// + public void Handle(IControllerBindingContext context, Action next) { next(); @@ -60,18 +64,15 @@ namespace Tapeti.Default if (message == null) throw new ArgumentException("Return value of a request message handler must not be null"); - var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve(); - var properties = new BasicProperties(); + var publisher = (IInternalPublisher)messageContext.Config.DependencyResolver.Resolve(); + var properties = new MessageProperties + { + CorrelationId = messageContext.Properties.CorrelationId + }; - // Only set the property if it's not null, otherwise a string reference exception can occur: - // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html - if (messageContext.Properties.IsCorrelationIdPresent()) - properties.CorrelationId = messageContext.Properties.CorrelationId; - - if (messageContext.Properties.IsReplyToPresent()) - return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent); - - return publisher.Publish(message, properties, false); + return !string.IsNullOrEmpty(messageContext.Properties.ReplyTo) + ? publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)) + : publisher.Publish(message, properties, false); } } } diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs new file mode 100644 index 0000000..6de9719 --- /dev/null +++ b/Tapeti/Default/RabbitMQMessageProperties.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Text; +using RabbitMQ.Client; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + /// + /// Wrapper for RabbitMQ Client's IBasicProperties + /// + public class RabbitMQMessageProperties : IMessageProperties + { + public IBasicProperties BasicProperties { get; } + + + /// + public string ContentType + { + get => BasicProperties.IsContentTypePresent() ? BasicProperties.ContentType : null; + set { if (!string.IsNullOrEmpty(value)) BasicProperties.ContentType = value; else BasicProperties.ClearContentType(); } + } + + /// + public string CorrelationId + { + get => BasicProperties.IsCorrelationIdPresent() ? BasicProperties.CorrelationId : null; + set { if (!string.IsNullOrEmpty(value)) BasicProperties.CorrelationId = value; else BasicProperties.ClearCorrelationId(); } + } + + /// + public string ReplyTo + { + get => BasicProperties.IsReplyToPresent() ? BasicProperties.ReplyTo : null; + set { if (!string.IsNullOrEmpty(value)) BasicProperties.ReplyTo = value; else BasicProperties.ClearReplyTo(); } + } + + /// + public bool? Persistent + { + get => BasicProperties.Persistent; + set { if (value.HasValue) BasicProperties.Persistent = value.Value; else BasicProperties.ClearDeliveryMode(); } + } + + /// + public DateTime? Timestamp + { + get => DateTimeOffset.FromUnixTimeSeconds(BasicProperties.Timestamp.UnixTime).UtcDateTime; + set + { + if (value.HasValue) + BasicProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(value.Value.ToUniversalTime()).ToUnixTimeSeconds()); + else + BasicProperties.ClearTimestamp(); + } + } + + + /// + public RabbitMQMessageProperties(IBasicProperties BasicProperties) + { + this.BasicProperties = BasicProperties; + } + + + /// + public RabbitMQMessageProperties(IBasicProperties BasicProperties, IMessageProperties source) + { + this.BasicProperties = BasicProperties; + if (source == null) + return; + + ContentType = source.ContentType; + CorrelationId = source.CorrelationId; + ReplyTo = source.ReplyTo; + Persistent = source.Persistent; + Timestamp = source.Timestamp; + + BasicProperties.Headers = null; + foreach (var pair in source.GetHeaders()) + SetHeader(pair.Key, pair.Value); + } + + + /// + public void SetHeader(string name, string value) + { + if (BasicProperties.Headers == null) + BasicProperties.Headers = new Dictionary(); + + if (BasicProperties.Headers.ContainsKey(name)) + BasicProperties.Headers[name] = Encoding.UTF8.GetBytes(value); + else + BasicProperties.Headers.Add(name, Encoding.UTF8.GetBytes(value)); + } + + + /// + public string GetHeader(string name) + { + if (BasicProperties.Headers == null) + return null; + + return BasicProperties.Headers.TryGetValue(name, out var value) ? Encoding.UTF8.GetString((byte[])value) : null; + } + + + /// + public IEnumerable> GetHeaders() + { + if (BasicProperties.Headers == null) + yield break; + + foreach (var pair in BasicProperties.Headers) + yield return new KeyValuePair(pair.Key, Encoding.UTF8.GetString((byte[])pair.Value)); + } + } +} diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index bbda0d9..57422da 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -2,6 +2,10 @@ namespace Tapeti.Helpers { + /// + /// Helper class to construct a TapetiConnectionParams instance based on the + /// ConnectionString syntax as used by EasyNetQ. + /// public class ConnectionStringParser { private readonly TapetiConnectionParams result = new TapetiConnectionParams(); @@ -10,6 +14,10 @@ namespace Tapeti.Helpers private int pos = -1; private char current = '\0'; + /// + /// Parses an EasyNetQ-compatible ConnectionString into a TapetiConnectionParams instance. + /// + /// public static TapetiConnectionParams Parse(string connectionstring) { return new ConnectionStringParser(connectionstring).result; @@ -106,7 +114,9 @@ namespace Tapeti.Helpers private void SetValue(string key, string value) { - switch (key.ToLowerInvariant()) { + // ReSharper disable once SwitchStatementMissingSomeCases - by design, don't fail on unknown properties + switch (key.ToLowerInvariant()) + { case "hostname": result.HostName = value; break; case "port": result.Port = int.Parse(value); break; case "virtualhost": result.VirtualHost = value; break; diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs index 5993f0f..364efd8 100644 --- a/Tapeti/IConnection.cs +++ b/Tapeti/IConnection.cs @@ -5,8 +5,76 @@ using System.Threading.Tasks; namespace Tapeti { + /// + /// Contains information about the reason for a lost connection. + /// + public class DisconnectedEventArgs + { + /// + /// The ReplyCode as indicated by the client library + /// + public ushort ReplyCode; + + /// + /// The ReplyText as indicated by the client library + /// + public string ReplyText; + } + + + /// + public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e); + + + /// + /// + /// Represents a connection to a RabbitMQ server + /// public interface IConnection : IDisposable { - Task Subscribe(); + /// + /// Creates a subscriber to consume messages from the bound queues. + /// + /// If true, the subscriber will start consuming messages immediately. If false, the queues will be + /// declared but no messages will be consumed yet. Call Resume on the returned ISubscriber to start consuming messages. + Task Subscribe(bool startConsuming = true); + + + /// + /// Synchronous version of Subscribe. + /// + /// If true, the subscriber will start consuming messages immediately. If false, the queues will be + /// declared but no messages will be consumed yet. Call Resume on the returned ISubscriber to start consuming messages. + ISubscriber SubscribeSync(bool startConsuming = true); + + + /// + /// Returns an IPublisher implementation for the current connection. + /// + /// + IPublisher GetPublisher(); + + + /// + /// Closes the connection to RabbitMQ. + /// + Task Close(); + + + /// + /// Fired when a connection to RabbitMQ has been established. + /// + event EventHandler Connected; + + /// + /// Fired when the connection to RabbitMQ has been lost. + /// + event DisconnectedEventHandler Disconnected; + + /// + /// Fired when the connection to RabbitMQ has been recovered after an unexpected disconnect. + /// + event EventHandler Reconnected; + } } diff --git a/Tapeti/IConsumer.cs b/Tapeti/IConsumer.cs new file mode 100644 index 0000000..f8be17a --- /dev/null +++ b/Tapeti/IConsumer.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti +{ + /// + /// Processes incoming messages. + /// + public interface IConsumer + { + /// + /// + /// + /// The exchange from which the message originated + /// The routing key the message was sent with + /// Metadata included in the message + /// The raw body of the message + /// + Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body); + } +} diff --git a/Tapeti/IDependencyResolver.cs b/Tapeti/IDependencyResolver.cs index f7a67eb..1862aa4 100644 --- a/Tapeti/IDependencyResolver.cs +++ b/Tapeti/IDependencyResolver.cs @@ -2,6 +2,9 @@ namespace Tapeti { + /// + /// Wrapper interface for an IoC container to allow dependency injection in Tapeti. + /// public interface IDependencyResolver { T Resolve() where T : class; @@ -9,6 +12,10 @@ namespace Tapeti } + /// + /// Allows registering controller classes into the IoC container. Also registers default implementations, + /// so that the calling application may override these. + /// public interface IDependencyContainer : IDependencyResolver { void RegisterDefault() where TService : class where TImplementation : class, TService; diff --git a/Tapeti/IMessageSerializer.cs b/Tapeti/IMessageSerializer.cs index ada89c6..b2bbfc4 100644 --- a/Tapeti/IMessageSerializer.cs +++ b/Tapeti/IMessageSerializer.cs @@ -1,10 +1,26 @@ -using RabbitMQ.Client; +using Tapeti.Config; namespace Tapeti { + /// + /// Provides serialization and deserialization for messages. + /// public interface IMessageSerializer { - byte[] Serialize(object message, IBasicProperties properties); - object Deserialize(byte[] body, IBasicProperties properties); + /// + /// Serialize a message object instance to a byte array. + /// + /// An instance of a message class + /// Writable access to the message properties which will be sent along with the message + /// The encoded message + byte[] Serialize(object message, IMessageProperties properties); + + /// + /// Deserializes a previously serialized message. + /// + /// The encoded message + /// The properties as sent along with the message + /// A decoded instance of the message + object Deserialize(byte[] body, IMessageProperties properties); } } diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs index c55f47c..3a02ac3 100644 --- a/Tapeti/IPublisher.cs +++ b/Tapeti/IPublisher.cs @@ -1,19 +1,50 @@ using System.Threading.Tasks; -using RabbitMQ.Client; +using Tapeti.Config; + +// ReSharper disable once UnusedMember.Global namespace Tapeti { - // Note: Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher. - // The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting. + /// + /// Allows publishing of messages. + /// public interface IPublisher { + /// + /// Publish the specified message. Transport details are determined by the Tapeti configuration. + /// + /// The message to send Task Publish(object message); } + /// + /// + /// Low-level publisher for Tapeti internal use. + /// + /// + /// Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher. + /// The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting. + /// public interface IInternalPublisher : IPublisher { - Task Publish(object message, IBasicProperties properties, bool mandatory); - Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory); + /// + /// Publishes a message. The exchange and routing key are determined by the registered strategies. + /// + /// An instance of a message class + /// Metadata to include in the message + /// If true, an exception will be raised if the message can not be delivered to at least one queue + Task Publish(object message, IMessageProperties properties, bool mandatory); + + + /// + /// Publishes a message directly to a queue. The exchange and routing key are not used. + /// + /// An instance of a message class + /// The name of the queue to send the message to + /// Metadata to include in the message + /// If true, an exception will be raised if the message can not be delivered to the queue + /// + Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory); } } diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index 06a76da..6b54bbf 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -2,8 +2,14 @@ namespace Tapeti { + /// + /// Manages subscriptions to queues as configured by the bindings. + /// public interface ISubscriber { + /// + /// Starts consuming from the subscribed queues if not already started. + /// Task Resume(); } } diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index d4ecad3..95868f0 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -5,6 +5,10 @@ true + + 1701;1702 + + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 9291798..9247f11 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -2,8 +2,6 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; -using System.Threading.Tasks; -using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Default; using Tapeti.Helpers; @@ -12,198 +10,169 @@ using Tapeti.Helpers; namespace Tapeti { - public class TopologyConfigurationException : Exception + /// + /// + /// Default implementation of the Tapeti config builder. + /// Automatically registers the default middleware for injecting the message parameter and handling the return value. + /// + public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess { - public TopologyConfigurationException(string message) : base(message) { } - } - - public delegate Task MessageHandlerFunc(IMessageContext context, object message); + private Config config; + private readonly List bindingMiddleware = new List(); - public class TapetiConfig - { - private readonly Dictionary> staticRegistrations = new Dictionary>(); - private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); - private readonly List uniqueRegistrations = new List(); - - private readonly List customBindings = new List(); - private readonly List bindingMiddleware = new List(); - private readonly List messageMiddleware = new List(); - private readonly List cleanupMiddleware = new List(); - private readonly List publishMiddleware = new List(); - - private readonly IDependencyResolver dependencyResolver; - - private bool usePublisherConfirms = true; + /// + public IDependencyResolver DependencyResolver => GetConfig().DependencyResolver; + /// + /// Instantiates a new Tapeti config builder. + /// + /// A wrapper implementation for an IoC container to allow dependency injection public TapetiConfig(IDependencyResolver dependencyResolver) { - this.dependencyResolver = dependencyResolver; + config = new Config(dependencyResolver); + } + + + /// + public ITapetiConfig Build() + { + if (config == null) + throw new InvalidOperationException("TapetiConfig.Build must only be called once"); Use(new DependencyResolverBinding()); - Use(new MessageBinding()); Use(new PublishResultBinding()); - } - - public IConfig Build() - { - RegisterCustomBindings(); + // Registered last so it runs first and the MessageClass is known to other middleware + Use(new MessageBinding()); RegisterDefaults(); - - var queues = new List(); - queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value))); + (config.DependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); - // We want to ensure each queue only has unique messages classes. This means we can requeue - // without the side-effect of calling other handlers for the same message class again as well. - // - // Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping - // and how the bindingIndex is relevant: - // - // dynamicRegistrations: - // Key (prefix) - // "" - // Key (message class) Value (list of bindings) - // A binding1, binding2, binding3 - // B binding4 - // "prefix" - // A binding5, binding6 - // - // By combining all bindings with the same index, per prefix, the following queues will be registered: - // - // Prefix Bindings - // "" binding1 (message A), binding4 (message B) - // "" binding2 (message A) - // "" binding3 (message A) - // "prefix" binding5 (message A) - // "prefix" binding6 (message A) - // - foreach (var prefixGroup in dynamicRegistrations) - { - var dynamicBindings = new List>(); + var outputConfig = config; + config = null; - foreach (var bindings in prefixGroup.Value.Values) - { - while (dynamicBindings.Count < bindings.Count) - dynamicBindings.Add(new List()); - - for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) - dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); - } - - queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); - } - - queues.AddRange(uniqueRegistrations.Select(b => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(b.QueueInfo.Name) }, new []{b}))); - - - var config = new Config(queues) - { - DependencyResolver = dependencyResolver, - MessageMiddleware = messageMiddleware, - CleanupMiddleware = cleanupMiddleware, - PublishMiddleware = publishMiddleware, - - UsePublisherConfirms = usePublisherConfirms - }; - - (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); - - return config; + outputConfig.Lock(); + return outputConfig; } - public TapetiConfig Use(IBindingMiddleware handler) + /// + public ITapetiConfigBuilder Use(IControllerBindingMiddleware handler) { bindingMiddleware.Add(handler); return this; } - public TapetiConfig Use(IMessageMiddleware handler) + /// + public ITapetiConfigBuilder Use(IMessageMiddleware handler) { - messageMiddleware.Add(handler); + GetConfig().Use(handler); return this; } - public TapetiConfig Use(ICleanupMiddleware handler) + /// + public ITapetiConfigBuilder Use(IPublishMiddleware handler) { - cleanupMiddleware.Add(handler); + GetConfig().Use(handler); return this; } - public TapetiConfig Use(IPublishMiddleware handler) + /// + public ITapetiConfigBuilder Use(ITapetiExtension extension) { - publishMiddleware.Add(handler); - return this; - } - - - public TapetiConfig Use(ITapetiExtension extension) - { - if (dependencyResolver is IDependencyContainer container) + if (DependencyResolver is IDependencyContainer container) extension.RegisterDefaults(container); - var middlewareBundle = extension.GetMiddleware(dependencyResolver); + var configInstance = GetConfig(); - if (extension is ITapetiExtentionBinding extentionBindings) - customBindings.AddRange(extentionBindings.GetBindings(dependencyResolver)); - - // ReSharper disable once InvertIf + var middlewareBundle = extension.GetMiddleware(DependencyResolver); if (middlewareBundle != null) { foreach (var middleware in middlewareBundle) { - // ReSharper disable once CanBeReplacedWithTryCastAndCheckForNull - if (middleware is IBindingMiddleware bindingExtension) - Use(bindingExtension); - else if (middleware is IMessageMiddleware messageExtension) - Use(messageExtension); - else if (middleware is ICleanupMiddleware cleanupExtension) - Use(cleanupExtension); - else if (middleware is IPublishMiddleware publishExtension) - Use(publishExtension); - else - throw new ArgumentException($"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}"); + switch (middleware) + { + case IControllerBindingMiddleware bindingExtension: + Use(bindingExtension); + break; + + case IMessageMiddleware messageExtension: + configInstance.Use(messageExtension); + break; + + case IPublishMiddleware publishExtension: + configInstance.Use(publishExtension); + break; + + default: + throw new ArgumentException( + $"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}"); + } } } + var bindingBundle = (extension as ITapetiExtensionBinding)?.GetBindings(DependencyResolver); + if (bindingBundle == null) + return this; + + foreach (var binding in bindingBundle) + config.RegisterBinding(binding); + return this; } - - /// - /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, - /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may - /// result in never-ending flows. Only disable if you can accept those consequences. - /// - public TapetiConfig DisablePublisherConfirms() + + /// + public void RegisterBinding(IBinding binding) { - usePublisherConfirms = false; + GetConfig().RegisterBinding(binding); + } + + + /// + public ITapetiConfigBuilder DisablePublisherConfirms() + { + GetConfig().SetPublisherConfirms(false); + return this; + } + + + /// + public ITapetiConfigBuilder SetPublisherConfirms(bool enabled) + { + GetConfig().SetPublisherConfirms(enabled); + return this; + } + + + /// + public ITapetiConfigBuilder EnableDeclareDurableQueues() + { + GetConfig().SetDeclareDurableQueues(true); + return this; + } + + + /// + public ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled) + { + GetConfig().SetDeclareDurableQueues(enabled); return this; } /// - /// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds, - /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may - /// result in never-ending flows. Only disable if you accept those consequences. + /// Registers the default implementation of various Tapeti interfaces into the IoC container. /// - public TapetiConfig SetPublisherConfirms(bool enabled) + protected void RegisterDefaults() { - usePublisherConfirms = enabled; - return this; - } - - - public void RegisterDefaults() - { - if (!(dependencyResolver is IDependencyContainer container)) + if (!(DependencyResolver is IDependencyContainer container)) return; if (ConsoleHelper.IsAvailable()) @@ -218,85 +187,133 @@ namespace Tapeti } - public TapetiConfig RegisterController(Type controller) + /// + public void ApplyBindingMiddleware(IControllerBindingContext context, Action lastHandler) { - var controllerQueueInfo = GetQueueInfo(controller); + MiddlewareHelper.Go(bindingMiddleware, + (handler, next) => handler.Handle(context, next), + lastHandler); + } - if (!controller.IsInterface) - (dependencyResolver as IDependencyContainer)?.RegisterController(controller); - foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) - .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) - .Select(m => (MethodInfo)m)) + private Config GetConfig() + { + if (config == null) + throw new InvalidOperationException("TapetiConfig can not be updated after Build"); + + return null; + } + + + /// + internal class Config : ITapetiConfig + { + private readonly ConfigFeatures features = new ConfigFeatures(); + private readonly ConfigMiddleware middleware = new ConfigMiddleware(); + private readonly ConfigBindings bindings = new ConfigBindings(); + + public IDependencyResolver DependencyResolver { get; } + public ITapetiConfigFeatues Features => features; + public ITapetiConfigMiddleware Middleware => middleware; + public ITapetiConfigBindings Bindings => bindings; + + + public Config(IDependencyResolver dependencyResolver) { - var context = new BindingContext(method); - var messageHandler = GetMessageHandler(context, method); - if (messageHandler == null) - continue; - - var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; - if (!methodQueueInfo.IsValid) - throw new TopologyConfigurationException( - $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); - - var handlerInfo = new Binding - { - Controller = controller, - Method = method, - QueueInfo = methodQueueInfo, - QueueBindingMode = context.QueueBindingMode, - MessageClass = context.MessageClass, - MessageHandler = messageHandler, - MessageMiddleware = context.MessageMiddleware, - MessageFilterMiddleware = context.MessageFilterMiddleware - }; - - if (methodQueueInfo.Dynamic.GetValueOrDefault()) - AddDynamicRegistration(handlerInfo); - else - AddStaticRegistration(handlerInfo); + DependencyResolver = dependencyResolver; } - return this; - } - - public TapetiConfig RegisterAllControllers(Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute)))) - RegisterController(type); - - return this; - } - - - public TapetiConfig RegisterAllControllers() - { - return RegisterAllControllers(Assembly.GetEntryAssembly()); - } - - private void RegisterCustomBindings() - { - foreach (var customBinding in customBindings) + public void Lock() { - // TODO Do we need to configure additional middleware, or does this only get confused if there is no MessageClass + bindings.Lock(); + } - var binding = new CustomBinding(customBinding); - if (binding.QueueInfo.Dynamic == false) - { - AddStaticRegistration(binding); - } - else if (binding.MessageClass != null) - { - AddDynamicRegistration(binding); - } - else - { - AddUniqueRegistration(binding); - } + + public void Use(IMessageMiddleware handler) + { + middleware.Use(handler); + } + + public void Use(IPublishMiddleware handler) + { + middleware.Use(handler); + } + + + public void RegisterBinding(IBinding binding) + { + bindings.Add(binding); + } + + + public void SetPublisherConfirms(bool enabled) + { + features.PublisherConfirms = enabled; + } + + public void SetDeclareDurableQueues(bool enabled) + { + features.DeclareDurableQueues = enabled; } } + + internal class ConfigFeatures : ITapetiConfigFeatues + { + public bool PublisherConfirms { get; internal set; } = true; + public bool DeclareDurableQueues { get; internal set; } = true; + } + + + internal class ConfigMiddleware : ITapetiConfigMiddleware + { + private readonly List messageMiddleware = new List(); + private readonly List publishMiddleware = new List(); + + + public IReadOnlyList Message => messageMiddleware; + public IReadOnlyList Publish => publishMiddleware; + + + public void Use(IMessageMiddleware handler) + { + messageMiddleware.Add(handler); + } + + public void Use(IPublishMiddleware handler) + { + publishMiddleware.Add(handler); + } + } + + + internal class ConfigBindings : List, ITapetiConfigBindings + { + private Dictionary methodLookup; + + + public IControllerMethodBinding ForMethod(Delegate method) + { + return methodLookup.TryGetValue(method.Method, out var binding) ? binding : null; + } + + + public void Lock() + { + methodLookup = this + .Where(binding => binding is IControllerMethodBinding) + .Cast() + .ToDictionary(binding => binding.Method, binding => binding); + } + } + } + + + /* + public delegate Task MessageHandlerFunc(IMessageContext context, object message); + + protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) { var allowBinding= false; @@ -408,48 +425,6 @@ namespace Tapeti } - protected void AddDynamicRegistration(IBindingQueueInfo binding) - { - var prefix = binding.QueueInfo.Name ?? ""; - - if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) - { - prefixRegistrations = new Dictionary>(); - dynamicRegistrations.Add(prefix, prefixRegistrations); - } - - if (!prefixRegistrations.TryGetValue(binding.MessageClass, out List bindings)) - { - bindings = new List(); - prefixRegistrations.Add(binding.MessageClass, bindings); - } - - bindings.Add(binding); - } - - protected void AddUniqueRegistration(IBindingQueueInfo binding) - { - uniqueRegistrations.Add(binding); - } - - protected QueueInfo GetQueueInfo(MemberInfo member) - { - var dynamicQueueAttribute = member.GetCustomAttribute(); - var durableQueueAttribute = member.GetCustomAttribute(); - - if (dynamicQueueAttribute != null && durableQueueAttribute != null) - throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}"); - - if (dynamicQueueAttribute != null) - return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix }; - - if (durableQueueAttribute != null) - return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name }; - - return null; - } - - protected string GetDynamicQueueName(string prefix) { if (String.IsNullOrEmpty(prefix)) @@ -457,300 +432,6 @@ namespace Tapeti return prefix + "." + Guid.NewGuid().ToString("N"); } - - - protected class QueueInfo - { - public bool? Dynamic { get; set; } - public string Name { get; set; } - - public bool IsValid => Dynamic.HasValue || !string.IsNullOrEmpty(Name); - } - - - protected class Config : IConfig - { - public bool UsePublisherConfirms { get; set; } - - public IDependencyResolver DependencyResolver { get; set; } - public IReadOnlyList MessageMiddleware { get; set; } - public IReadOnlyList CleanupMiddleware { get; set; } - public IReadOnlyList PublishMiddleware { get; set; } - public IEnumerable Queues { get; } - - private readonly Dictionary bindingMethodLookup; - - - public Config(IEnumerable queues) - { - Queues = queues.ToList(); - - bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b); - } - - - public IBinding GetBinding(Delegate method) - { - return bindingMethodLookup.TryGetValue(method.Method, out var binding) ? binding : null; - } - } - - - protected class Queue : IDynamicQueue - { - private readonly string declareQueueName; - - public bool Dynamic { get; } - public string Name { get; set; } - public IEnumerable Bindings { get; } - - - public Queue(QueueInfo queue, IEnumerable bindings) - { - declareQueueName = queue.Name; - - Dynamic = queue.Dynamic.GetValueOrDefault(); - Name = queue.Name; - Bindings = bindings; - } - - - public string GetDeclareQueueName() - { - return declareQueueName; - } - - - public void SetName(string name) - { - Name = name; - } - } - - protected interface IBindingQueueInfo : IBuildBinding - { - QueueInfo QueueInfo { get; } - } - - protected class Binding : IBindingQueueInfo - { - public Type Controller { get; set; } - public MethodInfo Method { get; set; } - public Type MessageClass { get; set; } - public string QueueName { get; set; } - public QueueBindingMode QueueBindingMode { get; set; } - - public IReadOnlyList MessageMiddleware { get; set; } - public IReadOnlyList MessageFilterMiddleware { get; set; } - - private QueueInfo queueInfo; - public QueueInfo QueueInfo - { - get => queueInfo; - set - { - QueueName = (value?.Dynamic).GetValueOrDefault() ? value?.Name : null; - queueInfo = value; - } - } - - public MessageHandlerFunc MessageHandler { get; set; } - - - public void SetQueueName(string queueName) - { - QueueName = queueName; - } - - - public bool Accept(Type messageClass) - { - return MessageClass.IsAssignableFrom(messageClass); - } - - public bool Accept(IMessageContext context, object message) - { - return message.GetType() == MessageClass; - } - - - public Task Invoke(IMessageContext context, object message) - { - return MessageHandler(context, message); - } - } - - - protected class CustomBinding : IBindingQueueInfo - { - private readonly ICustomBinding inner; - - public CustomBinding(ICustomBinding inner) - { - this.inner = inner; - - // Copy all variables to make them guaranteed readonly. - Controller = inner.Controller; - Method = inner.Method; - QueueBindingMode = inner.QueueBindingMode; - MessageClass = inner.MessageClass; - - QueueInfo = inner.StaticQueueName != null - ? new QueueInfo() - { - Dynamic = false, - Name = inner.StaticQueueName - } - : new QueueInfo() - { - Dynamic = true, - Name = inner.DynamicQueuePrefix - }; - - // Custom bindings cannot have other middleware messing with the binding. - MessageFilterMiddleware = new IMessageFilterMiddleware[0]; - MessageMiddleware = new IMessageMiddleware[0]; - } - - public Type Controller { get; } - public MethodInfo Method { get; } - public string QueueName { get; private set; } - public QueueBindingMode QueueBindingMode { get; set; } - public IReadOnlyList MessageFilterMiddleware { get; } - public IReadOnlyList MessageMiddleware { get; } - - public bool Accept(Type messageClass) - { - return inner.Accept(messageClass); - } - - public bool Accept(IMessageContext context, object message) - { - return inner.Accept(context, message); - } - - public Task Invoke(IMessageContext context, object message) - { - return inner.Invoke(context, message); - } - - public void SetQueueName(string queueName) - { - QueueName = queueName; - inner.SetQueueName(queueName); - } - - public Type MessageClass { get; } - public QueueInfo QueueInfo { get; } - } - - internal interface IBindingParameterAccess - { - ValueFactory GetBinding(); - } - - - - internal interface IBindingResultAccess - { - ResultHandler GetHandler(); - } - - - internal class BindingContext : IBindingContext - { - private List messageMiddleware; - private List messageFilterMiddleware; - - public Type MessageClass { get; set; } - - public MethodInfo Method { get; } - public IReadOnlyList Parameters { get; } - public IBindingResult Result { get; } - - public QueueBindingMode QueueBindingMode { get; set; } - - public IReadOnlyList MessageMiddleware => messageMiddleware; - public IReadOnlyList MessageFilterMiddleware => messageFilterMiddleware; - - - public BindingContext(MethodInfo method) - { - Method = method; - - Parameters = method.GetParameters().Select(p => new BindingParameter(p)).ToList(); - Result = new BindingResult(method.ReturnParameter); - } - - - public void Use(IMessageMiddleware middleware) - { - if (messageMiddleware == null) - messageMiddleware = new List(); - - messageMiddleware.Add(middleware); - } - - - public void Use(IMessageFilterMiddleware filterMiddleware) - { - if (messageFilterMiddleware == null) - messageFilterMiddleware = new List(); - - messageFilterMiddleware.Add(filterMiddleware); - } - } - - - internal class BindingParameter : IBindingParameter, IBindingParameterAccess - { - private ValueFactory binding; - - public ParameterInfo Info { get; } - public bool HasBinding => binding != null; - - - public BindingParameter(ParameterInfo parameter) - { - Info = parameter; - - } - - public ValueFactory GetBinding() - { - return binding; - } - - public void SetBinding(ValueFactory valueFactory) - { - binding = valueFactory; - } - } - - - internal class BindingResult : IBindingResult, IBindingResultAccess - { - private ResultHandler handler; - - public ParameterInfo Info { get; } - public bool HasHandler => handler != null; - - - public BindingResult(ParameterInfo parameter) - { - Info = parameter; - } - - - public ResultHandler GetHandler() - { - return handler; - } - - public void SetHandler(ResultHandler resultHandler) - { - handler = resultHandler; - } - } } + */ } diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs new file mode 100644 index 0000000..dd41f2b --- /dev/null +++ b/Tapeti/TapetiConfigControllers.cs @@ -0,0 +1,127 @@ +using System; +using System.Linq; +using System.Reflection; +using Tapeti.Annotations; +using Tapeti.Config; +using Tapeti.Default; + +// ReSharper disable UnusedMember.Global + +namespace Tapeti +{ + /// + /// + /// Thrown when an issue is detected in a controller configuration. + /// + public class TopologyConfigurationException : Exception + { + /// + public TopologyConfigurationException(string message) : base(message) { } + } + + + /// + /// Extension methods for registering message controllers. + /// + public static class TapetiConfigControllers + { + /// + /// Registers all public methods in the specified controller class as message handlers. + /// + /// + /// The controller class to register. The class and/or methods must be annotated with either the DurableQueue or DynamicQueue attribute. + public static ITapetiConfigBuilder RegisterController(this ITapetiConfigBuilder builder, Type controller) + { + var builderAccess = (ITapetiConfigBuilderAccess)builder; + + if (!controller.IsClass) + throw new ArgumentException($"Controller {controller.Name} must be a class"); + + var controllerQueueInfo = GetQueueInfo(controller); + (builderAccess.DependencyResolver as IDependencyContainer)?.RegisterController(controller); + + foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) + .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) + .Select(m => (MethodInfo)m)) + { + // TODO create binding for method + + /* + var context = new BindingContext(method); + var messageHandler = GetMessageHandler(context, method); + if (messageHandler == null) + continue; + */ + + var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; + if (methodQueueInfo == null || !methodQueueInfo.IsValid) + throw new TopologyConfigurationException( + $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); + + /* + var handlerInfo = new Binding + { + Controller = controller, + Method = method, + QueueInfo = methodQueueInfo, + QueueBindingMode = context.QueueBindingMode, + MessageClass = context.MessageClass, + MessageHandler = messageHandler, + MessageMiddleware = context.MessageMiddleware, + MessageFilterMiddleware = context.MessageFilterMiddleware + }; + + if (methodQueueInfo.Dynamic.GetValueOrDefault()) + AddDynamicRegistration(handlerInfo); + else + AddStaticRegistration(handlerInfo); + */ + + builder.RegisterBinding(new ControllerMethodBinding(controller, method, methodQueueInfo)); + } + + return builder; + } + + + /// + /// Registers all controllers in the specified assembly which are marked with the MessageController attribute. + /// + /// + /// The assembly to scan for controllers. + public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder, Assembly assembly) + { + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute)))) + RegisterController(builder, type); + + return builder; + } + + + /// + /// Registers all controllers in the entry assembly which are marked with the MessageController attribute. + /// + /// + public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder) + { + return RegisterAllControllers(builder, Assembly.GetEntryAssembly()); + } + + + private static ControllerMethodBinding.QueueInfo GetQueueInfo(MemberInfo member) + { + var dynamicQueueAttribute = member.GetCustomAttribute(); + var durableQueueAttribute = member.GetCustomAttribute(); + + if (dynamicQueueAttribute != null && durableQueueAttribute != null) + throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}"); + + if (dynamicQueueAttribute != null) + return new ControllerMethodBinding.QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix }; + + return durableQueueAttribute != null + ? new ControllerMethodBinding.QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name } + : null; + } + } +} diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index d66f880..844249f 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -1,46 +1,68 @@ using System; -using System.Linq; using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Connection; // ReSharper disable UnusedMember.Global +// TODO more separation from the actual worker / RabbitMQ Client for unit testing purposes + namespace Tapeti { - public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e); - - public class TapetiConnection : IDisposable + /// + /// + /// Creates a connection to RabbitMQ based on the provided Tapeti config. + /// + public class TapetiConnection : IConnection { - private readonly IConfig config; + private readonly ITapetiConfig config; + + /// + /// Specifies the hostname and credentials to use when connecting to RabbitMQ. + /// Defaults to guest on localhost. + /// + /// + /// This property must be set before first subscribing or publishing, otherwise it + /// will use the default connection parameters. + /// public TapetiConnectionParams Params { get; set; } - private readonly Lazy worker; + private readonly Lazy client; private TapetiSubscriber subscriber; - public TapetiConnection(IConfig config) + /// + /// Creates a new instance of a TapetiConnection and registers a default IPublisher + /// in the IoC container as provided in the config. + /// + /// + public TapetiConnection(ITapetiConfig config) { this.config = config; (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher); - worker = new Lazy(() => new TapetiWorker(config) + client = new Lazy(() => new TapetiClient(config, Params ?? new TapetiConnectionParams()) { - ConnectionParams = Params ?? new TapetiConnectionParams(), ConnectionEventListener = new ConnectionEventListener(this) }); } + /// public event EventHandler Connected; + + /// public event DisconnectedEventHandler Disconnected; + + /// public event EventHandler Reconnected; + /// public async Task Subscribe(bool startConsuming = true) { if (subscriber == null) { - subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList()); - await subscriber.BindQueues(); + subscriber = new TapetiSubscriber(() => client.Value, config); + await subscriber.ApplyBindings(); } if (startConsuming) @@ -50,30 +72,35 @@ namespace Tapeti } + /// public ISubscriber SubscribeSync(bool startConsuming = true) { return Subscribe(startConsuming).Result; } + /// public IPublisher GetPublisher() { - return new TapetiPublisher(() => worker.Value); + return new TapetiPublisher(config, () => client.Value); } + /// public async Task Close() { - if (worker.IsValueCreated) - await worker.Value.Close(); + if (client.IsValueCreated) + await client.Value.Close(); } + /// public void Dispose() { Close().Wait(); } + private class ConnectionEventListener: IConnectionEventListener { private readonly TapetiConnection owner; @@ -99,25 +126,47 @@ namespace Tapeti } } + + /// + /// Called when a connection to RabbitMQ has been established. + /// protected virtual void OnConnected(EventArgs e) { - Task.Run(() => Connected?.Invoke(this, e)); + var connectedEvent = Connected; + if (connectedEvent == null) + return; + + Task.Run(() => connectedEvent.Invoke(this, e)); } + /// + /// Called when the connection to RabbitMQ has been lost. + /// protected virtual void OnReconnected(EventArgs e) { + var reconnectedEvent = Reconnected; + if (reconnectedEvent == null) + return; + Task.Run(() => { - subscriber?.RebindQueues().ContinueWith((t) => + subscriber?.ApplyBindings().ContinueWith((t) => { - Reconnected?.Invoke(this, e); + reconnectedEvent.Invoke(this, e); }); }); } + /// + /// Called when the connection to RabbitMQ has been recovered after an unexpected disconnect. + /// protected virtual void OnDisconnected(DisconnectedEventArgs e) { - Task.Run(() => Disconnected?.Invoke(this, e)); + var disconnectedEvent = Disconnected; + if (disconnectedEvent == null) + return; + + Task.Run(() => disconnectedEvent.Invoke(this, e)); } } } diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 2c6c525..9b0414c 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -4,12 +4,34 @@ namespace Tapeti { + /// + /// + /// public class TapetiConnectionParams { + /// + /// The hostname to connect to. Defaults to localhost. + /// public string HostName { get; set; } = "localhost"; + + /// + /// The port to connect to. Defaults to 5672. + /// public int Port { get; set; } = 5672; + + /// + /// The virtual host in RabbitMQ. Defaults to /. + /// public string VirtualHost { get; set; } = "/"; + + /// + /// The username to authenticate with. Defaults to guest. + /// public string Username { get; set; } = "guest"; + + /// + /// The password to authenticate with. Defaults to guest. + /// public string Password { get; set; } = "guest"; /// @@ -20,10 +42,17 @@ namespace Tapeti public ushort PrefetchCount { get; set; } = 50; + /// public TapetiConnectionParams() { } + /// + /// Construct a new TapetiConnectionParams instance based on standard URI syntax. + /// + /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname/")) + /// new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost")) + /// public TapetiConnectionParams(Uri uri) { HostName = uri.Host; diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs index f22f869..acee8c7 100644 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs @@ -6,6 +6,10 @@ using System.Threading.Tasks; namespace Tapeti.Tasks { + /// + /// + /// An implementation of a queue which runs tasks on a single thread. + /// public class SingleThreadTaskQueue : IDisposable { private readonly object previousTaskLock = new object(); @@ -14,6 +18,10 @@ namespace Tapeti.Tasks private readonly Lazy singleThreadScheduler = new Lazy(); + /// + /// Add the specified synchronous action to the task queue. + /// + /// public Task Add(Action action) { lock (previousTaskLock) @@ -27,6 +35,10 @@ namespace Tapeti.Tasks } + /// + /// Add the specified asynchronous method to the task queue. + /// + /// public Task Add(Func func) { lock (previousTaskLock) @@ -45,89 +57,90 @@ namespace Tapeti.Tasks } + /// public void Dispose() { if (singleThreadScheduler.IsValueCreated) singleThreadScheduler.Value.Dispose(); } - } - public class SingleThreadTaskScheduler : TaskScheduler, IDisposable - { - public override int MaximumConcurrencyLevel => 1; - - - private readonly Queue scheduledTasks = new Queue(); - private bool disposed; - - - public SingleThreadTaskScheduler() + internal class SingleThreadTaskScheduler : TaskScheduler, IDisposable { - // ReSharper disable once ObjectCreationAsStatement - fire and forget! - new Thread(WorkerThread).Start(); - } + public override int MaximumConcurrencyLevel => 1; - public void Dispose() - { - lock (scheduledTasks) + private readonly Queue scheduledTasks = new Queue(); + private bool disposed; + + + public SingleThreadTaskScheduler() { - disposed = true; - Monitor.PulseAll(scheduledTasks); + // ReSharper disable once ObjectCreationAsStatement - fire and forget! + new Thread(WorkerThread).Start(); } - } - protected override void QueueTask(Task task) - { - if (disposed) return; - - lock (scheduledTasks) + public void Dispose() { - scheduledTasks.Enqueue(task); - Monitor.Pulse(scheduledTasks); - } - } - - protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) - { - return false; - } - - - protected override IEnumerable GetScheduledTasks() - { - lock (scheduledTasks) - { - return scheduledTasks.ToList(); - } - } - - - private void WorkerThread() - { - while(true) - { - Task task; lock (scheduledTasks) { - task = WaitAndDequeueTask(); + disposed = true; + Monitor.PulseAll(scheduledTasks); } - - if (task == null) - break; - - TryExecuteTask(task); } - } - private Task WaitAndDequeueTask() - { - while (!scheduledTasks.Any() && !disposed) - Monitor.Wait(scheduledTasks); - return disposed ? null : scheduledTasks.Dequeue(); + protected override void QueueTask(Task task) + { + if (disposed) return; + + lock (scheduledTasks) + { + scheduledTasks.Enqueue(task); + Monitor.Pulse(scheduledTasks); + } + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + return false; + } + + + protected override IEnumerable GetScheduledTasks() + { + lock (scheduledTasks) + { + return scheduledTasks.ToList(); + } + } + + + private void WorkerThread() + { + while (true) + { + Task task; + lock (scheduledTasks) + { + task = WaitAndDequeueTask(); + } + + if (task == null) + break; + + TryExecuteTask(task); + } + } + + private Task WaitAndDequeueTask() + { + while (!scheduledTasks.Any() && !disposed) + Monitor.Wait(scheduledTasks); + + return disposed ? null : scheduledTasks.Dequeue(); + } } } }