From 20ac46700648aa90f890e8894fb0a0b988944100 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Feb 2017 22:05:01 +0100 Subject: [PATCH] Fixed #6: Provide a way to start a flow outside of a message handler Fixed Continuation methods binding to dynamic queues --- Tapeti.Annotations/RequestAttribute.cs | 1 + .../Annotations/ContinuationAttribute.cs | 1 + Tapeti.Flow/Annotations/StartAttribute.cs | 9 +++ Tapeti.Flow/Default/FlowBindingMiddleware.cs | 7 ++- Tapeti.Flow/Default/FlowProvider.cs | 7 ++- Tapeti.Flow/Default/FlowStarter.cs | 61 +++++++++++++++++++ Tapeti.Flow/FlowMiddleware.cs | 1 + Tapeti.Flow/IFlowProvider.cs | 13 ++++ Tapeti.Flow/Tapeti.Flow.csproj | 2 + Tapeti/Config/IBindingContext.cs | 16 +++++ Tapeti/Config/IConfig.cs | 1 + Tapeti/Config/IMessageContext.cs | 4 +- Tapeti/Connection/TapetiConsumer.cs | 24 +------- Tapeti/Connection/TapetiWorker.cs | 11 +++- Tapeti/Default/MessageContext.cs | 29 +++++++++ Tapeti/Tapeti.csproj | 1 + Tapeti/TapetiConfig.cs | 27 ++++++-- Test/MarcoController.cs | 20 ++++++ Test/Program.cs | 2 + 19 files changed, 201 insertions(+), 36 deletions(-) create mode 100644 Tapeti.Flow/Annotations/StartAttribute.cs create mode 100644 Tapeti.Flow/Default/FlowStarter.cs create mode 100644 Tapeti/Default/MessageContext.cs diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs index 00579ba..4d6a9a5 100644 --- a/Tapeti.Annotations/RequestAttribute.cs +++ b/Tapeti.Annotations/RequestAttribute.cs @@ -2,6 +2,7 @@ namespace Tapeti.Flow.Annotations { + [AttributeUsage(AttributeTargets.Class)] public class RequestAttribute : Attribute { public Type Response { get; set; } diff --git a/Tapeti.Flow/Annotations/ContinuationAttribute.cs b/Tapeti.Flow/Annotations/ContinuationAttribute.cs index 39d0914..8749bf8 100644 --- a/Tapeti.Flow/Annotations/ContinuationAttribute.cs +++ b/Tapeti.Flow/Annotations/ContinuationAttribute.cs @@ -2,6 +2,7 @@ namespace Tapeti.Flow.Annotations { + [AttributeUsage(AttributeTargets.Method)] public class ContinuationAttribute : Attribute { } diff --git a/Tapeti.Flow/Annotations/StartAttribute.cs b/Tapeti.Flow/Annotations/StartAttribute.cs new file mode 100644 index 0000000..32f56f7 --- /dev/null +++ b/Tapeti.Flow/Annotations/StartAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Flow.Annotations +{ + [AttributeUsage(AttributeTargets.Method)] + public class StartAttribute : Attribute + { + } +} diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 85a64d3..d7e279a 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -7,11 +7,16 @@ using Tapeti.Helpers; namespace Tapeti.Flow.Default { - // TODO figure out a way to prevent binding on Continuation methods (which are always the target of a direct response) internal class FlowBindingMiddleware : IBindingMiddleware { public void Handle(IBindingContext context, Action next) { + if (context.Method.GetCustomAttribute() != null) + return; + + if (context.Method.GetCustomAttribute() != null) + context.QueueBindingMode = QueueBindingMode.DirectToQueue; + RegisterYieldPointResult(context); RegisterContinuationFilter(context); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index aeab792..8c2cef3 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -119,7 +119,10 @@ namespace Tapeti.Flow.Default var continuationAttribute = binding.Method.GetCustomAttribute(); if (continuationAttribute == null) - throw new ArgumentException($"responseHandler must be marked with the Continuation attribute", nameof(responseHandler)); + throw new ArgumentException("responseHandler must be marked with the Continuation attribute", nameof(responseHandler)); + + if (binding.QueueName == null) + throw new ArgumentException("responseHandler must bind to a valid queue", nameof(responseHandler)); return new ResponseHandlerInfo { @@ -131,7 +134,7 @@ namespace Tapeti.Flow.Default private static ReplyMetadata GetReply(IMessageContext context) { - var requestAttribute = context.Message.GetType().GetCustomAttribute(); + var requestAttribute = context.Message?.GetType().GetCustomAttribute(); if (requestAttribute?.Response == null) return null; diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs new file mode 100644 index 0000000..a21593f --- /dev/null +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -0,0 +1,61 @@ +using System; +using System.Linq.Expressions; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Default; + +namespace Tapeti.Flow.Default +{ + public class FlowStarter : IFlowStarter + { + private readonly IConfig config; + + + public FlowStarter(IConfig config) + { + this.config = config; + } + + + public Task Start(Expression>> methodSelector) where TController : class + { + return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value)); + } + + + public Task Start(Expression>>> methodSelector) where TController : class + { + return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value); + } + + + private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult) where TController : class + { + var controller = config.DependencyResolver.Resolve(); + var yieldPoint = await getYieldPointResult(method.Invoke(controller, new object[] {})); + + var context = new MessageContext + { + DependencyResolver = config.DependencyResolver, + Controller = controller + }; + + var flowHandler = config.DependencyResolver.Resolve(); + await flowHandler.Execute(context, yieldPoint); + } + + + private static MethodInfo GetExpressionMethod(Expression>> methodSelector) + { + var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression; + var targetMethodExpression = callExpression?.Object as ConstantExpression; + + var method = targetMethodExpression?.Value as MethodInfo; + if (method == null) + throw new ArgumentException("Unable to determine the starting method", nameof(methodSelector)); + + return method; + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index 16e4589..2c1dcc6 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -9,6 +9,7 @@ namespace Tapeti.Flow public void RegisterDefaults(IDependencyContainer container) { container.RegisterDefault(); + container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index 97894b9..f4bc7d1 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -1,4 +1,5 @@ using System; +using System.Linq.Expressions; using System.Threading.Tasks; using Tapeti.Config; @@ -19,6 +20,18 @@ namespace Tapeti.Flow IYieldPoint End(); } + /// + /// Allows starting a flow outside of a message handler. + /// + public interface IFlowStarter + { + Task Start(Expression>> methodSelector) where TController : class; + Task Start(Expression>>> methodSelector) where TController : class; + } + + /// + /// Internal interface. Do not call directly. + /// public interface IFlowHandler { Task Execute(IMessageContext context, IYieldPoint yieldPoint); diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 26dc0f0..ab64752 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -52,11 +52,13 @@ + + diff --git a/Tapeti/Config/IBindingContext.cs b/Tapeti/Config/IBindingContext.cs index 704ad21..b5cc3b7 100644 --- a/Tapeti/Config/IBindingContext.cs +++ b/Tapeti/Config/IBindingContext.cs @@ -9,6 +9,20 @@ namespace Tapeti.Config public delegate Task ResultHandler(IMessageContext context, object value); + public enum QueueBindingMode + { + /// + /// Allow binding of the routing key from the message's source exchange to the queue + /// + RoutingKey, + + /// + /// Do not bind, rely on the direct-to-queue exchange + /// + DirectToQueue + } + + public interface IBindingContext { Type MessageClass { get; set; } @@ -17,6 +31,8 @@ namespace Tapeti.Config IReadOnlyList Parameters { get; } IBindingResult Result { get; } + QueueBindingMode QueueBindingMode { get; set; } + void Use(IMessageFilterMiddleware filterMiddleware); void Use(IMessageMiddleware middleware); } diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 16a8333..1b80950 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -37,6 +37,7 @@ namespace Tapeti.Config MethodInfo Method { get; } Type MessageClass { get; } string QueueName { get; } + QueueBindingMode QueueBindingMode { get; set; } IReadOnlyList MessageFilterMiddleware { get; } IReadOnlyList MessageMiddleware { get; } diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index 50c3b30..6bad232 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -15,9 +15,9 @@ namespace Tapeti.Config IDictionary Items { get; } - /// + /// /// Controller will be null when passed to a IMessageFilterMiddleware - /// + /// object Controller { get; } IBinding Binding { get; } diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index e3ede98..4d83df9 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Runtime.ExceptionServices; using RabbitMQ.Client; using Tapeti.Config; +using Tapeti.Default; using Tapeti.Helpers; namespace Tapeti.Connection @@ -118,28 +119,5 @@ namespace Tapeti.Connection exception = aggregateException.InnerExceptions[0]; } } - - - protected class MessageContext : IMessageContext - { - public IDependencyResolver DependencyResolver { get; set; } - - public object Controller { get; set; } - public IBinding Binding { get; set; } - - public string Queue { get; set; } - public string RoutingKey { get; set; } - public object Message { get; set; } - public IBasicProperties Properties { get; set; } - - public IDictionary Items { get; } = new Dictionary(); - - - public void Dispose() - { - foreach (var value in Items.Values) - (value as IDisposable)?.Dispose(); - } - } } } diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 3f651d8..60b53c7 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; @@ -70,10 +71,14 @@ namespace Tapeti.Connection foreach (var binding in queue.Bindings) { - var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - var exchange = exchangeStrategy.GetExchange(binding.MessageClass); + if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) + { + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); + var exchange = exchangeStrategy.GetExchange(binding.MessageClass); + + channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey); + } - channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey); (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); } } diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs new file mode 100644 index 0000000..5872701 --- /dev/null +++ b/Tapeti/Default/MessageContext.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using RabbitMQ.Client; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class MessageContext : IMessageContext + { + public IDependencyResolver DependencyResolver { get; set; } + + public object Controller { get; set; } + public IBinding Binding { get; set; } + + public string Queue { get; set; } + public string RoutingKey { get; set; } + public object Message { get; set; } + public IBasicProperties Properties { get; set; } + + public IDictionary Items { get; } = new Dictionary(); + + + public void Dispose() + { + foreach (var value in Items.Values) + (value as IDisposable)?.Dispose(); + } + } +} diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index db5c711..12ef812 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -63,6 +63,7 @@ + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 12f72a3..adb5e56 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -145,18 +145,22 @@ namespace Tapeti .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) .Select(m => (MethodInfo)m)) { - var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; - if (!methodQueueInfo.IsValid) - throw new TopologyConfigurationException($"Method {method.Name} or controller {controller.Name} requires a queue attribute"); - var context = new BindingContext(method); var messageHandler = GetMessageHandler(context, method); + if (messageHandler == null) + continue; + + var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; + if (!methodQueueInfo.IsValid) + throw new TopologyConfigurationException( + $"Method {method.Name} or controller {controller.Name} requires a queue attribute"); var handlerInfo = new Binding { Controller = controller, Method = method, QueueInfo = methodQueueInfo, + QueueBindingMode = context.QueueBindingMode, MessageClass = context.MessageClass, MessageHandler = messageHandler, MessageMiddleware = context.MessageMiddleware, @@ -190,7 +194,17 @@ namespace Tapeti protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) { - MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next), () => {}); + var allowBinding= false; + + MiddlewareHelper.Go(bindingMiddleware, + (handler, next) => handler.Handle(context, next), + () => + { + allowBinding = true; + }); + + if (!allowBinding) + return null; if (context.MessageClass == null) throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class"); @@ -382,6 +396,7 @@ namespace Tapeti public MethodInfo Method { get; set; } public Type MessageClass { get; set; } public string QueueName { get; set; } + public QueueBindingMode QueueBindingMode { get; set; } public IReadOnlyList MessageMiddleware { get; set; } public IReadOnlyList MessageFilterMiddleware { get; set; } @@ -443,6 +458,8 @@ namespace Tapeti public IReadOnlyList Parameters { get; } public IBindingResult Result { get; } + public QueueBindingMode QueueBindingMode { get; set; } + public IReadOnlyList MessageMiddleware => messageMiddleware; public IReadOnlyList MessageFilterMiddleware => messageFilterMiddleware; diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index 8660526..c1833f8 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -28,6 +28,26 @@ namespace Test } + [Start] + public async Task StartFlow() + { + Console.WriteLine("Starting stand-alone flow"); + await Task.Delay(1000); + + return flowProvider.YieldWithRequestSync + (new PoloConfirmationRequestMessage(), + HandlePoloConfirmationResponse); + } + + + [Continuation] + public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg) + { + Console.WriteLine("Ending stand-alone flow"); + return flowProvider.End(); + } + + /** * The Visualizer could've been injected through the constructor, which is * the recommended way. Just testing the injection middleware here. diff --git a/Test/Program.cs b/Test/Program.cs index 734d792..349ba4e 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -43,6 +43,8 @@ namespace Test Console.WriteLine("Done!"); + container.GetInstance().Start(c => c.StartFlow); + var emitter = container.GetInstance(); emitter.Run().Wait(); }