From 4dd8f73f99ef704091424b9d6e61d862e0ae3ff0 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Tue, 7 Feb 2017 16:13:33 +0100 Subject: [PATCH] Added request/response validation for non-yieldpoint methods Added ExceptionStrategy Removed PublishExchange, fixed default ExchangeStrategy --- Config/IConfig.cs | 1 - Connection/TapetiConsumer.cs | 69 +++++++++++------- Connection/TapetiWorker.cs | 9 +-- TapetiTypes.cs => ConsumeResponse.cs | 0 Default/NamespaceMatchExchangeStrategy.cs | 16 ++--- Default/RequeueExceptionStrategy.cs | 14 ++++ IExceptionStrategy.cs | 16 +++++ Tapeti.Flow/Default/FlowBindingMiddleware.cs | 73 ++++++++++++++++++++ Tapeti.Flow/FlowMiddleware.cs | 57 +-------------- Tapeti.Flow/ResponseExpectedException.cs | 9 +++ Tapeti.Flow/Tapeti.Flow.csproj | 2 + Tapeti.csproj | 4 +- TapetiConfig.cs | 31 ++++----- TapetiConnection.cs | 3 +- Test/MarcoController.cs | 2 - Test/Program.cs | 5 +- 16 files changed, 190 insertions(+), 121 deletions(-) rename TapetiTypes.cs => ConsumeResponse.cs (100%) create mode 100644 Default/RequeueExceptionStrategy.cs create mode 100644 IExceptionStrategy.cs create mode 100644 Tapeti.Flow/Default/FlowBindingMiddleware.cs create mode 100644 Tapeti.Flow/ResponseExpectedException.cs diff --git a/Config/IConfig.cs b/Config/IConfig.cs index c04fedf..ef68921 100644 --- a/Config/IConfig.cs +++ b/Config/IConfig.cs @@ -7,7 +7,6 @@ namespace Tapeti.Config { public interface IConfig { - string SubscribeExchange { get; } IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IEnumerable Queues { get; } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index b8b7065..5375034 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -14,6 +14,7 @@ namespace Tapeti.Connection private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly List bindings; + private readonly IExceptionStrategy exceptionStrategy; public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) @@ -23,6 +24,8 @@ namespace Tapeti.Connection this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; this.bindings = bindings.ToList(); + + exceptionStrategy = dependencyResolver.Resolve(); } @@ -46,47 +49,61 @@ namespace Tapeti.Connection Properties = properties }) { - foreach (var binding in bindings) + try { - if (!binding.Accept(context, message).Result) - continue; + foreach (var binding in bindings) + { + if (!binding.Accept(context, message).Result) + continue; - context.Controller = dependencyResolver.Resolve(binding.Controller); - context.Binding = binding; + context.Controller = dependencyResolver.Resolve(binding.Controller); + context.Binding = binding; - // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas - MiddlewareHelper.GoAsync( - binding.MessageMiddleware != null - ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() - : messageMiddleware, - async (handler, next) => await handler.Handle(context, next), - () => binding.Invoke(context, message) - ).Wait(); - // ReSharper restore AccessToDisposedClosure + // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas + MiddlewareHelper.GoAsync( + binding.MessageMiddleware != null + ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() + : messageMiddleware, + async (handler, next) => await handler.Handle(context, next), + () => binding.Invoke(context, message) + ).Wait(); + // ReSharper restore AccessToDisposedClosure - validMessageType = true; + validMessageType = true; + } + + if (!validMessageType) + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + } + catch (Exception e) + { + worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, UnwrapException(e))); } } - if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); - worker.Respond(deliveryTag, ConsumeResponse.Ack); } catch (Exception e) { - // TODO allow different exception handling depending on exception type - worker.Respond(deliveryTag, ConsumeResponse.Requeue); - - var aggregateException = e as AggregateException; - if (aggregateException != null && aggregateException.InnerExceptions.Count == 1) - throw aggregateException.InnerExceptions[0]; - - throw; + worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, UnwrapException(e))); } } + private static Exception UnwrapException(Exception exception) + { + // In async/await style code this is handled similarly. For synchronous + // code using Tasks we have to unwrap these ourselves to get the proper + // exception directly instead of "Errors occured". We might lose + // some stack traces in the process though. + var aggregateException = exception as AggregateException; + if (aggregateException != null && aggregateException.InnerExceptions.Count == 1) + throw aggregateException.InnerExceptions[0]; + + return UnwrapException(exception); + } + + protected class MessageContext : IMessageContext { public IDependencyResolver DependencyResolver { get; set; } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index b3778b9..9c6bf04 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -12,12 +12,12 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string SubscribeExchange { get; set; } private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; + private readonly IExchangeStrategy exchangeStrategy; private readonly Lazy taskQueue = new Lazy(); private RabbitMQ.Client.IConnection connection; private IModel channelInstance; @@ -27,15 +27,16 @@ namespace Tapeti.Connection { this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; + messageSerializer = dependencyResolver.Resolve(); routingKeyStrategy = dependencyResolver.Resolve(); + exchangeStrategy = dependencyResolver.Resolve(); } public Task Publish(object message, IBasicProperties properties) { - // TODO use exchange strategy! - return Publish(message, properties, SubscribeExchange, routingKeyStrategy.GetRoutingKey(message.GetType())); + return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType())); } @@ -67,7 +68,7 @@ namespace Tapeti.Connection foreach (var binding in queue.Bindings) { var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - channel.QueueBind(dynamicQueue.QueueName, SubscribeExchange, routingKey); + channel.QueueBind(dynamicQueue.QueueName, exchangeStrategy.GetExchange(binding.MessageClass), routingKey); (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); } diff --git a/TapetiTypes.cs b/ConsumeResponse.cs similarity index 100% rename from TapetiTypes.cs rename to ConsumeResponse.cs diff --git a/Default/NamespaceMatchExchangeStrategy.cs b/Default/NamespaceMatchExchangeStrategy.cs index 27498dd..01ee0a6 100644 --- a/Default/NamespaceMatchExchangeStrategy.cs +++ b/Default/NamespaceMatchExchangeStrategy.cs @@ -5,15 +5,9 @@ namespace Tapeti.Default { public class NamespaceMatchExchangeStrategy : IExchangeStrategy { - public const string DefaultFormat = "^Messaging\\.(.[^\\.]+)"; - - private readonly Regex namespaceRegEx; - - - public NamespaceMatchExchangeStrategy() - { - namespaceRegEx = new Regex(DefaultFormat, RegexOptions.Compiled | RegexOptions.Singleline); - } + // If the namespace starts with "Messaging.Service[.Optional.Further.Parts]", the exchange will be "Service". + // If no Messaging prefix is present, the first part of the namespace will be used instead. + private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); public string GetExchange(Type messageType) @@ -21,11 +15,11 @@ namespace Tapeti.Default if (messageType.Namespace == null) throw new ArgumentException($"{messageType.FullName} does not have a namespace"); - var match = namespaceRegEx.Match(messageType.Namespace); + var match = NamespaceRegex.Match(messageType.Namespace); if (!match.Success) throw new ArgumentException($"Namespace for {messageType.FullName} does not match the specified format"); - return match.Groups[1].Value.ToLower(); + return match.Groups["exchange"].Value.ToLower(); } } } diff --git a/Default/RequeueExceptionStrategy.cs b/Default/RequeueExceptionStrategy.cs new file mode 100644 index 0000000..6a20ca7 --- /dev/null +++ b/Default/RequeueExceptionStrategy.cs @@ -0,0 +1,14 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class RequeueExceptionStrategy : IExceptionStrategy + { + public ConsumeResponse HandleException(IMessageContext context, Exception exception) + { + // TODO log exception + return ConsumeResponse.Requeue; + } + } +} diff --git a/IExceptionStrategy.cs b/IExceptionStrategy.cs new file mode 100644 index 0000000..7b46af6 --- /dev/null +++ b/IExceptionStrategy.cs @@ -0,0 +1,16 @@ +using System; +using Tapeti.Config; + +namespace Tapeti +{ + public interface IExceptionStrategy + { + /// + /// Called when an exception occurs while handling a message. + /// + /// The message context if available. May be null! + /// The exception instance + /// The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message. + ConsumeResponse HandleException(IMessageContext context, Exception exception); + } +} diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs new file mode 100644 index 0000000..6b91e95 --- /dev/null +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -0,0 +1,73 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.Annotations; +using Tapeti.Helpers; + +namespace Tapeti.Flow.Default +{ + // TODO figure out a way to prevent binding on Continuation methods (which are always the target of a direct response) + internal class FlowBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + RegisterContinuationFilter(context); + RegisterYieldPointResult(context); + + next(); + + ValidateRequestResponse(context); + } + + + private static void RegisterContinuationFilter(IBindingContext context) + { + var continuationAttribute = context.Method.GetCustomAttribute(); + if (continuationAttribute == null) + return; + + context.Use(new FlowBindingFilter()); + context.Use(new FlowMessageMiddleware()); + } + + + private static void RegisterYieldPointResult(IBindingContext context) + { + bool isTask; + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask)) + return; + + if (isTask) + { + context.Result.SetHandler(async (messageContext, value) => + { + var yieldPoint = await (Task)value; + if (yieldPoint != null) + await HandleYieldPoint(messageContext, yieldPoint); + }); + } + else + context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); + } + + + private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) + { + var flowHandler = context.DependencyResolver.Resolve(); + return flowHandler.Execute(context, yieldPoint); + } + + + private static void ValidateRequestResponse(IBindingContext context) + { + var request = context.MessageClass?.GetCustomAttribute(); + if (request?.Response == null) + return; + + bool isTask; + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out isTask)) + throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index ecac5a8..6e696d8 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -1,11 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading.Tasks; +using System.Collections.Generic; using Tapeti.Config; -using Tapeti.Flow.Annotations; using Tapeti.Flow.Default; -using Tapeti.Helpers; namespace Tapeti.Flow { @@ -26,55 +21,5 @@ namespace Tapeti.Flow return new[] { new FlowBindingMiddleware() }; } - - - internal class FlowBindingMiddleware : IBindingMiddleware - { - public void Handle(IBindingContext context, Action next) - { - RegisterContinuationFilter(context); - RegisterYieldPointResult(context); - - next(); - } - - - private static void RegisterContinuationFilter(IBindingContext context) - { - var continuationAttribute = context.Method.GetCustomAttribute(); - if (continuationAttribute == null) - return; - - context.Use(new FlowBindingFilter()); - context.Use(new FlowMessageMiddleware()); - } - - - private static void RegisterYieldPointResult(IBindingContext context) - { - bool isTask; - if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask)) - return; - - if (isTask) - { - context.Result.SetHandler(async (messageContext, value) => - { - var yieldPoint = await (Task)value; - if (yieldPoint != null) - await HandleYieldPoint(messageContext, yieldPoint); - }); - } - else - context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); - } - - - private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) - { - var flowHandler = context.DependencyResolver.Resolve(); - return flowHandler.Execute(context, yieldPoint); - } - } } } diff --git a/Tapeti.Flow/ResponseExpectedException.cs b/Tapeti.Flow/ResponseExpectedException.cs new file mode 100644 index 0000000..c636be3 --- /dev/null +++ b/Tapeti.Flow/ResponseExpectedException.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Flow +{ + public class ResponseExpectedException : Exception + { + public ResponseExpectedException(string message) : base(message) { } + } +} diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 5e4aa8c..4586611 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -53,6 +53,7 @@ + @@ -67,6 +68,7 @@ + diff --git a/Tapeti.csproj b/Tapeti.csproj index 9b8cf0f..d8794ea 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -63,11 +63,13 @@ + + @@ -78,7 +80,7 @@ - + diff --git a/TapetiConfig.cs b/TapetiConfig.cs index 23d51b2..5dcbdc7 100644 --- a/TapetiConfig.cs +++ b/TapetiConfig.cs @@ -27,13 +27,11 @@ namespace Tapeti private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); - private readonly string subscribeExchange; private readonly IDependencyResolver dependencyResolver; - public TapetiConfig(string subscribeExchange, IDependencyResolver dependencyResolver) + public TapetiConfig(IDependencyResolver dependencyResolver) { - this.subscribeExchange = subscribeExchange; this.dependencyResolver = dependencyResolver; Use(new DependencyResolverBinding()); @@ -63,7 +61,7 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(subscribeExchange, dependencyResolver, messageMiddleware, queues); + var config = new Config(dependencyResolver, messageMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -104,17 +102,18 @@ namespace Tapeti public void RegisterDefaults() { var container = dependencyResolver as IDependencyContainer; - if (container != null) - { - if (ConsoleHelper.IsAvailable()) - container.RegisterDefault(); - else - container.RegisterDefault(); + if (container == null) + return; - container.RegisterDefault(); - container.RegisterDefault(); - container.RegisterDefault(); - } + if (ConsoleHelper.IsAvailable()) + container.RegisterDefault(); + else + container.RegisterDefault(); + + container.RegisterDefault(); + container.RegisterDefault(); + container.RegisterDefault(); + container.RegisterDefault(); } @@ -310,7 +309,6 @@ namespace Tapeti protected class Config : IConfig { - public string SubscribeExchange { get; } public IDependencyResolver DependencyResolver { get; } public IReadOnlyList MessageMiddleware { get; } public IEnumerable Queues { get; } @@ -318,9 +316,8 @@ namespace Tapeti private readonly Dictionary bindingMethodLookup; - public Config(string subscribeExchange, IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IEnumerable queues) + public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IEnumerable queues) { - SubscribeExchange = subscribeExchange; DependencyResolver = dependencyResolver; MessageMiddleware = messageMiddleware; Queues = queues.ToList(); diff --git a/TapetiConnection.cs b/TapetiConnection.cs index 63d346a..60355ae 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -20,8 +20,7 @@ namespace Tapeti worker = new Lazy(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware) { - ConnectionParams = Params ?? new TapetiConnectionParams(), - SubscribeExchange = config.SubscribeExchange + ConnectionParams = Params ?? new TapetiConnectionParams() }); } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index d1bf818..e9744c0 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -62,8 +62,6 @@ namespace Test * This will automatically include the correlationId in the response and * use the replyTo header of the request if provided. */ - - // TODO validation middleware to ensure a request message returns the specified response (already done for IYieldPoint methods) public PoloConfirmationResponseMessage PoloConfirmation(PoloConfirmationRequestMessage message) { Console.WriteLine(">> PoloConfirmation (returning confirmation)"); diff --git a/Test/Program.cs b/Test/Program.cs index 55f872e..1e575a2 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -10,11 +10,14 @@ namespace Test { private static void Main() { + // TODO SQL based flow store + // TODO logging + var container = new Container(); container.Register(); container.Register(); - var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container)) + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) .WithFlow() .RegisterAllControllers() .Build();