diff --git a/Annotations/DynamicQueueAttribute.cs b/Annotations/DynamicQueueAttribute.cs new file mode 100644 index 0000000..3d730c9 --- /dev/null +++ b/Annotations/DynamicQueueAttribute.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Creates a non-durable auto-delete queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class DynamicQueueAttribute : Attribute + { + } +} diff --git a/Annotations/MessageControllerAttribute.cs b/Annotations/MessageControllerAttribute.cs new file mode 100644 index 0000000..1f419f2 --- /dev/null +++ b/Annotations/MessageControllerAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Class)] + public class MessageControllerAttribute : Attribute + { + } +} diff --git a/Annotations/QueueAttribute.cs b/Annotations/QueueAttribute.cs deleted file mode 100644 index 878390d..0000000 --- a/Annotations/QueueAttribute.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; - -namespace Tapeti.Annotations -{ - [AttributeUsage(AttributeTargets.Class)] - public class QueueAttribute : Attribute - { - public string Name { get; set; } - public bool Dynamic { get; set; } - - - public QueueAttribute(string name = null) - { - Name = name; - Dynamic = (name == null); - } - } -} diff --git a/Annotations/StaticQueueAttribute.cs b/Annotations/StaticQueueAttribute.cs new file mode 100644 index 0000000..2a9c6b1 --- /dev/null +++ b/Annotations/StaticQueueAttribute.cs @@ -0,0 +1,25 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Binds to an existing durable queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + /// + /// At the moment there is no support for creating a durable queue and managing the + /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue + /// for deploy-time management of durable queues (shameless plug intended). + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class StaticQueueAttribute : Attribute + { + public string Name { get; set; } + + + public StaticQueueAttribute(string name) + { + Name = name; + } + } +} diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs new file mode 100644 index 0000000..df3444e --- /dev/null +++ b/Config/IBindingContext.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace Tapeti.Config +{ + public delegate object ValueFactory(IMessageContext context); + + + public interface IBindingContext + { + Type MessageClass { get; set; } + IReadOnlyList Parameters { get; } + } + + + public interface IBindingParameter + { + ParameterInfo Info { get; } + bool HasBinding { get; } + + void SetBinding(ValueFactory valueFactory); + } +} diff --git a/Config/IBindingMiddleware.cs b/Config/IBindingMiddleware.cs new file mode 100644 index 0000000..e2d977c --- /dev/null +++ b/Config/IBindingMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IBindingMiddleware + { + void Handle(IBindingContext context, Action next); + } +} diff --git a/Config/IConfig.cs b/Config/IConfig.cs new file mode 100644 index 0000000..8752c97 --- /dev/null +++ b/Config/IConfig.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IConfig + { + string Exchange { get; } + IDependencyResolver DependencyResolver { get; } + IReadOnlyList MessageMiddleware { get; } + IEnumerable Queues { get; } + } + + + public interface IQueue + { + bool Dynamic { get; } + string Name { get; } + + IEnumerable Bindings { get; } + } + + + public interface IBinding + { + Type Controller { get; } + MethodInfo Method { get; } + Type MessageClass { get; } + + bool Accept(object message); + Task Invoke(IMessageContext context, object message); + } +} diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs new file mode 100644 index 0000000..6bf8101 --- /dev/null +++ b/Config/IMessageContext.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMessageContext + { + object Controller { get; } + object Message { get; } + IDictionary Items { get; } + } +} diff --git a/Config/IMessageMiddleware.cs b/Config/IMessageMiddleware.cs new file mode 100644 index 0000000..aaec7eb --- /dev/null +++ b/Config/IMessageMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IMessageMiddleware + { + void Handle(IMessageContext context, Action next); + } +} diff --git a/Config/IMiddlewareBundle.cs b/Config/IMiddlewareBundle.cs new file mode 100644 index 0000000..82846f3 --- /dev/null +++ b/Config/IMiddlewareBundle.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMiddlewareBundle + { + IEnumerable GetContents(IDependencyResolver dependencyResolver); + } +} diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index afabfd5..1687d24 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -1,21 +1,26 @@ using System; -using System.Diagnostics.Eventing.Reader; +using System.Collections.Generic; +using System.Linq; using RabbitMQ.Client; +using Tapeti.Config; +using Tapeti.Helpers; namespace Tapeti.Connection { public class TapetiConsumer : DefaultBasicConsumer { private readonly TapetiWorker worker; - private readonly IMessageSerializer messageSerializer; - private readonly IQueueRegistration queueRegistration; + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; + private readonly List bindings; - public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration) + public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) { this.worker = worker; - this.messageSerializer = messageSerializer; - this.queueRegistration = queueRegistration; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + this.bindings = bindings.ToList(); } @@ -24,22 +29,46 @@ namespace Tapeti.Connection { try { - var message = messageSerializer.Deserialize(body, properties); + var message = dependencyResolver.Resolve().Deserialize(body, properties); if (message == null) throw new ArgumentException("Empty message"); - if (queueRegistration.Accept(message)) - queueRegistration.Visit(message); - else + var handled = false; + foreach (var binding in bindings.Where(b => b.Accept(message))) + { + var context = new MessageContext + { + Controller = dependencyResolver.Resolve(binding.Controller), + Message = message + }; + + MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next)); + + var result = binding.Invoke(context, message).Result; + if (result != null) + worker.Publish(result); + + handled = true; + } + + if (!handled) throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); worker.Respond(deliveryTag, ConsumeResponse.Ack); } catch (Exception) { - //TODO pluggable exception handling - worker.Respond(deliveryTag, ConsumeResponse.Nack); + worker.Respond(deliveryTag, ConsumeResponse.Requeue); + throw; } } + + + protected class MessageContext : IMessageContext + { + public object Controller { get; set; } + public object Message { get; set; } + public IDictionary Items { get; } = new Dictionary(); + } } } diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index ca021f9..4143cf5 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,21 +1,22 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; namespace Tapeti.Connection { public class TapetiPublisher : IPublisher { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiPublisher(TapetiWorker worker) + public TapetiPublisher(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } public Task Publish(object message) { - return worker.Publish(message); + return workerFactory().Publish(message); } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index baafe9d..f6303cb 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -1,23 +1,25 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti.Connection { public class TapetiSubscriber : ISubscriber { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiSubscriber(TapetiWorker worker) + public TapetiSubscriber(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } - public async Task BindQueues(IEnumerable registrations) + public async Task BindQueues(IEnumerable queues) { - await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList()); + await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index c4b13ef..f5ee254 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using Tapeti.Config; using Tapeti.Tasks; namespace Tapeti.Connection @@ -10,20 +12,23 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string PublishExchange { get; set; } - + public string Exchange { get; set; } + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly Lazy taskQueue = new Lazy(); - private IConnection connection; - private IModel channel; + private RabbitMQ.Client.IConnection connection; + private IModel channelInstance; - public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy) + public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware) { - this.messageSerializer = messageSerializer; - this.routingKeyStrategy = routingKeyStrategy; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + messageSerializer = dependencyResolver.Resolve(); + routingKeyStrategy = dependencyResolver.Resolve(); } @@ -35,29 +40,45 @@ namespace Tapeti.Connection var body = messageSerializer.Serialize(message, properties); (await GetChannel()) - .BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, + .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, properties, body); }).Unwrap(); } - public Task Subscribe(string queueName, IQueueRegistration queueRegistration) + public Task Consume(string queueName, IEnumerable bindings) { return taskQueue.Value.Add(async () => { - (await GetChannel()) - .BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); }).Unwrap(); } - public async Task Subscribe(IQueueRegistration registration) + public async Task Subscribe(IQueue queue) { - var queueName = await taskQueue.Value.Add(async () => - registration.BindQueue(await GetChannel())) - .Unwrap(); + var queueName = await taskQueue.Value.Add(async () => + { + var channel = await GetChannel(); - await Subscribe(queueName, registration); + if (queue.Dynamic) + { + var dynamicQueue = channel.QueueDeclare(); + + foreach (var binding in queue.Bindings) + { + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); + channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); + } + + return dynamicQueue.QueueName; + } + + channel.QueueDeclarePassive(queue.Name); + return queue.Name; + }).Unwrap(); + + await Consume(queueName, queue.Bindings); } @@ -91,10 +112,10 @@ namespace Tapeti.Connection return taskQueue.Value.Add(() => { - if (channel != null) + if (channelInstance != null) { - channel.Dispose(); - channel = null; + channelInstance.Dispose(); + channelInstance = null; } // ReSharper disable once InvertIf @@ -115,8 +136,8 @@ namespace Tapeti.Connection /// private async Task GetChannel() { - if (channel != null) - return channel; + if (channelInstance != null) + return channelInstance; var connectionFactory = new ConnectionFactory { @@ -134,7 +155,7 @@ namespace Tapeti.Connection try { connection = connectionFactory.CreateConnection(); - channel = connection.CreateModel(); + channelInstance = connection.CreateModel(); break; } @@ -144,7 +165,7 @@ namespace Tapeti.Connection } } - return channel; + return channelInstance; } } } diff --git a/Default/BindingBufferStop.cs b/Default/BindingBufferStop.cs new file mode 100644 index 0000000..8d55a7c --- /dev/null +++ b/Default/BindingBufferStop.cs @@ -0,0 +1,13 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + // End of the line... + public class BindingBufferStop : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + } + } +} diff --git a/Default/DefaultControllerFactory.cs b/Default/DefaultControllerFactory.cs deleted file mode 100644 index 9918c18..0000000 --- a/Default/DefaultControllerFactory.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; - -namespace Tapeti.Default -{ - public class DefaultControllerFactory : IControllerFactory - { - private readonly Dictionary> controllerConstructors = new Dictionary>(); - private readonly Func publisherFactory; - - public DefaultControllerFactory(Func publisherFactory) - { - this.publisherFactory = publisherFactory; - } - - - public object CreateController(Type controllerType) - { - Func constructor; - if (!controllerConstructors.TryGetValue(controllerType, out constructor)) - throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}"); - - return constructor(); - } - - - public void RegisterController(Type type) - { - controllerConstructors.Add(type, GetConstructor(type)); - } - - - protected Func GetConstructor(Type type) - { - var constructors = type.GetConstructors(); - - ConstructorInfo publisherConstructor = null; - ConstructorInfo emptyConstructor = null; - - foreach (var constructor in constructors) - { - var parameters = constructor.GetParameters(); - if (parameters.Length > 0) - { - if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher)) - publisherConstructor = constructor; - } - else - emptyConstructor = constructor; - } - - if (publisherConstructor != null) - return () => publisherConstructor.Invoke(new object[] { publisherFactory() }); - - if (emptyConstructor != null) - return () => emptyConstructor.Invoke(null); - - throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required"); - } - } -} diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs deleted file mode 100644 index 062bab7..0000000 --- a/Default/DefaultDependencyResolver.cs +++ /dev/null @@ -1,66 +0,0 @@ -using System; - -namespace Tapeti.Default -{ - /** - * !! IoC Container 9000 !! - * - * ...you probably want to replace this one as soon as possible. - * - * A Simple Injector implementation is provided in the Tapeti.SimpleInjector package. - */ - public class DefaultDependencyResolver : IDependencyInjector - { - private readonly Lazy controllerFactory; - private readonly Lazy routingKeyStrategy = new Lazy(); - private readonly Lazy messageSerializer = new Lazy(); - private readonly Lazy logger; - - - - public DefaultDependencyResolver(Func publisherFactory) - { - controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory)); - - logger = new Lazy(() => - { - // http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console - try - { - // ReSharper disable once UnusedVariable - var dummy = Console.WindowHeight; - - return new ConsoleLogger(); - } - catch - { - return new DevNullLogger(); - } - }); - } - - - public T Resolve() where T : class - { - if (typeof(T) == typeof(IControllerFactory)) - return (T)(controllerFactory.Value as IControllerFactory); - - if (typeof(T) == typeof(IRoutingKeyStrategy)) - return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy); - - if (typeof(T) == typeof(IMessageSerializer)) - return (T)(messageSerializer.Value as IMessageSerializer); - - if (typeof(T) == typeof(ILogger)) - return (T)logger.Value; - - return default(T); - } - - - public void RegisterController(Type type) - { - controllerFactory.Value.RegisterController(type); - } - } -} diff --git a/Default/DependencyResolverBinding.cs b/Default/DependencyResolverBinding.cs new file mode 100644 index 0000000..d9a8a0d --- /dev/null +++ b/Default/DependencyResolverBinding.cs @@ -0,0 +1,26 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class DependencyResolverBinding : IBindingMiddleware + { + private readonly IDependencyResolver resolver; + + + public DependencyResolverBinding(IDependencyResolver resolver) + { + this.resolver = resolver; + } + + + public void Handle(IBindingContext context, Action next) + { + next(); + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) + parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); + } + } +} diff --git a/Default/MessageBinding.cs b/Default/MessageBinding.cs new file mode 100644 index 0000000..f21cd6b --- /dev/null +++ b/Default/MessageBinding.cs @@ -0,0 +1,23 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class MessageBinding : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + if (context.Parameters.Count == 0) + throw new TopologyConfigurationException("First parameter must be a message class"); + + var parameter = context.Parameters[0]; + if (!parameter.Info.ParameterType.IsClass) + throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} must be a message class"); + + parameter.SetBinding(messageContext => messageContext.Message); + context.MessageClass = parameter.Info.ParameterType; + + next(); + } + } +} diff --git a/Helpers/ConsoleHelper.cs b/Helpers/ConsoleHelper.cs new file mode 100644 index 0000000..0769de8 --- /dev/null +++ b/Helpers/ConsoleHelper.cs @@ -0,0 +1,22 @@ +using System; + +namespace Tapeti.Helpers +{ + public static class ConsoleHelper + { + // Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + public static bool IsAvailable() + { + try + { + // ReSharper disable once UnusedVariable - that's why it's called dummy + var dummy = Console.WindowHeight; + return true; + } + catch + { + return false; + } + } + } +} diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs new file mode 100644 index 0000000..4e813b1 --- /dev/null +++ b/Helpers/MiddlewareHelper.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; + +namespace Tapeti.Helpers +{ + public static class MiddlewareHelper + { + public static void Go(IReadOnlyList middleware, Action handle) + { + var handlerIndex = middleware.Count - 1; + if (handlerIndex == -1) + return; + + Action handleNext = null; + + handleNext = () => + { + handlerIndex--; + if (handlerIndex >= 0) + handle(middleware[handlerIndex], handleNext); + }; + + handle(middleware[handlerIndex], handleNext); + } + } +} diff --git a/IConnection.cs b/IConnection.cs new file mode 100644 index 0000000..b9671be --- /dev/null +++ b/IConnection.cs @@ -0,0 +1,10 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti +{ + public interface IConnection : IDisposable + { + Task Subscribe(); + } +} diff --git a/IControllerFactory.cs b/IControllerFactory.cs deleted file mode 100644 index 6522f3f..0000000 --- a/IControllerFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Tapeti -{ - public interface IControllerFactory - { - object CreateController(Type controllerType); - } -} diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index e04aa84..6f2641b 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -5,11 +5,14 @@ namespace Tapeti public interface IDependencyResolver { T Resolve() where T : class; + object Resolve(Type type); } public interface IDependencyInjector : IDependencyResolver { + void RegisterDefault() where TService : class where TImplementation : class, TService; + void RegisterPublisher(Func publisher); void RegisterController(Type type); } } diff --git a/IQueueRegistration.cs b/IQueueRegistration.cs deleted file mode 100644 index 901af4b..0000000 --- a/IQueueRegistration.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace Tapeti -{ - public interface IQueueRegistration - { - string BindQueue(IModel channel); - - bool Accept(object message); - Task Visit(object message); - } -} diff --git a/ISubscriber.cs b/ISubscriber.cs index 2be1a35..fae4328 100644 --- a/ISubscriber.cs +++ b/ISubscriber.cs @@ -1,6 +1,4 @@ -using System; - -namespace Tapeti +namespace Tapeti { public interface ISubscriber { diff --git a/MessageController.cs b/MessageController.cs new file mode 100644 index 0000000..a046171 --- /dev/null +++ b/MessageController.cs @@ -0,0 +1,16 @@ +using Tapeti.Annotations; + +namespace Tapeti +{ + /// + /// Base class for message controllers + /// + /// + /// Using this base class is not required, you can add the MessageController attribute + /// to any class. + /// + [MessageController] + public abstract class MessageController + { + } +} diff --git a/Properties/AssemblyInfo.cs b/Properties/AssemblyInfo.cs index f02ee9b..3428fd6 100644 --- a/Properties/AssemblyInfo.cs +++ b/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/README.md b/README.md index 3c2b121..2ac4bfc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,14 @@ -'Small to medium-sized and classified as "Least Concern" by the IUCN.' +# Tapeti +> 'Small to medium-sized and classified as "Least Concern" by the IUCN.' +> +> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti) -- Wikipedia \ No newline at end of file +Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals: + +1. Automatic registration of message handlers +2. Publishing without transport details + * Routing key generated based on class name + * One exchange (per service / group of services) to publish them all +3. Attribute based, no base class requirements (only for convenience) +4. Graceful handling of connection issues, even at startup +5. Basic Saga support \ No newline at end of file diff --git a/Registration/AbstractControllerRegistration.cs b/Registration/AbstractControllerRegistration.cs deleted file mode 100644 index f0b2c64..0000000 --- a/Registration/AbstractControllerRegistration.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Tapeti.Annotations; - -namespace Tapeti.Registration -{ - using MessageHandlerAction = Func; - - public struct MessageHandler - { - public MessageHandlerAction Action; - public string Exchange; - public string RoutingKey; - } - - - public abstract class AbstractControllerRegistration : IQueueRegistration - { - private readonly Func controllerFactoryFactory; - private readonly Type controllerType; - private readonly string defaultExchange; - private readonly Dictionary> messageHandlers = new Dictionary>(); - - - protected AbstractControllerRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange) - { - this.controllerFactoryFactory = controllerFactoryFactory; - this.controllerType = controllerType; - this.defaultExchange = defaultExchange; - - // ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++? - GetMessageHandlers(controllerType, (type, handler) => - { - if (!messageHandlers.ContainsKey(type)) - messageHandlers.Add(type, new List { handler }); - else - messageHandlers[type].Add(handler); - }); - } - - - protected virtual void GetMessageHandlers(Type type, Action add) - { - foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance) - .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) - .Select(m => (MethodInfo)m)) - { - Type messageType; - var messageHandler = GetMessageHandler(method, out messageType); - - add(messageType, messageHandler); - } - } - - - protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType) - { - var parameters = method.GetParameters(); - - if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) - throw new ArgumentException($"Method {method.Name} does not have a single object parameter"); - - messageType = parameters[0].ParameterType; - var messageHandler = new MessageHandler(); - - if (method.ReturnType == typeof(void)) - messageHandler.Action = CreateSyncMessageHandler(method); - else if (method.ReturnType == typeof(Task)) - messageHandler.Action = CreateAsyncMessageHandler(method); - else - throw new ArgumentException($"Method {method.Name} needs to return void or a Task"); - - var exchangeAttribute = method.GetCustomAttribute() ?? method.DeclaringType.GetCustomAttribute(); - messageHandler.Exchange = exchangeAttribute?.Name; - - return messageHandler; - } - - - protected IEnumerable GetMessageTypes() - { - return messageHandlers.Keys; - } - - - protected IEnumerable GetMessageExchanges(Type type) - { - var exchanges = messageHandlers[type] - .Where(h => h.Exchange != null) - .Select(h => h.Exchange) - .Distinct(StringComparer.InvariantCulture) - .ToArray(); - - return exchanges.Length > 0 ? exchanges : new[] { defaultExchange }; - } - - - public abstract string BindQueue(IModel channel); - - - public bool Accept(object message) - { - return messageHandlers.ContainsKey(message.GetType()); - } - - - public Task Visit(object message) - { - var registeredHandlers = messageHandlers[message.GetType()]; - if (registeredHandlers != null) - return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message))); - - return Task.CompletedTask; - } - - - protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - method.Invoke(controller, new[] { message }); - - return Task.CompletedTask; - }; - } - - - protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - return (Task)method.Invoke(controller, new[] { message }); - }; - } - } -} diff --git a/Registration/ControllerDynamicQueueRegistration.cs b/Registration/ControllerDynamicQueueRegistration.cs deleted file mode 100644 index ad7f787..0000000 --- a/Registration/ControllerDynamicQueueRegistration.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerDynamicQueueRegistration : AbstractControllerRegistration - { - private readonly Func routingKeyStrategyFactory; - - - public ControllerDynamicQueueRegistration(Func controllerFactoryFactory, Func routingKeyStrategyFactory, Type controllerType, string defaultExchange) - : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.routingKeyStrategyFactory = routingKeyStrategyFactory; - } - - - public override string BindQueue(IModel channel) - { - var queue = channel.QueueDeclare(); - - foreach (var messageType in GetMessageTypes()) - { - var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType); - - foreach (var exchange in GetMessageExchanges(messageType)) - channel.QueueBind(queue.QueueName, exchange, routingKey); - } - - return queue.QueueName; - } - } -} diff --git a/Registration/ControllerQueueRegistration.cs b/Registration/ControllerQueueRegistration.cs deleted file mode 100644 index 3be95d5..0000000 --- a/Registration/ControllerQueueRegistration.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerQueueRegistration : AbstractControllerRegistration - { - private readonly string queueName; - - public ControllerQueueRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.queueName = queueName; - } - - - public override string BindQueue(IModel channel) - { - return channel.QueueDeclarePassive(queueName).QueueName; - } - } -} diff --git a/Tapeti.Saga/ISaga.cs b/Tapeti.Saga/ISaga.cs new file mode 100644 index 0000000..bd9f01a --- /dev/null +++ b/Tapeti.Saga/ISaga.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Saga +{ + public interface ISaga : IDisposable where T : class + { + string Id { get; } + T State { get; } + + void ExpectResponse(string callId); + void ResolveResponse(string callId); + } +} diff --git a/Tapeti.Saga/ISagaProvider.cs b/Tapeti.Saga/ISagaProvider.cs new file mode 100644 index 0000000..b6d3d63 --- /dev/null +++ b/Tapeti.Saga/ISagaProvider.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public interface ISagaProvider + { + Task> Begin() where T : class; + Task> Continue(string sagaId) where T : class; + Task> Current() where T : class; + } +} diff --git a/Tapeti.Saga/ISagaStore.cs b/Tapeti.Saga/ISagaStore.cs new file mode 100644 index 0000000..839be22 --- /dev/null +++ b/Tapeti.Saga/ISagaStore.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public interface ISagaStore + { + Task Read(string sagaId); + Task Update(string sagaId, object state); + } +} diff --git a/Tapeti.Saga/Properties/AssemblyInfo.cs b/Tapeti.Saga/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..785eee1 --- /dev/null +++ b/Tapeti.Saga/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Tapeti.Saga")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyProduct("Tapeti.Saga")] +[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("f84ad920-d5a1-455d-aed5-2542b3a47b85")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tapeti.Saga/SagaBindingMiddleware.cs b/Tapeti.Saga/SagaBindingMiddleware.cs new file mode 100644 index 0000000..f6a056f --- /dev/null +++ b/Tapeti.Saga/SagaBindingMiddleware.cs @@ -0,0 +1,28 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + foreach (var parameter in context.Parameters.Where(p => + p.Info.ParameterType.IsGenericType && + p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>))) + { + parameter.SetBinding(messageContext => + { + object saga; + if (!messageContext.Items.TryGetValue("Saga", out saga)) + return null; + + return saga.GetType() == typeof(ISaga<>) ? saga : null; + }); + } + + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMemoryStore.cs b/Tapeti.Saga/SagaMemoryStore.cs new file mode 100644 index 0000000..52ee511 --- /dev/null +++ b/Tapeti.Saga/SagaMemoryStore.cs @@ -0,0 +1,43 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaMemoryStore : ISagaStore + { + private ISagaStore decoratedStore; + private readonly Dictionary values = new Dictionary(); + + + // Not a constructor to allow standard injection to work when using only the MemoryStore + public static SagaMemoryStore AsCacheFor(ISagaStore store) + { + return new SagaMemoryStore + { + decoratedStore = store + }; + } + + + public async Task Read(string sagaId) + { + object value; + + // ReSharper disable once InvertIf + if (!values.TryGetValue(sagaId, out value) && decoratedStore != null) + { + value = await decoratedStore.Read(sagaId); + values.Add(sagaId, value); + } + + return value; + } + + public async Task Update(string sagaId, object state) + { + values[sagaId] = state; + if (decoratedStore != null) + await decoratedStore.Update(sagaId, state); + } + } +} diff --git a/Tapeti.Saga/SagaMessageMiddleware.cs b/Tapeti.Saga/SagaMessageMiddleware.cs new file mode 100644 index 0000000..bef101a --- /dev/null +++ b/Tapeti.Saga/SagaMessageMiddleware.cs @@ -0,0 +1,22 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMessageMiddleware : IMessageMiddleware + { + private readonly IDependencyResolver dependencyResolver; + + + public SagaMessageMiddleware(IDependencyResolver dependencyResolver) + { + this.dependencyResolver = dependencyResolver; + } + + public void Handle(IMessageContext context, Action next) + { + context.Items["Saga"] = dependencyResolver.Resolve().Continue(""); + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMiddleware.cs b/Tapeti.Saga/SagaMiddleware.cs new file mode 100644 index 0000000..405a29f --- /dev/null +++ b/Tapeti.Saga/SagaMiddleware.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMiddleware : IMiddlewareBundle + { + public IEnumerable GetContents(IDependencyResolver dependencyResolver) + { + (dependencyResolver as IDependencyInjector)?.RegisterDefault(); + + yield return new SagaBindingMiddleware(); + yield return new SagaMessageMiddleware(dependencyResolver); + } + } +} diff --git a/Tapeti.Saga/SagaProvider.cs b/Tapeti.Saga/SagaProvider.cs new file mode 100644 index 0000000..b21ece8 --- /dev/null +++ b/Tapeti.Saga/SagaProvider.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaProvider : ISagaProvider + { + protected static readonly ConcurrentDictionary SagaLocks = new ConcurrentDictionary(); + private readonly ISagaStore store; + + public SagaProvider(ISagaStore store) + { + this.store = store; + } + + + public async Task> Begin(T initialState) where T : class + { + var saga = await Saga.Create(() => Task.FromResult(initialState)); + await store.Update(saga.Id, saga.State); + + return saga; + } + + public async Task> Continue(string sagaId) where T : class + { + return await Saga.Create(async () => await store.Read(sagaId) as T, sagaId); + } + + public async Task Continue(string sagaId) + { + return new Saga + { + Id = sagaId, + State = await store.Read(sagaId) + }; + } + + + protected class Saga : ISaga where T : class + { + private bool disposed; + + public string Id { get; set; } + public T State { get; set; } + + + public static async Task> Create(Func> getState, string id = null) + { + var sagaId = id ?? Guid.NewGuid().ToString(); + await SagaLocks.GetOrAdd(sagaId, new SemaphoreSlim(1)).WaitAsync(); + + var saga = new Saga + { + Id = sagaId, + State = await getState() + }; + + return saga; + } + + + public void Dispose() + { + if (disposed) + return; + + SemaphoreSlim semaphore; + if (SagaLocks.TryGetValue(Id, out semaphore)) + semaphore.Release(); + + disposed = true; + } + + + public void ExpectResponse(string callId) + { + throw new NotImplementedException(); + } + + + public void ResolveResponse(string callId) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/Tapeti.Saga/Tapeti.Saga.csproj b/Tapeti.Saga/Tapeti.Saga.csproj new file mode 100644 index 0000000..525f86d --- /dev/null +++ b/Tapeti.Saga/Tapeti.Saga.csproj @@ -0,0 +1,56 @@ + + + + + Debug + AnyCPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85} + Library + Properties + Tapeti.Saga + Tapeti.Saga + v4.5.2 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs index e01df40..06f8dc0 100644 --- a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs +++ b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs b/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs deleted file mode 100644 index 8f32b3a..0000000 --- a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using SimpleInjector; - -namespace Tapeti.SimpleInjector -{ - public class SimpleInjectorControllerFactory : IControllerFactory - { - private readonly Container container; - - - public SimpleInjectorControllerFactory(Container container) - { - this.container = container; - } - - - public object CreateController(Type controllerType) - { - return container.GetInstance(controllerType); - } - } -} diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index db5e01e..6b74af3 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,22 +1,16 @@ -using System.Linq; -using System.Reflection; +using System; +using System.Linq; using SimpleInjector; -using Tapeti.Annotations; -using Tapeti.Default; -using System.Collections.Generic; namespace Tapeti.SimpleInjector { - public class SimpleInjectorDependencyResolver : IDependencyResolver + public class SimpleInjectorDependencyResolver : IDependencyInjector { private readonly Container container; - public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true) + public SimpleInjectorDependencyResolver(Container container) { this.container = container; - - if (registerDefaults) - RegisterDefaults(); } public T Resolve() where T : class @@ -24,33 +18,31 @@ namespace Tapeti.SimpleInjector return container.GetInstance(); } - - public SimpleInjectorDependencyResolver RegisterDefaults() + public object Resolve(Type type) { - var currentRegistrations = container.GetCurrentRegistrations(); - - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - - return this; + return container.GetInstance(type); } - public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) - container.Register(type); - - return this; - } - - - private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService + public void RegisterDefault() where TService : class where TImplementation : class, TService { // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService))) container.Register(); } + + + public void RegisterPublisher(Func publisher) + { + // ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) + container.Register(publisher); + } + + + public void RegisterController(Type type) + { + container.Register(type); + } } } diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index f91dc29..3addf45 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -45,7 +45,6 @@ - diff --git a/Tapeti.csproj b/Tapeti.csproj index ca4d60b..e417e0f 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -50,38 +50,47 @@ - + + + + + + - + + + + + + + + - - - - - - - + + + +