From 314a67db002fe3b1cdecc61780242c5756c5d01a Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 14 Aug 2019 20:48:40 +0200 Subject: [PATCH] [ci skip] Bit of refactoring and bugfixing, mostly documentation --- Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 2 +- .../Annotations/ContinuationAttribute.cs | 5 + Tapeti.Flow/Annotations/StartAttribute.cs | 5 + Tapeti.Flow/ConfigExtensions.cs | 11 +- Tapeti.Flow/ContextItems.cs | 6 + Tapeti.Flow/Default/FlowBindingMiddleware.cs | 6 +- ...eware.cs => FlowContinuationMiddleware.cs} | 8 +- Tapeti.Flow/Default/FlowProvider.cs | 10 + Tapeti.Flow/Default/FlowStarter.cs | 6 +- Tapeti.Flow/Default/FlowState.cs | 64 +++++ Tapeti.Flow/Default/FlowStore.cs | 5 +- .../{FlowMiddleware.cs => FlowExtension.cs} | 11 +- Tapeti.Flow/IFlowProvider.cs | 119 +++++++++- Tapeti.Flow/IFlowStore.cs | 38 +++ Tapeti.Flow/Tapeti.Flow.csproj | 2 +- Tapeti.Serilog/Tapeti.Serilog.csproj | 2 +- .../Tapeti.SimpleInjector.csproj | 2 +- Tapeti.Tests/Tapeti.Tests.csproj | 9 +- Tapeti.Transient/TransientGenericBinding.cs | 9 +- Tapeti/Config/IBinding.cs | 15 +- Tapeti/Config/IControllerBindingContext.cs | 10 +- Tapeti/Config/IControllerCleanupMiddleware.cs | 5 +- Tapeti/Config/IControllerMessageContext.cs | 17 -- Tapeti/Config/IMessageContext.cs | 17 ++ Tapeti/Config/ITapetiConfigBuilder.cs | 2 +- Tapeti/Connection/TapetiClient.cs | 16 +- Tapeti/Connection/TapetiConsumer.cs | 9 +- Tapeti/Connection/TapetiSubscriber.cs | 10 +- Tapeti/Default/ControllerBindingContext.cs | 179 ++++++++++++++ Tapeti/Default/ControllerMessageContext.cs | 48 ++-- Tapeti/Default/ControllerMethodBinding.cs | 223 ++++++++++++++++-- Tapeti/Default/MessageBinding.cs | 17 +- Tapeti/Default/MessageContext.cs | 33 ++- Tapeti/Default/RabbitMQMessageProperties.cs | 8 +- Tapeti/Tapeti.csproj | 4 +- Tapeti/TapetiConfig.cs | 140 +---------- Tapeti/TapetiConfigControllers.cs | 62 +++-- Test/FlowEndController.cs | 2 + Test/Program.cs | 2 - 39 files changed, 871 insertions(+), 268 deletions(-) rename Tapeti.Flow/Default/{FlowMiddleware.cs => FlowContinuationMiddleware.cs} (91%) rename Tapeti.Flow/{FlowMiddleware.cs => FlowExtension.cs} (75%) create mode 100644 Tapeti/Default/ControllerBindingContext.cs diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 2da63e5..44bd845 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -10,7 +10,7 @@ - + diff --git a/Tapeti.Flow/Annotations/ContinuationAttribute.cs b/Tapeti.Flow/Annotations/ContinuationAttribute.cs index 8749bf8..2612a30 100644 --- a/Tapeti.Flow/Annotations/ContinuationAttribute.cs +++ b/Tapeti.Flow/Annotations/ContinuationAttribute.cs @@ -2,6 +2,11 @@ namespace Tapeti.Flow.Annotations { + /// + /// + /// Marks a message handler as a response message handler which continues a Tapeti Flow. + /// The method only receives direct messages, and does not create a routing key based binding to the queue. + /// [AttributeUsage(AttributeTargets.Method)] public class ContinuationAttribute : Attribute { diff --git a/Tapeti.Flow/Annotations/StartAttribute.cs b/Tapeti.Flow/Annotations/StartAttribute.cs index 3f8e767..8c1fd2e 100644 --- a/Tapeti.Flow/Annotations/StartAttribute.cs +++ b/Tapeti.Flow/Annotations/StartAttribute.cs @@ -3,6 +3,11 @@ using JetBrains.Annotations; namespace Tapeti.Flow.Annotations { + /// + /// + /// Marks this method as the start of a Tapeti Flow. Use IFlowStarter.Start to begin a new flow and + /// call this method. Must return an IYieldPoint. + /// [AttributeUsage(AttributeTargets.Method)] [MeansImplicitUse] public class StartAttribute : Attribute diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs index e016172..474da6a 100644 --- a/Tapeti.Flow/ConfigExtensions.cs +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -2,11 +2,20 @@ namespace Tapeti.Flow { + /// + /// ITapetiConfigBuilder extension for enabling Flow. + /// public static class ConfigExtensions { + /// + /// Enables Tapeti Flow. + /// + /// + /// An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts. + /// public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null) { - config.Use(new FlowMiddleware(flowRepository)); + config.Use(new FlowExtension(flowRepository)); return config; } } diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs index 452de54..8bebc23 100644 --- a/Tapeti.Flow/ContextItems.cs +++ b/Tapeti.Flow/ContextItems.cs @@ -1,7 +1,13 @@ namespace Tapeti.Flow { + /// + /// Key names as used in the message context store. For internal use. + /// public static class ContextItems { + /// + /// Key given to the FlowContext object as stored in the message context. + /// public const string FlowContext = "Tapeti.Flow.FlowContext"; } } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 7a65642..00f8469 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -15,9 +15,6 @@ namespace Tapeti.Flow.Default if (context.Method.GetCustomAttribute() != null) return; - if (context.Method.GetCustomAttribute() != null) - context.SetBindingTargetMode(BindingTargetMode.Direct); - RegisterYieldPointResult(context); RegisterContinuationFilter(context); @@ -33,7 +30,8 @@ namespace Tapeti.Flow.Default if (continuationAttribute == null) return; - context.Use(new FlowMiddleware()); + context.SetBindingTargetMode(BindingTargetMode.Direct); + context.Use(new FlowContinuationMiddleware()); if (context.Result.HasHandler) return; diff --git a/Tapeti.Flow/Default/FlowMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs similarity index 91% rename from Tapeti.Flow/Default/FlowMiddleware.cs rename to Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 1dd1571..d138b08 100644 --- a/Tapeti.Flow/Default/FlowMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -6,11 +6,11 @@ using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { - public class FlowMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware + public class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware { public async Task Filter(IControllerMessageContext context, Func next) { - var flowContext = await CreateFlowContext(context); + var flowContext = await EnrichWithFlowContext(context); if (flowContext?.ContinuationMetadata == null) return; @@ -44,7 +44,7 @@ namespace Tapeti.Flow.Default } - public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next) + public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next) { await next(); @@ -62,7 +62,7 @@ namespace Tapeti.Flow.Default - private static async Task CreateFlowContext(IControllerMessageContext context) + private static async Task EnrichWithFlowContext(IControllerMessageContext context) { if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) return flowContext; diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 9da8a7d..fe71a05 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -12,6 +12,10 @@ using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { + /// /> + /// + /// Default implementation for IFlowProvider. + /// public class FlowProvider : IFlowProvider, IFlowHandler { private readonly ITapetiConfig config; @@ -25,28 +29,33 @@ namespace Tapeti.Flow.Default } + /// public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } + /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { return new ParallelRequestBuilder(config, SendRequest); } + /// public IYieldPoint EndWithResponse(TResponse message) { return new DelegateYieldPoint(context => SendResponse(context, message)); } + /// public IYieldPoint End() { return new DelegateYieldPoint(EndFlow); @@ -179,6 +188,7 @@ namespace Tapeti.Flow.Default }; } + /// public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint) { if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index 04b4dc9..359589a 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -47,17 +47,19 @@ namespace Tapeti.Flow.Default var controller = config.DependencyResolver.Resolve(); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); - var context = new ControllerMessageContext + /* + var context = new ControllerMessageContext() { Config = config, Controller = controller }; + */ var flowHandler = config.DependencyResolver.Resolve(); try { - await flowHandler.Execute(context, yieldPoint); + //await flowHandler.Execute(context, yieldPoint); //handlingResult.ConsumeResponse = ConsumeResponse.Ack; } finally diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index e32430c..eb7961d 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -4,20 +4,34 @@ using System.Linq; namespace Tapeti.Flow.Default { + /// + /// Represents the state stored for active flows. + /// public class FlowState { private FlowMetadata metadata; private Dictionary continuations; + /// + /// Contains metadata about the flow. + /// public FlowMetadata Metadata { get => metadata ?? (metadata = new FlowMetadata()); set => metadata = value; } + + /// + /// Contains the serialized state which is restored when a flow continues. + /// public string Data { get; set; } + + /// + /// Contains metadata about continuations awaiting a response. + /// public Dictionary Continuations { get => continuations ?? (continuations = new Dictionary()); @@ -25,6 +39,9 @@ namespace Tapeti.Flow.Default } + /// + /// Creates a deep clone of this FlowState. + /// public FlowState Clone() { return new FlowState { @@ -36,11 +53,20 @@ namespace Tapeti.Flow.Default } + /// + /// Contains metadata about the flow. + /// public class FlowMetadata { + /// + /// Contains information about the expected response for this flow. + /// public ReplyMetadata Reply { get; set; } + /// + /// Creates a deep clone of this FlowMetadata. + /// public FlowMetadata Clone() { return new FlowMetadata @@ -51,15 +77,36 @@ namespace Tapeti.Flow.Default } + /// + /// Contains information about the expected response for this flow. + /// public class ReplyMetadata { + /// + /// The queue to which the response should be sent. + /// public string ReplyTo { get; set; } + + /// + /// The correlation ID included in the original request. + /// public string CorrelationId { get; set; } + + /// + /// The expected response message class. + /// public string ResponseTypeName { get; set; } + /// + /// Indicates whether the response should be sent a mandatory. + /// False for requests originating from a dynamic queue. + /// public bool Mandatory { get; set; } + /// + /// Creates a deep clone of this ReplyMetadata. + /// public ReplyMetadata Clone() { return new ReplyMetadata @@ -73,13 +120,30 @@ namespace Tapeti.Flow.Default } + /// + /// Contains metadata about a continuation awaiting a response. + /// public class ContinuationMetadata { + /// + /// The name of the method which will handle the response. + /// public string MethodName { get; set; } + + /// + /// The name of the method which is called when all responses have been processed. + /// public string ConvergeMethodName { get; set; } + + /// + /// Determines if the converge method is synchronous or asynchronous. + /// public bool ConvergeMethodSync { get; set; } + /// + /// Creates a deep clone of this ContinuationMetadata. + /// public ContinuationMetadata Clone() { return new ContinuationMetadata diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index b98923d..6df6955 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -1,13 +1,16 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { + /// + /// + /// Default implementation of IFlowStore. + /// public class FlowStore : IFlowStore { private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowExtension.cs similarity index 75% rename from Tapeti.Flow/FlowMiddleware.cs rename to Tapeti.Flow/FlowExtension.cs index a4b9cdb..05a17d2 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowExtension.cs @@ -4,15 +4,21 @@ using Tapeti.Flow.Default; namespace Tapeti.Flow { - public class FlowMiddleware : ITapetiExtension + /// + /// + /// Provides the Flow middleware. + /// + public class FlowExtension : ITapetiExtension { private readonly IFlowRepository flowRepository; - public FlowMiddleware(IFlowRepository flowRepository) + /// + public FlowExtension(IFlowRepository flowRepository) { this.flowRepository = flowRepository; } + /// public void RegisterDefaults(IDependencyContainer container) { container.RegisterDefault(); @@ -22,6 +28,7 @@ namespace Tapeti.Flow container.RegisterDefaultSingleton(); } + /// public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { yield return new FlowBindingMiddleware(); diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index b5fd107..4cbfeec 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -7,49 +7,162 @@ using Tapeti.Config; namespace Tapeti.Flow { + /// + /// Provides methods to build an IYieldPoint to indicate if and how Flow should continue. + /// public interface IFlowProvider { + /// + /// Publish a request message and continue the flow when the response arrives. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for asynchronous response handlers. + /// + /// + /// + /// + /// IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); - // One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call". - // Apparantly this is because a return type of a method is not part of its signature, - // according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + + /// + /// Publish a request message and continue the flow when the response arrives. + /// The request message must be marked with the [Request] attribute, and the + /// Response type must match. Used for synchronous response handlers. + /// + /// + /// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods + /// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type + /// of a method is not part of its signature,according to: + /// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + /// + /// + /// + /// + /// + /// IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler); + + /// + /// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder + /// to acquire an IYieldPoint. + /// IFlowParallelRequestBuilder YieldWithParallelRequest(); + + /// + /// End the flow by publishing the specified response message. Only allowed, and required, when the + /// current flow was started by a message handler for a Request message. + /// + /// + /// IYieldPoint EndWithResponse(TResponse message); + + + /// + /// End the flow and dispose any state. + /// IYieldPoint End(); } + /// /// Allows starting a flow outside of a message handler. /// public interface IFlowStarter { + /// + /// Starts a new flow. + /// + /// Task Start(Expression>> methodSelector) where TController : class; + + /// + /// Starts a new flow. + /// + /// Task Start(Expression>>> methodSelector) where TController : class; + + /// + /// Starts a new flow and passes the parameter to the method. + /// + /// + /// Task Start(Expression>> methodSelector, TParameter parameter) where TController : class; + + /// + /// Starts a new flow and passes the parameter to the method. + /// + /// + /// Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class; } + /// /// Internal interface. Do not call directly. /// public interface IFlowHandler { + /// + /// Executes the YieldPoint for the given message context. + /// + /// + /// Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint); } + + /// + /// Builder to publish one or more request messages and continuing the flow when the responses arrive. + /// public interface IFlowParallelRequestBuilder { + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for asynchronous response handlers. + /// + /// + /// IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + + /// + /// Publish a request message and continue the flow when the response arrives. + /// Note that the response handler can not influence the flow as it does not return a YieldPoint. + /// It can instead store state in the controller for the continuation passed to the Yield method. + /// Used for synchronous response handlers. + /// + /// + /// IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + /// + /// Constructs an IYieldPoint to continue the flow when responses arrive. + /// The continuation method is called when all responses have arrived. + /// Response handlers and the continuation method are guaranteed thread-safe access to the + /// controller and can store state. + /// Used for asynchronous continuation methods. + /// + /// IYieldPoint Yield(Func> continuation); + + /// + /// Constructs an IYieldPoint to continue the flow when responses arrive. + /// The continuation method is called when all responses have arrived. + /// Response handlers and the continuation method are guaranteed thread-safe access to the + /// controller and can store state. + /// Used for synchronous continuation methods. + /// + /// IYieldPoint YieldSync(Func continuation); } + + /// + /// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods. + /// public interface IYieldPoint { } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index 17da5e8..fec1fb0 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -6,19 +6,57 @@ using Tapeti.Flow.Default; namespace Tapeti.Flow { + /// + /// Provides a way to store and load flow state. + /// public interface IFlowStore { + /// + /// Must be called during application startup before subscribing or starting a flow. + /// If using an IFlowRepository that requires an update (such as creating tables) make + /// sure it is called before calling Load. + /// Task Load(); + + /// + /// Looks up the FlowID corresponding to a ContinuationID. For internal use. + /// + /// Task FindFlowID(Guid continuationID); + + /// + /// Acquires a lock on the flow with the specified FlowID. + /// + /// Task LockFlowState(Guid flowID); } + + /// + /// + /// Represents a lock on the flow state, to provide thread safety. + /// public interface IFlowStateLock : IDisposable { + /// + /// The unique ID of the flow state. + /// Guid FlowID { get; } + /// + /// Acquires a copy of the flow state. + /// Task GetFlowState(); + + /// + /// Stores the new flow state. + /// + /// Task StoreFlowState(FlowState flowState); + + /// + /// Disposes of the flow state corresponding to this Flow ID. + /// Task DeleteFlowState(); } } diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index a2c4094..c6b101f 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -6,7 +6,7 @@ - 1701;1702;1591 + 1701;1702 diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index dbe7a51..c643fa4 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -10,7 +10,7 @@ - + diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index 436911d..4804e37 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -10,7 +10,7 @@ - + diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 41ef4bd..e05f517 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -9,9 +9,12 @@ - - - + + + + all + runtime; build; native; contentfiles; analyzers + diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index f2f3e5e..2d65410 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -29,7 +29,7 @@ namespace Tapeti.Transient /// public async Task Apply(IBindingTarget target) { - QueueName = await target.BindDirectDynamic(dynamicQueuePrefix); + QueueName = await target.BindDynamicDirect(dynamicQueuePrefix); router.TransientResponseQueueName = QueueName; } @@ -47,5 +47,12 @@ namespace Tapeti.Transient router.HandleMessage(context); return Task.CompletedTask; } + + + /// + public Task Cleanup(IMessageContext context, ConsumeResult consumeResult) + { + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs index e717a62..eb204be 100644 --- a/Tapeti/Config/IBinding.cs +++ b/Tapeti/Config/IBinding.cs @@ -33,6 +33,15 @@ namespace Tapeti.Config /// /// Task Invoke(IMessageContext context); + + + /// + /// Called after the handler is invoked and any exception handling has been done. + /// + /// + /// + /// + Task Cleanup(IMessageContext context, ConsumeResult consumeResult); } @@ -67,7 +76,7 @@ namespace Tapeti.Config /// Used for direct-to-queue messages. /// /// The name of the durable queue - Task BindDirectDurable(string queueName); + Task BindDurableDirect(string queueName); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. @@ -76,7 +85,7 @@ namespace Tapeti.Config /// The message class which will be handled on the queue. It is not actually bound to the queue. /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// The generated name of the dynamic queue - Task BindDirectDynamic(Type messageClass = null, string queuePrefix = null); + Task BindDynamicDirect(Type messageClass = null, string queuePrefix = null); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. @@ -84,6 +93,6 @@ namespace Tapeti.Config /// /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// The generated name of the dynamic queue - Task BindDirectDynamic(string queuePrefix = null); + Task BindDynamicDirect(string queuePrefix = null); } } diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs index 9877ff0..a7a63db 100644 --- a/Tapeti/Config/IControllerBindingContext.cs +++ b/Tapeti/Config/IControllerBindingContext.cs @@ -43,10 +43,16 @@ namespace Tapeti.Config public interface IControllerBindingContext { /// - /// The message class for this method. + /// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware. + /// If required, call next first to ensure it is available. /// Type MessageClass { get; } + /// + /// Determines if SetMessageClass has already been called. + /// + bool HasMessageClass { get; } + /// /// The controller class for this binding. /// @@ -69,7 +75,7 @@ namespace Tapeti.Config /// - /// Sets the message class for this method. Can only be called once, which is always done first by the default MessageBinding. + /// Sets the message class for this method. Can only be called once, which is normally done by the default MessageBinding. /// /// void SetMessageClass(Type messageClass); diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs index fc6c686..c089b82 100644 --- a/Tapeti/Config/IControllerCleanupMiddleware.cs +++ b/Tapeti/Config/IControllerCleanupMiddleware.cs @@ -3,11 +3,10 @@ using System.Threading.Tasks; namespace Tapeti.Config { - /// /// /// Denotes middleware that runs after controller methods. /// - public interface IControllerCleanupMiddleware : IControllerMiddlewareBase + public interface IControllerCleanupMiddleware { /// /// Called after the message handler method, even if exceptions occured. @@ -15,6 +14,6 @@ namespace Tapeti.Config /// /// /// Always call to allow the next in the chain to clean up - Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next); + Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next); } } diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs index 25108c9..3eec194 100644 --- a/Tapeti/Config/IControllerMessageContext.cs +++ b/Tapeti/Config/IControllerMessageContext.cs @@ -16,22 +16,5 @@ /// Provides access to the binding which is currently processing the message. /// new IControllerMethodBinding Binding { get; } - - - /// - /// Stores a key-value pair in the context for passing information between the various - /// controller middleware stages (IControllerMiddlewareBase descendants). - /// - /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts - /// Will be disposed if the value implements IDisposable - void Store(string key, object value); - - /// - /// Retrieves a previously stored value. - /// - /// - /// - /// True if the value was found, False otherwise - bool Get(string key, out T value) where T : class; } } diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index f8a839f..c3314c4 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -42,5 +42,22 @@ namespace Tapeti.Config /// Provides access to the binding which is currently processing the message. /// IBinding Binding { get; } + + + /// + /// Stores a key-value pair in the context for passing information between the various + /// middleware stages (mostly for IControllerMiddlewareBase descendants). + /// + /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts + /// Will be disposed if the value implements IDisposable + void Store(string key, object value); + + /// + /// Retrieves a previously stored value. + /// + /// + /// + /// True if the value was found, False otherwise + bool Get(string key, out T value) where T : class; } } diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs index dab033d..51962ab 100644 --- a/Tapeti/Config/ITapetiConfigBuilder.cs +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -107,7 +107,7 @@ namespace Tapeti.Config IDependencyResolver DependencyResolver { get; } /// - /// Applies the currently registered binding middleware to + /// Applies the currently registered binding middleware to the specified context. /// /// /// diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index c8b3bcd..cbe1cfb 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -97,7 +97,8 @@ namespace Tapeti.Connection /// public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory) { - var publishProperties = new RabbitMQMessageProperties(new BasicProperties(), properties); + if (string.IsNullOrEmpty(routingKey)) + throw new ArgumentNullException(nameof(routingKey)); await taskQueue.Value.Add(async () => { @@ -131,7 +132,18 @@ namespace Tapeti.Connection else mandatory = false; - channel.BasicPublish(exchange, routingKey, mandatory, publishProperties.BasicProperties, body); + try + { + var publishProperties = new RabbitMQMessageProperties(channel.CreateBasicProperties(), properties); + channel.BasicPublish(exchange ?? "", routingKey, mandatory, publishProperties.BasicProperties, body); + } + catch + { + messageInfo.CompletionSource.SetCanceled(); + publishResultTask = null; + + throw; + } }); diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index ff3ea78..1b57bb8 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Runtime.ExceptionServices; using Tapeti.Config; using Tapeti.Default; using System.Threading.Tasks; @@ -56,9 +55,6 @@ namespace Tapeti.Connection } catch (Exception dispatchException) { - // TODO check if this is still necessary: - // var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); - using (var emptyContext = new MessageContext { Config = config, @@ -119,15 +115,18 @@ namespace Tapeti.Connection try { await MiddlewareHelper.GoAsync(config.Middleware.Message, - (handler, next) => handler.Handle(context, next), + async (handler, next) => await handler.Handle(context, next), async () => { await binding.Invoke(context); }); + await binding.Cleanup(context, ConsumeResult.Success); return ConsumeResult.Success; } catch (Exception invokeException) { var exceptionContext = new ExceptionStrategyContext(context, invokeException); HandleException(exceptionContext); + + await binding.Cleanup(context, exceptionContext.ConsumeResult); return exceptionContext.ConsumeResult; } } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 6013b61..b5cc87a 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -96,7 +96,7 @@ namespace Tapeti.Connection public abstract Task BindDurable(Type messageClass, string queueName); - public abstract Task BindDirectDurable(string queueName); + public abstract Task BindDurableDirect(string queueName); public async Task BindDynamic(Type messageClass, string queuePrefix = null) @@ -115,14 +115,14 @@ namespace Tapeti.Connection } - public async Task BindDirectDynamic(Type messageClass, string queuePrefix = null) + public async Task BindDynamicDirect(Type messageClass, string queuePrefix = null) { var result = await DeclareDynamicQueue(messageClass, queuePrefix); return result.QueueName; } - public async Task BindDirectDynamic(string queuePrefix = null) + public async Task BindDynamicDirect(string queuePrefix = null) { // If we don't know the routing key, always create a new queue to ensure there is no overlap. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. @@ -215,7 +215,7 @@ namespace Tapeti.Connection } - public override Task BindDirectDurable(string queueName) + public override Task BindDurableDirect(string queueName) { if (!durableQueues.ContainsKey(queueName)) durableQueues.Add(queueName, new List()); @@ -259,7 +259,7 @@ namespace Tapeti.Connection await VerifyDurableQueue(queueName); } - public override async Task BindDirectDurable(string queueName) + public override async Task BindDurableDirect(string queueName) { await VerifyDurableQueue(queueName); } diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs new file mode 100644 index 0000000..9f7b4c2 --- /dev/null +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -0,0 +1,179 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + /// + /// Default implementation for IControllerBindingContext + /// + public class ControllerBindingContext : IControllerBindingContext + { + private BindingTargetMode? bindingTargetMode; + private readonly List middleware = new List(); + private readonly List parameters; + private readonly ControllerBindingResult result; + + /// + /// Determines how the binding target is configured. + /// + public BindingTargetMode BindingTargetMode => bindingTargetMode ?? BindingTargetMode.Default; + + + /// + /// Provides access to the registered middleware for this method. + /// + public IReadOnlyList Middleware => middleware; + + + /// + public Type MessageClass { get; set; } + + /// + public bool HasMessageClass => MessageClass != null; + + /// + public Type Controller { get; set; } + + /// + public MethodInfo Method { get; set; } + + /// + public IReadOnlyList Parameters => parameters; + + /// + public IBindingResult Result => result; + + + /// + public ControllerBindingContext(IEnumerable parameters, ParameterInfo result) + { + this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList(); + + this.result = new ControllerBindingResult(result); + } + + + /// + public void SetMessageClass(Type messageClass) + { + if (HasMessageClass) + throw new InvalidOperationException("SetMessageClass can only be called once"); + + MessageClass = messageClass; + } + + + /// + public void SetBindingTargetMode(BindingTargetMode mode) + { + if (bindingTargetMode.HasValue) + throw new InvalidOperationException("SetBindingTargetMode can only be called once"); + + bindingTargetMode = mode; + } + + + /// + public void Use(IControllerMiddlewareBase handler) + { + middleware.Add(handler); + } + + + /// + /// Returns the configured bindings for the parameters. + /// + public IEnumerable GetParameterHandlers() + { + return parameters.Select(p => p.Binding); + } + + + /// + /// Returns the configured result handler. + /// + /// + public ResultHandler GetResultHandler() + { + return result.Handler; + } + } + + + /// + /// + /// Default implementation for IBindingParameter + /// + public class ControllerBindingParameter : IBindingParameter + { + /// + /// Provides access to the configured binding. + /// + public ValueFactory Binding { get; set; } + + + /// + public ParameterInfo Info { get; } + + /// + public bool HasBinding => Binding != null; + + + /// + public ControllerBindingParameter(ParameterInfo info) + { + Info = info; + } + + + /// + public void SetBinding(ValueFactory valueFactory) + { + if (Binding != null) + throw new InvalidOperationException("SetBinding can only be called once"); + + Binding = valueFactory; + } + } + + + /// + /// + /// Default implementation for IBindingResult + /// + public class ControllerBindingResult : IBindingResult + { + /// + /// Provides access to the configured handler. + /// + public ResultHandler Handler { get; set; } + + + /// + public ParameterInfo Info { get; } + + /// + public bool HasHandler => Handler != null; + + + /// + public ControllerBindingResult(ParameterInfo info) + { + Info = info; + } + + + /// + public void SetHandler(ResultHandler resultHandler) + { + if (Handler != null) + throw new InvalidOperationException("SetHandler can only be called once"); + + Handler = resultHandler; + } + } +} diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs index 4fc410a..8aaddc6 100644 --- a/Tapeti/Default/ControllerMessageContext.cs +++ b/Tapeti/Default/ControllerMessageContext.cs @@ -5,46 +5,60 @@ using Tapeti.Config; namespace Tapeti.Default { /// - public class ControllerMessageContext : MessageContext, IControllerMessageContext + public class ControllerMessageContext : IControllerMessageContext { - private readonly Dictionary items = new Dictionary(); - + private readonly IMessageContext decoratedContext; /// public object Controller { get; set; } /// - public new IControllerMethodBinding Binding { get; set; } + public ITapetiConfig Config => decoratedContext.Config; + + /// + public string Queue => decoratedContext.Queue; + + /// + public string Exchange => decoratedContext.Exchange; + + /// + public string RoutingKey => decoratedContext.RoutingKey; + + /// + public object Message => decoratedContext.Message; + + /// + public IMessageProperties Properties => decoratedContext.Properties; + + + IBinding IMessageContext.Binding => decoratedContext.Binding; + IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding; /// - public override void Dispose() + public ControllerMessageContext(IMessageContext decoratedContext) { - foreach (var item in items.Values) - (item as IDisposable)?.Dispose(); + this.decoratedContext = decoratedContext; + } - base.Dispose(); + + /// + public void Dispose() + { } /// public void Store(string key, object value) { - items.Add(key, value); + decoratedContext.Store(key, value); } /// public bool Get(string key, out T value) where T : class { - if (!items.TryGetValue(key, out var objectValue)) - { - value = default(T); - return false; - } - - value = (T)objectValue; - return true; + return decoratedContext.Get(key, out value); } } } diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index b2b8986..54eb163 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -1,7 +1,10 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Reflection; using System.Threading.Tasks; using Tapeti.Config; +using Tapeti.Helpers; namespace Tapeti.Default { @@ -13,9 +16,68 @@ namespace Tapeti.Default /// public class ControllerMethodBinding : IBinding { - private readonly Type controller; - private readonly MethodInfo method; - private readonly QueueInfo queueInfo; + /// + /// Contains all the required information to bind a controller method to a queue. + /// + public struct BindingInfo + { + /// + /// The controller type associated with this binding. + /// + public Type ControllerType; + + /// + /// The method called when this binding is invoked. + /// + public MethodInfo Method; + + /// + /// The queue this binding consumes. + /// + public QueueInfo QueueInfo; + + /// + /// The message class handled by this binding's method. + /// + public Type MessageClass; + + /// + /// Indicates whether this method accepts messages to the exchange by routing key, or direct-to-queue only. + /// + public BindingTargetMode BindingTargetMode; + + /// + /// Value factories for the method parameters. + /// + public IEnumerable ParameterFactories; + + /// + /// The return value handler. + /// + public ResultHandler ResultHandler; + + + /// + /// Filter middleware as registered by the binding middleware. + /// + public IReadOnlyList FilterMiddleware; + + /// + /// Message middleware as registered by the binding middleware. + /// + public IReadOnlyList MessageMiddleware; + + /// + /// Cleanup middleware as registered by the binding middleware. + /// + public IReadOnlyList CleanupMiddleware; + } + + + private readonly IDependencyResolver dependencyResolver; + private readonly BindingInfo bindingInfo; + + private readonly MessageHandlerFunc messageHandler; /// @@ -23,37 +85,170 @@ namespace Tapeti.Default /// - public ControllerMethodBinding(Type controller, MethodInfo method, QueueInfo queueInfo) + public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo) { - this.controller = controller; - this.method = method; - this.queueInfo = queueInfo; + this.dependencyResolver = dependencyResolver; + this.bindingInfo = bindingInfo; + + messageHandler = WrapMethod(bindingInfo.Method, bindingInfo.ParameterFactories, bindingInfo.ResultHandler); } /// - public Task Apply(IBindingTarget target) + public async Task Apply(IBindingTarget target) { - // TODO ControllerMethodBinding - throw new NotImplementedException(); + switch (bindingInfo.BindingTargetMode) + { + case BindingTargetMode.Default: + if (bindingInfo.QueueInfo.Dynamic) + QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + else + { + await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + QueueName = bindingInfo.QueueInfo.Name; + } + + break; + + case BindingTargetMode.Direct: + if (bindingInfo.QueueInfo.Dynamic) + QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); + else + { + await target.BindDurableDirect(bindingInfo.QueueInfo.Name); + QueueName = bindingInfo.QueueInfo.Name; + } + + break; + + default: + throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode"); + } } /// public bool Accept(Type messageClass) { - throw new NotImplementedException(); + return messageClass == bindingInfo.MessageClass; } + /// - public Task Invoke(IMessageContext context) + public async Task Invoke(IMessageContext context) { - throw new NotImplementedException(); + var controller = dependencyResolver.Resolve(bindingInfo.ControllerType); + + using (var controllerContext = new ControllerMessageContext(context) + { + Controller = controller + }) + { + if (!await FilterAllowed(controllerContext)) + return; + + + await MiddlewareHelper.GoAsync( + bindingInfo.MessageMiddleware, + async (handler, next) => await handler.Handle(controllerContext, next), + async () => await messageHandler(controllerContext)); + } } + /// + public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult) + { + await MiddlewareHelper.GoAsync( + bindingInfo.CleanupMiddleware, + async (handler, next) => await handler.Cleanup(context, ConsumeResult.Success, next), + () => Task.CompletedTask); + } + + + private async Task FilterAllowed(IControllerMessageContext context) + { + var allowed = false; + await MiddlewareHelper.GoAsync( + bindingInfo.FilterMiddleware, + async (handler, next) => await handler.Filter(context, next), + () => + { + allowed = true; + return Task.CompletedTask; + }); + + return allowed; + } + + + private delegate Task MessageHandlerFunc(IControllerMessageContext context); + + + private static MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler resultHandler) + { + if (resultHandler != null) + return WrapResultHandlerMethod(method, parameterFactories, resultHandler); + + if (method.ReturnType == typeof(void)) + return WrapNullMethod(method, parameterFactories); + + if (method.ReturnType == typeof(Task)) + return WrapTaskMethod(method, parameterFactories); + + if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>)) + return WrapGenericTaskMethod(method, parameterFactories); + + return WrapObjectMethod(method, parameterFactories); + } + + + private static MessageHandlerFunc WrapResultHandlerMethod(MethodBase method, IEnumerable parameterFactories, ResultHandler resultHandler) + { + return context => + { + var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + return resultHandler(context, result); + }; + } + + private static MessageHandlerFunc WrapNullMethod(MethodBase method, IEnumerable parameterFactories) + { + return context => + { + method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + return Task.CompletedTask; + }; + } + + + private static MessageHandlerFunc WrapTaskMethod(MethodBase method, IEnumerable parameterFactories) + { + return context => (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + } + + + private static MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable parameterFactories) + { + return context => + { + return (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + }; + } + + + private static MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable parameterFactories) + { + return context => + { + return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray())); + }; + } + + + /// - /// + /// Contains information about the queue linked to the controller method. /// public class QueueInfo { diff --git a/Tapeti/Default/MessageBinding.cs b/Tapeti/Default/MessageBinding.cs index 34ad212..2265a88 100644 --- a/Tapeti/Default/MessageBinding.cs +++ b/Tapeti/Default/MessageBinding.cs @@ -13,15 +13,18 @@ namespace Tapeti.Default /// public void Handle(IControllerBindingContext context, Action next) { - if (context.Parameters.Count == 0) - throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); + if (!context.HasMessageClass) + { + if (context.Parameters.Count == 0) + throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); - var parameter = context.Parameters[0]; - if (!parameter.Info.ParameterType.IsClass) - throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); + var parameter = context.Parameters[0]; + if (!parameter.Info.ParameterType.IsClass) + throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class"); - parameter.SetBinding(messageContext => messageContext.Message); - context.SetMessageClass(parameter.Info.ParameterType); + parameter.SetBinding(messageContext => messageContext.Message); + context.SetMessageClass(parameter.Info.ParameterType); + } next(); } diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 8f0b8b6..584de53 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -1,10 +1,15 @@ -using Tapeti.Config; +using System; +using System.Collections.Generic; +using Tapeti.Config; namespace Tapeti.Default { /// public class MessageContext : IMessageContext { + private readonly Dictionary items = new Dictionary(); + + /// public ITapetiConfig Config { get; set; } @@ -26,9 +31,33 @@ namespace Tapeti.Default /// public IBinding Binding { get; set; } + /// - public virtual void Dispose() + public void Dispose() { + foreach (var item in items.Values) + (item as IDisposable)?.Dispose(); + } + + + /// + public void Store(string key, object value) + { + items.Add(key, value); + } + + + /// + public bool Get(string key, out T value) where T : class + { + if (!items.TryGetValue(key, out var objectValue)) + { + value = default(T); + return false; + } + + value = (T)objectValue; + return true; } } } diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs index c560e56..b58836d 100644 --- a/Tapeti/Default/RabbitMQMessageProperties.cs +++ b/Tapeti/Default/RabbitMQMessageProperties.cs @@ -61,16 +61,16 @@ namespace Tapeti.Default /// - public RabbitMQMessageProperties(IBasicProperties BasicProperties) + public RabbitMQMessageProperties(IBasicProperties basicProperties) { - this.BasicProperties = BasicProperties; + BasicProperties = basicProperties; } /// - public RabbitMQMessageProperties(IBasicProperties BasicProperties, IMessageProperties source) + public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties source) { - this.BasicProperties = BasicProperties; + BasicProperties = basicProperties; if (source == null) return; diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 95868f0..d115af9 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -10,8 +10,8 @@ - - + + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 9247f11..7a542a1 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -32,6 +32,12 @@ namespace Tapeti public TapetiConfig(IDependencyResolver dependencyResolver) { config = new Config(dependencyResolver); + + Use(new DependencyResolverBinding()); + Use(new PublishResultBinding()); + + // Registered last so it runs first and the MessageClass is known to other middleware + Use(new MessageBinding()); } @@ -41,12 +47,6 @@ namespace Tapeti if (config == null) throw new InvalidOperationException("TapetiConfig.Build must only be called once"); - Use(new DependencyResolverBinding()); - Use(new PublishResultBinding()); - - // Registered last so it runs first and the MessageClass is known to other middleware - Use(new MessageBinding()); - RegisterDefaults(); (config.DependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); @@ -201,7 +201,7 @@ namespace Tapeti if (config == null) throw new InvalidOperationException("TapetiConfig can not be updated after Build"); - return null; + return config; } @@ -308,130 +308,4 @@ namespace Tapeti } } } - - - /* - public delegate Task MessageHandlerFunc(IMessageContext context, object message); - - - protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) - { - var allowBinding= false; - - MiddlewareHelper.Go(bindingMiddleware, - (handler, next) => handler.Handle(context, next), - () => - { - allowBinding = true; - }); - - if (!allowBinding) - return null; - - if (context.MessageClass == null) - throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class"); - - - var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList(); - - // ReSharper disable once InvertIf - if (invalidBindings.Count > 0) - { - var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name)); - throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); - } - - var resultHandler = ((IBindingResultAccess) context.Result).GetHandler(); - - return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler); - } - - - protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler) - { - if (resultHandler != null) - return WrapResultHandlerMethod(method, parameters, resultHandler); - - if (method.ReturnType == typeof(void)) - return WrapNullMethod(method, parameters); - - if (method.ReturnType == typeof(Task)) - return WrapTaskMethod(method, parameters); - - if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>)) - return WrapGenericTaskMethod(method, parameters); - - return WrapObjectMethod(method, parameters); - } - - - protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler) - { - return (context, message) => - { - var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - return resultHandler(context, result); - }; - } - - protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable parameters) - { - return (context, message) => - { - method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - return Task.CompletedTask; - }; - } - - - protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable parameters) - { - return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - } - - - protected MessageHandlerFunc WrapGenericTaskMethod(MethodInfo method, IEnumerable parameters) - { - return (context, message) => - { - return (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - }; - } - - - protected MessageHandlerFunc WrapObjectMethod(MethodInfo method, IEnumerable parameters) - { - return (context, message) => - { - return Task.FromResult(method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray())); - }; - } - - - protected void AddStaticRegistration(IBindingQueueInfo binding) - { - if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) - { - var existing = staticRegistrations[binding.QueueInfo.Name]; - - // TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware - //if (existing.Any(h => h.MessageClass == binding.MessageClass)) - // throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}"); - - existing.Add(binding); - } - else - staticRegistrations.Add(binding.QueueInfo.Name, new List { binding }); - } - - - protected string GetDynamicQueueName(string prefix) - { - if (String.IsNullOrEmpty(prefix)) - return ""; - - return prefix + "." + Guid.NewGuid().ToString("N"); - } - } - */ } diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index dd41f2b..399a154 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Reflection; using Tapeti.Annotations; @@ -44,40 +45,53 @@ namespace Tapeti .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Select(m => (MethodInfo)m)) { - // TODO create binding for method - - /* - var context = new BindingContext(method); - var messageHandler = GetMessageHandler(context, method); - if (messageHandler == null) - continue; - */ - var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; if (methodQueueInfo == null || !methodQueueInfo.IsValid) throw new TopologyConfigurationException( $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); - /* - var handlerInfo = new Binding + + var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter) { Controller = controller, - Method = method, - QueueInfo = methodQueueInfo, - QueueBindingMode = context.QueueBindingMode, - MessageClass = context.MessageClass, - MessageHandler = messageHandler, - MessageMiddleware = context.MessageMiddleware, - MessageFilterMiddleware = context.MessageFilterMiddleware + Method = method }; - if (methodQueueInfo.Dynamic.GetValueOrDefault()) - AddDynamicRegistration(handlerInfo); - else - AddStaticRegistration(handlerInfo); - */ - builder.RegisterBinding(new ControllerMethodBinding(controller, method, methodQueueInfo)); + var allowBinding = false; + builderAccess.ApplyBindingMiddleware(context, () => { allowBinding = true; }); + + if (!allowBinding) + continue; + + + if (context.MessageClass == null) + throw new TopologyConfigurationException($"Method {method.Name} in controller {controller.Name} does not resolve to a message class"); + + + var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList(); + if (invalidBindings.Count > 0) + { + var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name)); + throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); + } + + + builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo + { + ControllerType = controller, + Method = method, + QueueInfo = methodQueueInfo, + MessageClass = context.MessageClass, + BindingTargetMode = context.BindingTargetMode, + ParameterFactories = context.GetParameterHandlers(), + ResultHandler = context.GetResultHandler(), + + FilterMiddleware = context.Middleware.Where(m => m is IControllerFilterMiddleware).Cast().ToList(), + MessageMiddleware = context.Middleware.Where(m => m is IControllerMessageMiddleware).Cast().ToList(), + CleanupMiddleware = context.Middleware.Where(m => m is IControllerCleanupMiddleware).Cast().ToList() + })); + } return builder; diff --git a/Test/FlowEndController.cs b/Test/FlowEndController.cs index 5c2f450..ad72d4f 100644 --- a/Test/FlowEndController.cs +++ b/Test/FlowEndController.cs @@ -18,6 +18,8 @@ namespace Test this.flowProvider = flowProvider; } + + [Start] public IYieldPoint StartFlow(PingMessage message) { Console.WriteLine("PingMessage received, calling flowProvider.End() directly"); diff --git a/Test/Program.cs b/Test/Program.cs index 3969269..ccca437 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,12 +1,10 @@ using System; -using System.Runtime.CompilerServices; using SimpleInjector; using Tapeti; using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; -using Tapeti.Annotations; using Tapeti.Transient; namespace Test