From 8f5160e860a43b843edab35b2843f9a6a2ee4e5e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sun, 11 Dec 2016 15:08:58 +0100 Subject: [PATCH] Back to a working state --- Config/IBindingContext.cs | 24 ++ Config/IBindingMiddleware.cs | 9 + Config/IConfig.cs | 35 ++ Config/IMessageContext.cs | 11 + Config/IMessageMiddleware.cs | 9 + Config/IMiddlewareBundle.cs | 9 + Connection/TapetiConsumer.cs | 53 ++- Connection/TapetiSubscriber.cs | 5 +- Connection/TapetiWorker.cs | 65 ++- Default/BindingBufferStop.cs | 13 + Default/DefaultControllerFactory.cs | 62 --- Default/DefaultDependencyResolver.cs | 72 ---- Default/DependencyResolverBinding.cs | 26 ++ Default/MessageBinding.cs | 23 + Helpers/ConsoleHelper.cs | 22 + Helpers/MiddlewareHelper.cs | 26 ++ IControllerFactory.cs | 9 - IDependencyResolver.cs | 4 +- IQueueRegistration.cs | 13 - ISubscriber.cs | 4 +- ITopology.cs | 20 - Properties/AssemblyInfo.cs | 1 - .../AbstractControllerRegistration.cs | 142 ------- .../ControllerDynamicQueueRegistration.cs | 33 -- Registration/ControllerQueueRegistration.cs | 21 - Tapeti.Saga/SagaBindingMiddleware.cs | 28 ++ Tapeti.Saga/SagaMemoryStore.cs | 43 ++ Tapeti.Saga/SagaMessageMiddleware.cs | 22 + Tapeti.Saga/SagaMiddleware.cs | 16 + Tapeti.Saga/SagaProvider.cs | 90 ++++ .../Properties/AssemblyInfo.cs | 1 - .../SimpleInjectorControllerFactory.cs | 22 - .../SimpleInjectorDependencyResolver.cs | 59 +-- .../Tapeti.SimpleInjector.csproj | 1 - Tapeti.csproj | 23 +- TapetiConfig.cs | 400 ++++++++++++++++++ TapetiConnection.cs | 55 +-- TapetiConnectionBuilder.cs | 30 -- TapetiTopologyBuilder.cs | 146 ------- Test/Properties/AssemblyInfo.cs | 1 - Test/Visualizer.cs | 17 + 41 files changed, 956 insertions(+), 709 deletions(-) create mode 100644 Config/IBindingContext.cs create mode 100644 Config/IBindingMiddleware.cs create mode 100644 Config/IConfig.cs create mode 100644 Config/IMessageContext.cs create mode 100644 Config/IMessageMiddleware.cs create mode 100644 Config/IMiddlewareBundle.cs create mode 100644 Default/BindingBufferStop.cs delete mode 100644 Default/DefaultControllerFactory.cs delete mode 100644 Default/DefaultDependencyResolver.cs create mode 100644 Default/DependencyResolverBinding.cs create mode 100644 Default/MessageBinding.cs create mode 100644 Helpers/ConsoleHelper.cs create mode 100644 Helpers/MiddlewareHelper.cs delete mode 100644 IControllerFactory.cs delete mode 100644 IQueueRegistration.cs delete mode 100644 ITopology.cs delete mode 100644 Registration/AbstractControllerRegistration.cs delete mode 100644 Registration/ControllerDynamicQueueRegistration.cs delete mode 100644 Registration/ControllerQueueRegistration.cs create mode 100644 Tapeti.Saga/SagaBindingMiddleware.cs create mode 100644 Tapeti.Saga/SagaMemoryStore.cs create mode 100644 Tapeti.Saga/SagaMessageMiddleware.cs create mode 100644 Tapeti.Saga/SagaMiddleware.cs create mode 100644 Tapeti.Saga/SagaProvider.cs delete mode 100644 Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs create mode 100644 TapetiConfig.cs delete mode 100644 TapetiConnectionBuilder.cs delete mode 100644 TapetiTopologyBuilder.cs create mode 100644 Test/Visualizer.cs diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs new file mode 100644 index 0000000..df3444e --- /dev/null +++ b/Config/IBindingContext.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace Tapeti.Config +{ + public delegate object ValueFactory(IMessageContext context); + + + public interface IBindingContext + { + Type MessageClass { get; set; } + IReadOnlyList Parameters { get; } + } + + + public interface IBindingParameter + { + ParameterInfo Info { get; } + bool HasBinding { get; } + + void SetBinding(ValueFactory valueFactory); + } +} diff --git a/Config/IBindingMiddleware.cs b/Config/IBindingMiddleware.cs new file mode 100644 index 0000000..e2d977c --- /dev/null +++ b/Config/IBindingMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IBindingMiddleware + { + void Handle(IBindingContext context, Action next); + } +} diff --git a/Config/IConfig.cs b/Config/IConfig.cs new file mode 100644 index 0000000..8752c97 --- /dev/null +++ b/Config/IConfig.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IConfig + { + string Exchange { get; } + IDependencyResolver DependencyResolver { get; } + IReadOnlyList MessageMiddleware { get; } + IEnumerable Queues { get; } + } + + + public interface IQueue + { + bool Dynamic { get; } + string Name { get; } + + IEnumerable Bindings { get; } + } + + + public interface IBinding + { + Type Controller { get; } + MethodInfo Method { get; } + Type MessageClass { get; } + + bool Accept(object message); + Task Invoke(IMessageContext context, object message); + } +} diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs new file mode 100644 index 0000000..6bf8101 --- /dev/null +++ b/Config/IMessageContext.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMessageContext + { + object Controller { get; } + object Message { get; } + IDictionary Items { get; } + } +} diff --git a/Config/IMessageMiddleware.cs b/Config/IMessageMiddleware.cs new file mode 100644 index 0000000..aaec7eb --- /dev/null +++ b/Config/IMessageMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IMessageMiddleware + { + void Handle(IMessageContext context, Action next); + } +} diff --git a/Config/IMiddlewareBundle.cs b/Config/IMiddlewareBundle.cs new file mode 100644 index 0000000..82846f3 --- /dev/null +++ b/Config/IMiddlewareBundle.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMiddlewareBundle + { + IEnumerable GetContents(IDependencyResolver dependencyResolver); + } +} diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index afabfd5..1687d24 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -1,21 +1,26 @@ using System; -using System.Diagnostics.Eventing.Reader; +using System.Collections.Generic; +using System.Linq; using RabbitMQ.Client; +using Tapeti.Config; +using Tapeti.Helpers; namespace Tapeti.Connection { public class TapetiConsumer : DefaultBasicConsumer { private readonly TapetiWorker worker; - private readonly IMessageSerializer messageSerializer; - private readonly IQueueRegistration queueRegistration; + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; + private readonly List bindings; - public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration) + public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) { this.worker = worker; - this.messageSerializer = messageSerializer; - this.queueRegistration = queueRegistration; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + this.bindings = bindings.ToList(); } @@ -24,22 +29,46 @@ namespace Tapeti.Connection { try { - var message = messageSerializer.Deserialize(body, properties); + var message = dependencyResolver.Resolve().Deserialize(body, properties); if (message == null) throw new ArgumentException("Empty message"); - if (queueRegistration.Accept(message)) - queueRegistration.Visit(message); - else + var handled = false; + foreach (var binding in bindings.Where(b => b.Accept(message))) + { + var context = new MessageContext + { + Controller = dependencyResolver.Resolve(binding.Controller), + Message = message + }; + + MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next)); + + var result = binding.Invoke(context, message).Result; + if (result != null) + worker.Publish(result); + + handled = true; + } + + if (!handled) throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); worker.Respond(deliveryTag, ConsumeResponse.Ack); } catch (Exception) { - //TODO pluggable exception handling - worker.Respond(deliveryTag, ConsumeResponse.Nack); + worker.Respond(deliveryTag, ConsumeResponse.Requeue); + throw; } } + + + protected class MessageContext : IMessageContext + { + public object Controller { get; set; } + public object Message { get; set; } + public IDictionary Items { get; } = new Dictionary(); + } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index 9d487f3..f6303cb 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti.Connection { @@ -16,9 +17,9 @@ namespace Tapeti.Connection } - public async Task BindQueues(IEnumerable registrations) + public async Task BindQueues(IEnumerable queues) { - await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList()); + await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index 99c4b2f..f5ee254 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using Tapeti.Config; using Tapeti.Tasks; namespace Tapeti.Connection @@ -10,20 +12,23 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string PublishExchange { get; set; } - + public string Exchange { get; set; } + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly Lazy taskQueue = new Lazy(); private RabbitMQ.Client.IConnection connection; - private IModel channel; + private IModel channelInstance; - public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy) + public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware) { - this.messageSerializer = messageSerializer; - this.routingKeyStrategy = routingKeyStrategy; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + messageSerializer = dependencyResolver.Resolve(); + routingKeyStrategy = dependencyResolver.Resolve(); } @@ -35,29 +40,45 @@ namespace Tapeti.Connection var body = messageSerializer.Serialize(message, properties); (await GetChannel()) - .BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, + .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, properties, body); }).Unwrap(); } - public Task Subscribe(string queueName, IQueueRegistration queueRegistration) + public Task Consume(string queueName, IEnumerable bindings) { return taskQueue.Value.Add(async () => { - (await GetChannel()) - .BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); }).Unwrap(); } - public async Task Subscribe(IQueueRegistration registration) + public async Task Subscribe(IQueue queue) { - var queueName = await taskQueue.Value.Add(async () => - registration.BindQueue(await GetChannel())) - .Unwrap(); + var queueName = await taskQueue.Value.Add(async () => + { + var channel = await GetChannel(); - await Subscribe(queueName, registration); + if (queue.Dynamic) + { + var dynamicQueue = channel.QueueDeclare(); + + foreach (var binding in queue.Bindings) + { + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); + channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); + } + + return dynamicQueue.QueueName; + } + + channel.QueueDeclarePassive(queue.Name); + return queue.Name; + }).Unwrap(); + + await Consume(queueName, queue.Bindings); } @@ -91,10 +112,10 @@ namespace Tapeti.Connection return taskQueue.Value.Add(() => { - if (channel != null) + if (channelInstance != null) { - channel.Dispose(); - channel = null; + channelInstance.Dispose(); + channelInstance = null; } // ReSharper disable once InvertIf @@ -115,8 +136,8 @@ namespace Tapeti.Connection /// private async Task GetChannel() { - if (channel != null) - return channel; + if (channelInstance != null) + return channelInstance; var connectionFactory = new ConnectionFactory { @@ -134,7 +155,7 @@ namespace Tapeti.Connection try { connection = connectionFactory.CreateConnection(); - channel = connection.CreateModel(); + channelInstance = connection.CreateModel(); break; } @@ -144,7 +165,7 @@ namespace Tapeti.Connection } } - return channel; + return channelInstance; } } } diff --git a/Default/BindingBufferStop.cs b/Default/BindingBufferStop.cs new file mode 100644 index 0000000..8d55a7c --- /dev/null +++ b/Default/BindingBufferStop.cs @@ -0,0 +1,13 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + // End of the line... + public class BindingBufferStop : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + } + } +} diff --git a/Default/DefaultControllerFactory.cs b/Default/DefaultControllerFactory.cs deleted file mode 100644 index 9918c18..0000000 --- a/Default/DefaultControllerFactory.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; - -namespace Tapeti.Default -{ - public class DefaultControllerFactory : IControllerFactory - { - private readonly Dictionary> controllerConstructors = new Dictionary>(); - private readonly Func publisherFactory; - - public DefaultControllerFactory(Func publisherFactory) - { - this.publisherFactory = publisherFactory; - } - - - public object CreateController(Type controllerType) - { - Func constructor; - if (!controllerConstructors.TryGetValue(controllerType, out constructor)) - throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}"); - - return constructor(); - } - - - public void RegisterController(Type type) - { - controllerConstructors.Add(type, GetConstructor(type)); - } - - - protected Func GetConstructor(Type type) - { - var constructors = type.GetConstructors(); - - ConstructorInfo publisherConstructor = null; - ConstructorInfo emptyConstructor = null; - - foreach (var constructor in constructors) - { - var parameters = constructor.GetParameters(); - if (parameters.Length > 0) - { - if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher)) - publisherConstructor = constructor; - } - else - emptyConstructor = constructor; - } - - if (publisherConstructor != null) - return () => publisherConstructor.Invoke(new object[] { publisherFactory() }); - - if (emptyConstructor != null) - return () => emptyConstructor.Invoke(null); - - throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required"); - } - } -} diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs deleted file mode 100644 index 066307e..0000000 --- a/Default/DefaultDependencyResolver.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; - -namespace Tapeti.Default -{ - /** - * !! IoC Container 9000 !! - * - * ...you probably want to replace this one as soon as possible. - * - * A Simple Injector implementation is provided in the Tapeti.SimpleInjector package. - */ - public class DefaultDependencyResolver : IDependencyInjector - { - private readonly Lazy controllerFactory; - private readonly Lazy routingKeyStrategy = new Lazy(); - private readonly Lazy messageSerializer = new Lazy(); - private readonly Lazy logger; - private IPublisher publisher; - - - public DefaultDependencyResolver() - { - controllerFactory = new Lazy(() => new DefaultControllerFactory(() => publisher)); - - logger = new Lazy(() => - { - // http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console - try - { - // ReSharper disable once UnusedVariable - var dummy = Console.WindowHeight; - - return new ConsoleLogger(); - } - catch - { - return new DevNullLogger(); - } - }); - } - - - public T Resolve() where T : class - { - if (typeof(T) == typeof(IControllerFactory)) - return (T)(controllerFactory.Value as IControllerFactory); - - if (typeof(T) == typeof(IRoutingKeyStrategy)) - return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy); - - if (typeof(T) == typeof(IMessageSerializer)) - return (T)(messageSerializer.Value as IMessageSerializer); - - if (typeof(T) == typeof(ILogger)) - return (T)logger.Value; - - return default(T); - } - - - public void RegisterPublisher(IPublisher value) - { - publisher = value; - } - - - public void RegisterController(Type type) - { - controllerFactory.Value.RegisterController(type); - } - } -} diff --git a/Default/DependencyResolverBinding.cs b/Default/DependencyResolverBinding.cs new file mode 100644 index 0000000..d9a8a0d --- /dev/null +++ b/Default/DependencyResolverBinding.cs @@ -0,0 +1,26 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class DependencyResolverBinding : IBindingMiddleware + { + private readonly IDependencyResolver resolver; + + + public DependencyResolverBinding(IDependencyResolver resolver) + { + this.resolver = resolver; + } + + + public void Handle(IBindingContext context, Action next) + { + next(); + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) + parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); + } + } +} diff --git a/Default/MessageBinding.cs b/Default/MessageBinding.cs new file mode 100644 index 0000000..f21cd6b --- /dev/null +++ b/Default/MessageBinding.cs @@ -0,0 +1,23 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class MessageBinding : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + if (context.Parameters.Count == 0) + throw new TopologyConfigurationException("First parameter must be a message class"); + + var parameter = context.Parameters[0]; + if (!parameter.Info.ParameterType.IsClass) + throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} must be a message class"); + + parameter.SetBinding(messageContext => messageContext.Message); + context.MessageClass = parameter.Info.ParameterType; + + next(); + } + } +} diff --git a/Helpers/ConsoleHelper.cs b/Helpers/ConsoleHelper.cs new file mode 100644 index 0000000..0769de8 --- /dev/null +++ b/Helpers/ConsoleHelper.cs @@ -0,0 +1,22 @@ +using System; + +namespace Tapeti.Helpers +{ + public static class ConsoleHelper + { + // Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + public static bool IsAvailable() + { + try + { + // ReSharper disable once UnusedVariable - that's why it's called dummy + var dummy = Console.WindowHeight; + return true; + } + catch + { + return false; + } + } + } +} diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs new file mode 100644 index 0000000..4e813b1 --- /dev/null +++ b/Helpers/MiddlewareHelper.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; + +namespace Tapeti.Helpers +{ + public static class MiddlewareHelper + { + public static void Go(IReadOnlyList middleware, Action handle) + { + var handlerIndex = middleware.Count - 1; + if (handlerIndex == -1) + return; + + Action handleNext = null; + + handleNext = () => + { + handlerIndex--; + if (handlerIndex >= 0) + handle(middleware[handlerIndex], handleNext); + }; + + handle(middleware[handlerIndex], handleNext); + } + } +} diff --git a/IControllerFactory.cs b/IControllerFactory.cs deleted file mode 100644 index 6522f3f..0000000 --- a/IControllerFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Tapeti -{ - public interface IControllerFactory - { - object CreateController(Type controllerType); - } -} diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index a177d25..6f2641b 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -5,12 +5,14 @@ namespace Tapeti public interface IDependencyResolver { T Resolve() where T : class; + object Resolve(Type type); } public interface IDependencyInjector : IDependencyResolver { - void RegisterPublisher(IPublisher publisher); + void RegisterDefault() where TService : class where TImplementation : class, TService; + void RegisterPublisher(Func publisher); void RegisterController(Type type); } } diff --git a/IQueueRegistration.cs b/IQueueRegistration.cs deleted file mode 100644 index 901af4b..0000000 --- a/IQueueRegistration.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace Tapeti -{ - public interface IQueueRegistration - { - string BindQueue(IModel channel); - - bool Accept(object message); - Task Visit(object message); - } -} diff --git a/ISubscriber.cs b/ISubscriber.cs index 2be1a35..fae4328 100644 --- a/ISubscriber.cs +++ b/ISubscriber.cs @@ -1,6 +1,4 @@ -using System; - -namespace Tapeti +namespace Tapeti { public interface ISubscriber { diff --git a/ITopology.cs b/ITopology.cs deleted file mode 100644 index 05c1934..0000000 --- a/ITopology.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Collections.Generic; - -namespace Tapeti -{ - public interface ITopology - { - IEnumerable Queues(); - } - - - public interface IQueue - { - IEnumerable Bindings(); - } - - - public interface IBinding - { - } -} diff --git a/Properties/AssemblyInfo.cs b/Properties/AssemblyInfo.cs index f02ee9b..3428fd6 100644 --- a/Properties/AssemblyInfo.cs +++ b/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Registration/AbstractControllerRegistration.cs b/Registration/AbstractControllerRegistration.cs deleted file mode 100644 index f0b2c64..0000000 --- a/Registration/AbstractControllerRegistration.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Tapeti.Annotations; - -namespace Tapeti.Registration -{ - using MessageHandlerAction = Func; - - public struct MessageHandler - { - public MessageHandlerAction Action; - public string Exchange; - public string RoutingKey; - } - - - public abstract class AbstractControllerRegistration : IQueueRegistration - { - private readonly Func controllerFactoryFactory; - private readonly Type controllerType; - private readonly string defaultExchange; - private readonly Dictionary> messageHandlers = new Dictionary>(); - - - protected AbstractControllerRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange) - { - this.controllerFactoryFactory = controllerFactoryFactory; - this.controllerType = controllerType; - this.defaultExchange = defaultExchange; - - // ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++? - GetMessageHandlers(controllerType, (type, handler) => - { - if (!messageHandlers.ContainsKey(type)) - messageHandlers.Add(type, new List { handler }); - else - messageHandlers[type].Add(handler); - }); - } - - - protected virtual void GetMessageHandlers(Type type, Action add) - { - foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance) - .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) - .Select(m => (MethodInfo)m)) - { - Type messageType; - var messageHandler = GetMessageHandler(method, out messageType); - - add(messageType, messageHandler); - } - } - - - protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType) - { - var parameters = method.GetParameters(); - - if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) - throw new ArgumentException($"Method {method.Name} does not have a single object parameter"); - - messageType = parameters[0].ParameterType; - var messageHandler = new MessageHandler(); - - if (method.ReturnType == typeof(void)) - messageHandler.Action = CreateSyncMessageHandler(method); - else if (method.ReturnType == typeof(Task)) - messageHandler.Action = CreateAsyncMessageHandler(method); - else - throw new ArgumentException($"Method {method.Name} needs to return void or a Task"); - - var exchangeAttribute = method.GetCustomAttribute() ?? method.DeclaringType.GetCustomAttribute(); - messageHandler.Exchange = exchangeAttribute?.Name; - - return messageHandler; - } - - - protected IEnumerable GetMessageTypes() - { - return messageHandlers.Keys; - } - - - protected IEnumerable GetMessageExchanges(Type type) - { - var exchanges = messageHandlers[type] - .Where(h => h.Exchange != null) - .Select(h => h.Exchange) - .Distinct(StringComparer.InvariantCulture) - .ToArray(); - - return exchanges.Length > 0 ? exchanges : new[] { defaultExchange }; - } - - - public abstract string BindQueue(IModel channel); - - - public bool Accept(object message) - { - return messageHandlers.ContainsKey(message.GetType()); - } - - - public Task Visit(object message) - { - var registeredHandlers = messageHandlers[message.GetType()]; - if (registeredHandlers != null) - return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message))); - - return Task.CompletedTask; - } - - - protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - method.Invoke(controller, new[] { message }); - - return Task.CompletedTask; - }; - } - - - protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - return (Task)method.Invoke(controller, new[] { message }); - }; - } - } -} diff --git a/Registration/ControllerDynamicQueueRegistration.cs b/Registration/ControllerDynamicQueueRegistration.cs deleted file mode 100644 index ad7f787..0000000 --- a/Registration/ControllerDynamicQueueRegistration.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerDynamicQueueRegistration : AbstractControllerRegistration - { - private readonly Func routingKeyStrategyFactory; - - - public ControllerDynamicQueueRegistration(Func controllerFactoryFactory, Func routingKeyStrategyFactory, Type controllerType, string defaultExchange) - : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.routingKeyStrategyFactory = routingKeyStrategyFactory; - } - - - public override string BindQueue(IModel channel) - { - var queue = channel.QueueDeclare(); - - foreach (var messageType in GetMessageTypes()) - { - var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType); - - foreach (var exchange in GetMessageExchanges(messageType)) - channel.QueueBind(queue.QueueName, exchange, routingKey); - } - - return queue.QueueName; - } - } -} diff --git a/Registration/ControllerQueueRegistration.cs b/Registration/ControllerQueueRegistration.cs deleted file mode 100644 index 3be95d5..0000000 --- a/Registration/ControllerQueueRegistration.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerQueueRegistration : AbstractControllerRegistration - { - private readonly string queueName; - - public ControllerQueueRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.queueName = queueName; - } - - - public override string BindQueue(IModel channel) - { - return channel.QueueDeclarePassive(queueName).QueueName; - } - } -} diff --git a/Tapeti.Saga/SagaBindingMiddleware.cs b/Tapeti.Saga/SagaBindingMiddleware.cs new file mode 100644 index 0000000..f6a056f --- /dev/null +++ b/Tapeti.Saga/SagaBindingMiddleware.cs @@ -0,0 +1,28 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + foreach (var parameter in context.Parameters.Where(p => + p.Info.ParameterType.IsGenericType && + p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>))) + { + parameter.SetBinding(messageContext => + { + object saga; + if (!messageContext.Items.TryGetValue("Saga", out saga)) + return null; + + return saga.GetType() == typeof(ISaga<>) ? saga : null; + }); + } + + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMemoryStore.cs b/Tapeti.Saga/SagaMemoryStore.cs new file mode 100644 index 0000000..52ee511 --- /dev/null +++ b/Tapeti.Saga/SagaMemoryStore.cs @@ -0,0 +1,43 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaMemoryStore : ISagaStore + { + private ISagaStore decoratedStore; + private readonly Dictionary values = new Dictionary(); + + + // Not a constructor to allow standard injection to work when using only the MemoryStore + public static SagaMemoryStore AsCacheFor(ISagaStore store) + { + return new SagaMemoryStore + { + decoratedStore = store + }; + } + + + public async Task Read(string sagaId) + { + object value; + + // ReSharper disable once InvertIf + if (!values.TryGetValue(sagaId, out value) && decoratedStore != null) + { + value = await decoratedStore.Read(sagaId); + values.Add(sagaId, value); + } + + return value; + } + + public async Task Update(string sagaId, object state) + { + values[sagaId] = state; + if (decoratedStore != null) + await decoratedStore.Update(sagaId, state); + } + } +} diff --git a/Tapeti.Saga/SagaMessageMiddleware.cs b/Tapeti.Saga/SagaMessageMiddleware.cs new file mode 100644 index 0000000..bef101a --- /dev/null +++ b/Tapeti.Saga/SagaMessageMiddleware.cs @@ -0,0 +1,22 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMessageMiddleware : IMessageMiddleware + { + private readonly IDependencyResolver dependencyResolver; + + + public SagaMessageMiddleware(IDependencyResolver dependencyResolver) + { + this.dependencyResolver = dependencyResolver; + } + + public void Handle(IMessageContext context, Action next) + { + context.Items["Saga"] = dependencyResolver.Resolve().Continue(""); + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMiddleware.cs b/Tapeti.Saga/SagaMiddleware.cs new file mode 100644 index 0000000..405a29f --- /dev/null +++ b/Tapeti.Saga/SagaMiddleware.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMiddleware : IMiddlewareBundle + { + public IEnumerable GetContents(IDependencyResolver dependencyResolver) + { + (dependencyResolver as IDependencyInjector)?.RegisterDefault(); + + yield return new SagaBindingMiddleware(); + yield return new SagaMessageMiddleware(dependencyResolver); + } + } +} diff --git a/Tapeti.Saga/SagaProvider.cs b/Tapeti.Saga/SagaProvider.cs new file mode 100644 index 0000000..b21ece8 --- /dev/null +++ b/Tapeti.Saga/SagaProvider.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaProvider : ISagaProvider + { + protected static readonly ConcurrentDictionary SagaLocks = new ConcurrentDictionary(); + private readonly ISagaStore store; + + public SagaProvider(ISagaStore store) + { + this.store = store; + } + + + public async Task> Begin(T initialState) where T : class + { + var saga = await Saga.Create(() => Task.FromResult(initialState)); + await store.Update(saga.Id, saga.State); + + return saga; + } + + public async Task> Continue(string sagaId) where T : class + { + return await Saga.Create(async () => await store.Read(sagaId) as T, sagaId); + } + + public async Task Continue(string sagaId) + { + return new Saga + { + Id = sagaId, + State = await store.Read(sagaId) + }; + } + + + protected class Saga : ISaga where T : class + { + private bool disposed; + + public string Id { get; set; } + public T State { get; set; } + + + public static async Task> Create(Func> getState, string id = null) + { + var sagaId = id ?? Guid.NewGuid().ToString(); + await SagaLocks.GetOrAdd(sagaId, new SemaphoreSlim(1)).WaitAsync(); + + var saga = new Saga + { + Id = sagaId, + State = await getState() + }; + + return saga; + } + + + public void Dispose() + { + if (disposed) + return; + + SemaphoreSlim semaphore; + if (SagaLocks.TryGetValue(Id, out semaphore)) + semaphore.Release(); + + disposed = true; + } + + + public void ExpectResponse(string callId) + { + throw new NotImplementedException(); + } + + + public void ResolveResponse(string callId) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs index e01df40..06f8dc0 100644 --- a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs +++ b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs b/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs deleted file mode 100644 index 8f32b3a..0000000 --- a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using SimpleInjector; - -namespace Tapeti.SimpleInjector -{ - public class SimpleInjectorControllerFactory : IControllerFactory - { - private readonly Container container; - - - public SimpleInjectorControllerFactory(Container container) - { - this.container = container; - } - - - public object CreateController(Type controllerType) - { - return container.GetInstance(controllerType); - } - } -} diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index 7dc3f21..6b74af3 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,23 +1,16 @@ using System; using System.Linq; -using System.Reflection; using SimpleInjector; -using Tapeti.Annotations; -using Tapeti.Default; -using System.Collections.Generic; namespace Tapeti.SimpleInjector { - public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector + public class SimpleInjectorDependencyResolver : IDependencyInjector { private readonly Container container; - public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true) + public SimpleInjectorDependencyResolver(Container container) { this.container = container; - - if (registerDefaults) - RegisterDefaults(); } public T Resolve() where T : class @@ -25,10 +18,25 @@ namespace Tapeti.SimpleInjector return container.GetInstance(); } - - public void RegisterPublisher(IPublisher publisher) + public object Resolve(Type type) { - IfUnregistered(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher)); + return container.GetInstance(type); + } + + + public void RegisterDefault() where TService : class where TImplementation : class, TService + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService))) + container.Register(); + } + + + public void RegisterPublisher(Func publisher) + { + // ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) + container.Register(publisher); } @@ -36,32 +44,5 @@ namespace Tapeti.SimpleInjector { container.Register(type); } - - - public SimpleInjectorDependencyResolver RegisterDefaults() - { - var currentRegistrations = container.GetCurrentRegistrations(); - - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - - return this; - } - - - private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService - { - // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) - container.Register(); - } - - private void IfUnregistered(IEnumerable currentRegistrations, Action register) where TService : class - { - // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) - register(); - } } } diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index f91dc29..3addf45 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -45,7 +45,6 @@ - diff --git a/Tapeti.csproj b/Tapeti.csproj index 79e761d..e417e0f 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -59,35 +59,38 @@ + + - + + + + + - + - + - - - - - - - + + + +