From 14ee7179853603f99bbb780122c0fc04a9014be6 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 14 Dec 2016 20:28:17 +0100 Subject: [PATCH] Fixed Saga loading and message handler filtering Added Saga publisher extension --- Config/IBindingContext.cs | 2 + Config/IConfig.cs | 2 + Config/IMessageContext.cs | 5 +++ Config/IMessageMiddleware.cs | 3 +- Connection/TapetiConsumer.cs | 29 +++++++++---- Connection/TapetiPublisher.cs | 11 ++++- Connection/TapetiWorker.cs | 11 +++-- Default/BindingBufferStop.cs | 13 ------ Default/DefaultMessageSerializer.cs | 4 +- Helpers/MiddlewareHelper.cs | 33 ++++++++++++++- IPublisher.cs | 7 ++++ Tapeti.Saga/ISagaProvider.cs | 1 - Tapeti.Saga/SagaBindingMiddleware.cs | 28 ------------- Tapeti.Saga/SagaExtensions.cs | 16 ++++++++ Tapeti.Saga/SagaMessageMiddleware.cs | 22 ---------- Tapeti.Saga/SagaMiddleware.cs | 58 +++++++++++++++++++++++++- Tapeti.Saga/SagaProvider.cs | 9 ---- Tapeti.Saga/Tapeti.Saga.csproj | 4 +- Tapeti.csproj | 1 - TapetiConfig.cs | 20 +++++++-- Test/MarcoController.cs | 61 ++++++++++++++-------------- Test/Program.cs | 2 +- 22 files changed, 211 insertions(+), 131 deletions(-) delete mode 100644 Default/BindingBufferStop.cs delete mode 100644 Tapeti.Saga/SagaBindingMiddleware.cs create mode 100644 Tapeti.Saga/SagaExtensions.cs delete mode 100644 Tapeti.Saga/SagaMessageMiddleware.cs diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs index df3444e..48818e7 100644 --- a/Config/IBindingContext.cs +++ b/Config/IBindingContext.cs @@ -11,6 +11,8 @@ namespace Tapeti.Config { Type MessageClass { get; set; } IReadOnlyList Parameters { get; } + + void Use(IMessageMiddleware middleware); } diff --git a/Config/IConfig.cs b/Config/IConfig.cs index 8752c97..30fc3ba 100644 --- a/Config/IConfig.cs +++ b/Config/IConfig.cs @@ -29,6 +29,8 @@ namespace Tapeti.Config MethodInfo Method { get; } Type MessageClass { get; } + IReadOnlyList MessageMiddleware { get; } + bool Accept(object message); Task Invoke(IMessageContext context, object message); } diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs index 6bf8101..0a57507 100644 --- a/Config/IMessageContext.cs +++ b/Config/IMessageContext.cs @@ -1,11 +1,16 @@ using System.Collections.Generic; +using RabbitMQ.Client; namespace Tapeti.Config { public interface IMessageContext { + IDependencyResolver DependencyResolver { get; } + object Controller { get; } object Message { get; } + IBasicProperties Properties { get; } + IDictionary Items { get; } } } diff --git a/Config/IMessageMiddleware.cs b/Config/IMessageMiddleware.cs index aaec7eb..38ee22b 100644 --- a/Config/IMessageMiddleware.cs +++ b/Config/IMessageMiddleware.cs @@ -1,9 +1,10 @@ using System; +using System.Threading.Tasks; namespace Tapeti.Config { public interface IMessageMiddleware { - void Handle(IMessageContext context, Action next); + Task Handle(IMessageContext context, Func next); } } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index 1687d24..f5e2bc2 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using RabbitMQ.Client; using Tapeti.Config; using Tapeti.Helpers; @@ -33,25 +34,31 @@ namespace Tapeti.Connection if (message == null) throw new ArgumentException("Empty message"); - var handled = false; + var validMessageType = false; foreach (var binding in bindings.Where(b => b.Accept(message))) { var context = new MessageContext { + DependencyResolver = dependencyResolver, Controller = dependencyResolver.Resolve(binding.Controller), - Message = message + Message = message, + Properties = properties }; - MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next)); + MiddlewareHelper.GoAsync(binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware, + async (handler, next) => await handler.Handle(context, next), + async () => + { + var result = binding.Invoke(context, message).Result; + if (result != null) + await worker.Publish(result, null); + } + ).Wait(); - var result = binding.Invoke(context, message).Result; - if (result != null) - worker.Publish(result); - - handled = true; + validMessageType = true; } - if (!handled) + if (!validMessageType) throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); worker.Respond(deliveryTag, ConsumeResponse.Ack); @@ -66,8 +73,12 @@ namespace Tapeti.Connection protected class MessageContext : IMessageContext { + public IDependencyResolver DependencyResolver { get; set; } + public object Controller { get; set; } public object Message { get; set; } + public IBasicProperties Properties { get; set; } + public IDictionary Items { get; } = new Dictionary(); } } diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index 4143cf5..3b88c03 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,9 +1,10 @@ using System; using System.Threading.Tasks; +using RabbitMQ.Client; namespace Tapeti.Connection { - public class TapetiPublisher : IPublisher + public class TapetiPublisher : IAdvancedPublisher { private readonly Func workerFactory; @@ -16,7 +17,13 @@ namespace Tapeti.Connection public Task Publish(object message) { - return workerFactory().Publish(message); + return workerFactory().Publish(message, null); + } + + + public Task Publish(object message, IBasicProperties properties) + { + return workerFactory().Publish(message, properties); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index f5ee254..ba000ae 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -32,16 +32,19 @@ namespace Tapeti.Connection } - public Task Publish(object message) + public Task Publish(object message, IBasicProperties properties) { return taskQueue.Value.Add(async () => { - var properties = new BasicProperties(); - var body = messageSerializer.Serialize(message, properties); + var messageProperties = properties ?? new BasicProperties(); + if (messageProperties.Timestamp.UnixTime == 0) + messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); + + var body = messageSerializer.Serialize(message, messageProperties); (await GetChannel()) .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, - properties, body); + messageProperties, body); }).Unwrap(); } diff --git a/Default/BindingBufferStop.cs b/Default/BindingBufferStop.cs deleted file mode 100644 index 8d55a7c..0000000 --- a/Default/BindingBufferStop.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using Tapeti.Config; - -namespace Tapeti.Default -{ - // End of the line... - public class BindingBufferStop : IBindingMiddleware - { - public void Handle(IBindingContext context, Action next) - { - } - } -} diff --git a/Default/DefaultMessageSerializer.cs b/Default/DefaultMessageSerializer.cs index 37fde89..e11b60e 100644 --- a/Default/DefaultMessageSerializer.cs +++ b/Default/DefaultMessageSerializer.cs @@ -47,8 +47,8 @@ namespace Tapeti.Default { object typeName; - if (!properties.ContentType.Equals(ContentType)) - throw new ArgumentException("content_type must be {ContentType}"); + if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) + throw new ArgumentException($"content_type must be {ContentType}"); if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName)) throw new ArgumentException($"{ClassTypeHeader} header not present"); diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs index 4e813b1..a3b791c 100644 --- a/Helpers/MiddlewareHelper.cs +++ b/Helpers/MiddlewareHelper.cs @@ -1,15 +1,20 @@ using System; using System.Collections.Generic; +using System.Diagnostics.Eventing.Reader; +using System.Threading.Tasks; namespace Tapeti.Helpers { public static class MiddlewareHelper { - public static void Go(IReadOnlyList middleware, Action handle) + public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler) { var handlerIndex = middleware.Count - 1; if (handlerIndex == -1) + { + lastHandler(); return; + } Action handleNext = null; @@ -18,9 +23,35 @@ namespace Tapeti.Helpers handlerIndex--; if (handlerIndex >= 0) handle(middleware[handlerIndex], handleNext); + else + lastHandler(); }; handle(middleware[handlerIndex], handleNext); } + + + public static async Task GoAsync(IReadOnlyList middleware, Func, Task> handle, Func lastHandler) + { + var handlerIndex = middleware.Count - 1; + if (handlerIndex == -1) + { + await lastHandler(); + return; + } + + Func handleNext = null; + + handleNext = async () => + { + handlerIndex--; + if (handlerIndex >= 0) + await handle(middleware[handlerIndex], handleNext); + else + await lastHandler(); + }; + + await handle(middleware[handlerIndex], handleNext); + } } } diff --git a/IPublisher.cs b/IPublisher.cs index f54128f..0c6721b 100644 --- a/IPublisher.cs +++ b/IPublisher.cs @@ -1,4 +1,5 @@ using System.Threading.Tasks; +using RabbitMQ.Client; namespace Tapeti { @@ -6,4 +7,10 @@ namespace Tapeti { Task Publish(object message); } + + + public interface IAdvancedPublisher : IPublisher + { + Task Publish(object message, IBasicProperties properties); + } } diff --git a/Tapeti.Saga/ISagaProvider.cs b/Tapeti.Saga/ISagaProvider.cs index 862aad2..23a33ef 100644 --- a/Tapeti.Saga/ISagaProvider.cs +++ b/Tapeti.Saga/ISagaProvider.cs @@ -6,6 +6,5 @@ namespace Tapeti.Saga { Task> Begin(T initialState) where T : class; Task> Continue(string sagaId) where T : class; - Task Continue(string sagaId); } } diff --git a/Tapeti.Saga/SagaBindingMiddleware.cs b/Tapeti.Saga/SagaBindingMiddleware.cs deleted file mode 100644 index f6a056f..0000000 --- a/Tapeti.Saga/SagaBindingMiddleware.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System; -using System.Linq; -using Tapeti.Config; - -namespace Tapeti.Saga -{ - public class SagaBindingMiddleware : IBindingMiddleware - { - public void Handle(IBindingContext context, Action next) - { - foreach (var parameter in context.Parameters.Where(p => - p.Info.ParameterType.IsGenericType && - p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>))) - { - parameter.SetBinding(messageContext => - { - object saga; - if (!messageContext.Items.TryGetValue("Saga", out saga)) - return null; - - return saga.GetType() == typeof(ISaga<>) ? saga : null; - }); - } - - next(); - } - } -} diff --git a/Tapeti.Saga/SagaExtensions.cs b/Tapeti.Saga/SagaExtensions.cs new file mode 100644 index 0000000..08de292 --- /dev/null +++ b/Tapeti.Saga/SagaExtensions.cs @@ -0,0 +1,16 @@ +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; + +namespace Tapeti.Saga +{ + public static class SagaExtensions + { + public static Task Publish(this IPublisher publisher, object message, ISaga saga) where T : class + { + return ((IAdvancedPublisher)publisher).Publish(message, new BasicProperties + { + CorrelationId = saga.Id + }); + } + } +} diff --git a/Tapeti.Saga/SagaMessageMiddleware.cs b/Tapeti.Saga/SagaMessageMiddleware.cs deleted file mode 100644 index bef101a..0000000 --- a/Tapeti.Saga/SagaMessageMiddleware.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using Tapeti.Config; - -namespace Tapeti.Saga -{ - public class SagaMessageMiddleware : IMessageMiddleware - { - private readonly IDependencyResolver dependencyResolver; - - - public SagaMessageMiddleware(IDependencyResolver dependencyResolver) - { - this.dependencyResolver = dependencyResolver; - } - - public void Handle(IMessageContext context, Action next) - { - context.Items["Saga"] = dependencyResolver.Resolve().Continue(""); - next(); - } - } -} diff --git a/Tapeti.Saga/SagaMiddleware.cs b/Tapeti.Saga/SagaMiddleware.cs index 405a29f..3460e38 100644 --- a/Tapeti.Saga/SagaMiddleware.cs +++ b/Tapeti.Saga/SagaMiddleware.cs @@ -1,16 +1,70 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; using Tapeti.Config; namespace Tapeti.Saga { public class SagaMiddleware : IMiddlewareBundle { + private const string SagaContextKey = "Saga"; + + public IEnumerable GetContents(IDependencyResolver dependencyResolver) { (dependencyResolver as IDependencyInjector)?.RegisterDefault(); yield return new SagaBindingMiddleware(); - yield return new SagaMessageMiddleware(dependencyResolver); + } + + + protected class SagaBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + var registered = false; + + foreach (var parameter in context.Parameters.Where(p => + p.Info.ParameterType.IsGenericType && + p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>))) + { + if (!registered) + { + var sagaType = parameter.Info.ParameterType.GetGenericArguments()[0]; + var middlewareType = typeof(SagaMessageMiddleware<>).MakeGenericType(sagaType); + + context.Use(Activator.CreateInstance(middlewareType) as IMessageMiddleware); + + registered = true; + } + + parameter.SetBinding(messageContext => + { + object saga; + return messageContext.Items.TryGetValue(SagaContextKey, out saga) ? saga : null; + }); + } + + next(); + } + } + + + protected class SagaMessageMiddleware : IMessageMiddleware where T : class + { + public async Task Handle(IMessageContext context, Func next) + { + if (string.IsNullOrEmpty(context.Properties.CorrelationId)) + return; + + var saga = await context.DependencyResolver.Resolve().Continue(context.Properties.CorrelationId); + if (saga == null) + return; + + context.Items[SagaContextKey] = saga; + await next(); + } } } } diff --git a/Tapeti.Saga/SagaProvider.cs b/Tapeti.Saga/SagaProvider.cs index b21ece8..c5d2cf9 100644 --- a/Tapeti.Saga/SagaProvider.cs +++ b/Tapeti.Saga/SagaProvider.cs @@ -29,15 +29,6 @@ namespace Tapeti.Saga return await Saga.Create(async () => await store.Read(sagaId) as T, sagaId); } - public async Task Continue(string sagaId) - { - return new Saga - { - Id = sagaId, - State = await store.Read(sagaId) - }; - } - protected class Saga : ISaga where T : class { diff --git a/Tapeti.Saga/Tapeti.Saga.csproj b/Tapeti.Saga/Tapeti.Saga.csproj index 3de0a04..8f8f98d 100644 --- a/Tapeti.Saga/Tapeti.Saga.csproj +++ b/Tapeti.Saga/Tapeti.Saga.csproj @@ -31,6 +31,7 @@ 4 + @@ -45,11 +46,10 @@ - - + diff --git a/Tapeti.csproj b/Tapeti.csproj index e417e0f..ef12ebc 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -64,7 +64,6 @@ - diff --git a/TapetiConfig.cs b/TapetiConfig.cs index 74a6518..a8b7a79 100644 --- a/TapetiConfig.cs +++ b/TapetiConfig.cs @@ -36,7 +36,6 @@ namespace Tapeti this.exchange = exchange; this.dependencyResolver = dependencyResolver; - Use(new BindingBufferStop()); Use(new DependencyResolverBinding(dependencyResolver)); Use(new MessageBinding()); } @@ -129,7 +128,8 @@ namespace Tapeti Method = method, QueueInfo = methodQueueInfo, MessageClass = context.MessageClass, - MessageHandler = messageHandler + MessageHandler = messageHandler, + MessageMiddleware = context.MessageMiddleware }; if (methodQueueInfo.Dynamic.GetValueOrDefault()) @@ -159,7 +159,7 @@ namespace Tapeti protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) { - MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next)); + MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next), () => {}); if (context.MessageClass == null) throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class"); @@ -336,6 +336,8 @@ namespace Tapeti public MethodInfo Method { get; set; } public Type MessageClass { get; set; } + public IReadOnlyList MessageMiddleware { get; set; } + public QueueInfo QueueInfo { get; set; } public MessageHandlerFunc MessageHandler { get; set; } @@ -361,14 +363,26 @@ namespace Tapeti internal class BindingContext : IBindingContext { + private List messageMiddleware; + public Type MessageClass { get; set; } public IReadOnlyList Parameters { get; } + public IReadOnlyList MessageMiddleware => messageMiddleware; public BindingContext(IReadOnlyList parameters) { Parameters = parameters; } + + + public void Use(IMessageMiddleware middleware) + { + if (messageMiddleware == null) + messageMiddleware = new List(); + + messageMiddleware.Add(middleware); + } } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index 0f520f9..37e6262 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -1,7 +1,7 @@ -using System; -using Microsoft.SqlServer.Server; +using System.Threading.Tasks; using Tapeti; using Tapeti.Annotations; +using Tapeti.Saga; namespace Test { @@ -9,33 +9,46 @@ namespace Test public class MarcoController : MessageController { private readonly IPublisher publisher; + private readonly ISagaProvider sagaProvider; - public MarcoController(IPublisher publisher/*, ISagaProvider sagaProvider*/) + public MarcoController(IPublisher publisher, ISagaProvider sagaProvider) { this.publisher = publisher; + this.sagaProvider = sagaProvider; } - //[StaticQueue("test")] - public PoloMessage Marco(MarcoMessage message, Visualizer visualizer) + /* + * For simple request response patterns, the return type can also be used: + + public async Task Marco(MarcoMessage message, Visualizer visualizer) + { + visualizer.VisualizeMarco(); + return new PoloMessage(); ; + } + */ + + // Visualizer can also be constructor injected, just proving a point here... + public async Task Marco(MarcoMessage message, Visualizer visualizer) { visualizer.VisualizeMarco(); - /* - using (sagaProvider.Begin(new MarcoState + using (var saga = await sagaProvider.Begin(new MarcoPoloSaga())) { - ... - })) - { - //publisher.Publish(new PoloColorRequest(), saga, PoloColorResponse1); - //publisher.Publish(new PoloColorRequest(), saga, callID = "tweede"); - - // Saga refcount = 2 + // TODO provide publish extension with Saga support + await publisher.Publish(new PoloMessage(), saga); } - */ + } - return new PoloMessage(); ; + + public void Polo(PoloMessage message, Visualizer visualizer, ISaga saga) + { + if (saga.State.ReceivedPolo) + return; + + saga.State.ReceivedPolo = true; + visualizer.VisualizePolo(); } @@ -61,11 +74,6 @@ namespace Test } } */ - - public void Polo(PoloMessage message, Visualizer visualizer) - { - visualizer.VisualizePolo(); - } } @@ -79,15 +87,8 @@ namespace Test } - - - public class PoloColorRequest + public class MarcoPoloSaga { - - } - - public class PoloColorResponse - { - + public bool ReceivedPolo; } } diff --git a/Test/Program.cs b/Test/Program.cs index 246aef6..01a4c15 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -13,7 +13,7 @@ namespace Test var container = new Container(); container.Register(); container.Register(); - container.Register(); + container.RegisterSingleton(); var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container)) .Use(new SagaMiddleware())