From e62aa7d482ac88b7ef9c5a01292c3de8a46f7dc7 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 5 Dec 2016 08:00:09 +0100 Subject: [PATCH 1/4] Started refactoring, not in any usable state yet --- Annotations/DynamicQueueAttribute.cs | 13 +++++++ Annotations/MessageControllerAttribute.cs | 9 +++++ Annotations/QueueAttribute.cs | 18 --------- Annotations/StaticQueueAttribute.cs | 25 +++++++++++++ Connection/TapetiPublisher.cs | 11 +++--- Connection/TapetiSubscriber.cs | 11 +++--- Default/DefaultDependencyResolver.cs | 12 ++++-- IDependencyResolver.cs | 1 + MessageController.cs | 16 ++++++++ README.md | 15 +++++++- .../SimpleInjectorDependencyResolver.cs | 33 +++++++++++------ Tapeti.csproj | 5 ++- TapetiConnection.cs | 24 +++++++++--- TapetiConnectionExtensions.cs | 2 +- Test/MarcoEmitter.cs | 37 +++++++++++++++++++ Test/Program.cs | 13 ++----- Test/Test.csproj | 1 + Test/TestQueueController.cs | 10 ++--- 18 files changed, 189 insertions(+), 67 deletions(-) create mode 100644 Annotations/DynamicQueueAttribute.cs create mode 100644 Annotations/MessageControllerAttribute.cs delete mode 100644 Annotations/QueueAttribute.cs create mode 100644 Annotations/StaticQueueAttribute.cs create mode 100644 MessageController.cs create mode 100644 Test/MarcoEmitter.cs diff --git a/Annotations/DynamicQueueAttribute.cs b/Annotations/DynamicQueueAttribute.cs new file mode 100644 index 0000000..3d730c9 --- /dev/null +++ b/Annotations/DynamicQueueAttribute.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Creates a non-durable auto-delete queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class DynamicQueueAttribute : Attribute + { + } +} diff --git a/Annotations/MessageControllerAttribute.cs b/Annotations/MessageControllerAttribute.cs new file mode 100644 index 0000000..1f419f2 --- /dev/null +++ b/Annotations/MessageControllerAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Class)] + public class MessageControllerAttribute : Attribute + { + } +} diff --git a/Annotations/QueueAttribute.cs b/Annotations/QueueAttribute.cs deleted file mode 100644 index 878390d..0000000 --- a/Annotations/QueueAttribute.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; - -namespace Tapeti.Annotations -{ - [AttributeUsage(AttributeTargets.Class)] - public class QueueAttribute : Attribute - { - public string Name { get; set; } - public bool Dynamic { get; set; } - - - public QueueAttribute(string name = null) - { - Name = name; - Dynamic = (name == null); - } - } -} diff --git a/Annotations/StaticQueueAttribute.cs b/Annotations/StaticQueueAttribute.cs new file mode 100644 index 0000000..2a9c6b1 --- /dev/null +++ b/Annotations/StaticQueueAttribute.cs @@ -0,0 +1,25 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Binds to an existing durable queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + /// + /// At the moment there is no support for creating a durable queue and managing the + /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue + /// for deploy-time management of durable queues (shameless plug intended). + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class StaticQueueAttribute : Attribute + { + public string Name { get; set; } + + + public StaticQueueAttribute(string name) + { + Name = name; + } + } +} diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index ca021f9..4143cf5 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,21 +1,22 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; namespace Tapeti.Connection { public class TapetiPublisher : IPublisher { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiPublisher(TapetiWorker worker) + public TapetiPublisher(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } public Task Publish(object message) { - return worker.Publish(message); + return workerFactory().Publish(message); } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index baafe9d..9d487f3 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -6,18 +7,18 @@ namespace Tapeti.Connection { public class TapetiSubscriber : ISubscriber { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiSubscriber(TapetiWorker worker) + public TapetiSubscriber(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } public async Task BindQueues(IEnumerable registrations) { - await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList()); + await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList()); } } } diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs index 062bab7..066307e 100644 --- a/Default/DefaultDependencyResolver.cs +++ b/Default/DefaultDependencyResolver.cs @@ -15,12 +15,12 @@ namespace Tapeti.Default private readonly Lazy routingKeyStrategy = new Lazy(); private readonly Lazy messageSerializer = new Lazy(); private readonly Lazy logger; + private IPublisher publisher; - - public DefaultDependencyResolver(Func publisherFactory) + public DefaultDependencyResolver() { - controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory)); + controllerFactory = new Lazy(() => new DefaultControllerFactory(() => publisher)); logger = new Lazy(() => { @@ -58,6 +58,12 @@ namespace Tapeti.Default } + public void RegisterPublisher(IPublisher value) + { + publisher = value; + } + + public void RegisterController(Type type) { controllerFactory.Value.RegisterController(type); diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index e04aa84..a177d25 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -10,6 +10,7 @@ namespace Tapeti public interface IDependencyInjector : IDependencyResolver { + void RegisterPublisher(IPublisher publisher); void RegisterController(Type type); } } diff --git a/MessageController.cs b/MessageController.cs new file mode 100644 index 0000000..a046171 --- /dev/null +++ b/MessageController.cs @@ -0,0 +1,16 @@ +using Tapeti.Annotations; + +namespace Tapeti +{ + /// + /// Base class for message controllers + /// + /// + /// Using this base class is not required, you can add the MessageController attribute + /// to any class. + /// + [MessageController] + public abstract class MessageController + { + } +} diff --git a/README.md b/README.md index 3c2b121..2ac4bfc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,14 @@ -'Small to medium-sized and classified as "Least Concern" by the IUCN.' +# Tapeti +> 'Small to medium-sized and classified as "Least Concern" by the IUCN.' +> +> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti) -- Wikipedia \ No newline at end of file +Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals: + +1. Automatic registration of message handlers +2. Publishing without transport details + * Routing key generated based on class name + * One exchange (per service / group of services) to publish them all +3. Attribute based, no base class requirements (only for convenience) +4. Graceful handling of connection issues, even at startup +5. Basic Saga support \ No newline at end of file diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index db5e01e..7dc3f21 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using System.Reflection; using SimpleInjector; using Tapeti.Annotations; @@ -7,7 +8,7 @@ using System.Collections.Generic; namespace Tapeti.SimpleInjector { - public class SimpleInjectorDependencyResolver : IDependencyResolver + public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector { private readonly Container container; @@ -25,6 +26,18 @@ namespace Tapeti.SimpleInjector } + public void RegisterPublisher(IPublisher publisher) + { + IfUnregistered(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher)); + } + + + public void RegisterController(Type type) + { + container.Register(type); + } + + public SimpleInjectorDependencyResolver RegisterDefaults() { var currentRegistrations = container.GetCurrentRegistrations(); @@ -37,20 +50,18 @@ namespace Tapeti.SimpleInjector } - public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) - container.Register(type); - - return this; - } - - private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService { // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) container.Register(); } + + private void IfUnregistered(IEnumerable currentRegistrations, Action register) where TService : class + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) + register(); + } } } diff --git a/Tapeti.csproj b/Tapeti.csproj index ca4d60b..f81dac3 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -50,7 +50,9 @@ - + + + @@ -58,6 +60,7 @@ + diff --git a/TapetiConnection.cs b/TapetiConnection.cs index 0a7a5cc..a4139c9 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -19,8 +19,20 @@ namespace Tapeti public IDependencyResolver DependencyResolver { - get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); } - set { dependencyResolver = value; } + get + { + if (dependencyResolver == null) + DependencyResolver = new DefaultDependencyResolver(); + + return dependencyResolver; + } + set + { + dependencyResolver = value; + + var dependencyInjector = value as IDependencyInjector; + dependencyInjector?.RegisterPublisher(GetPublisher()); + } } @@ -43,14 +55,14 @@ namespace Tapeti public TapetiConnection WithDependencyResolver(IDependencyResolver resolver) { - dependencyResolver = resolver; + DependencyResolver = resolver; return this; } public TapetiConnection RegisterController(Type type) { - var queueAttribute = type.GetCustomAttribute(); + var queueAttribute = type.GetCustomAttribute(); if (queueAttribute == null) throw new ArgumentException("Queue attribute required on class", nameof(type)); @@ -84,7 +96,7 @@ namespace Tapeti if (!registrations.IsValueCreated || registrations.Value.Count == 0) throw new ArgumentException("No controllers registered"); - var subscriber = new TapetiSubscriber(worker.Value); + var subscriber = new TapetiSubscriber(() => worker.Value); await subscriber.BindQueues(registrations.Value); return subscriber; @@ -93,7 +105,7 @@ namespace Tapeti public IPublisher GetPublisher() { - return new TapetiPublisher(worker.Value); + return new TapetiPublisher(() => worker.Value); } diff --git a/TapetiConnectionExtensions.cs b/TapetiConnectionExtensions.cs index 72a0fe2..70c8b02 100644 --- a/TapetiConnectionExtensions.cs +++ b/TapetiConnectionExtensions.cs @@ -8,7 +8,7 @@ namespace Tapeti { public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly) { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(DynamicQueueAttribute)))) connection.RegisterController(type); return connection; diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs new file mode 100644 index 0000000..4fb8561 --- /dev/null +++ b/Test/MarcoEmitter.cs @@ -0,0 +1,37 @@ +using System.Threading; +using System.Threading.Tasks; +using Tapeti; + +namespace Test +{ + public class MarcoEmitter + { + private readonly IPublisher publisher; + + + public MarcoEmitter(IPublisher publisher) + { + this.publisher = publisher; + } + + + public async Task Run() + { + var concurrent = new SemaphoreSlim(20); + + //for (var x = 0; x < 5000; x++) + while (true) + { + await concurrent.WaitAsync(); + try + { + await publisher.Publish(new MarcoMessage()); + } + finally + { + concurrent.Release(); + } + } + } + } +} diff --git a/Test/Program.cs b/Test/Program.cs index 6e6fe03..d27f8c8 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -19,19 +19,14 @@ namespace Test .WithDependencyResolver(new SimpleInjectorDependencyResolver(container)) .RegisterAllControllers(typeof(Program).Assembly)) { - container.Register(() => connection.GetPublisher()); - + container.Register(); + Console.WriteLine("Subscribing..."); connection.Subscribe().Wait(); Console.WriteLine("Done!"); - var publisher = connection.GetPublisher(); - - //for (var x = 0; x < 5000; x++) - while(true) - publisher.Publish(new MarcoMessage()).Wait(); - - //Console.ReadLine(); + var emitter = container.GetInstance(); + emitter.Run().Wait(); } } } diff --git a/Test/Test.csproj b/Test/Test.csproj index d60e49b..98eb7c2 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -48,6 +48,7 @@ + diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs index 2eed944..09f779b 100644 --- a/Test/TestQueueController.cs +++ b/Test/TestQueueController.cs @@ -5,10 +5,8 @@ using Tapeti.Annotations; namespace Test { - //[Exchange("myexchange")] - //[Queue("staticqueue")] - [Queue] - public class TestQueueController + [DynamicQueue] + public class TestQueueController : MessageController { private readonly IPublisher publisher; @@ -19,10 +17,10 @@ namespace Test } - public async Task Marco(MarcoMessage message) + public PoloMessage Marco(MarcoMessage message) { Console.WriteLine("Marco!"); - await publisher.Publish(new PoloMessage()); + return new PoloMessage(); } From 68399db0954b94372de1b4fc877fc6a75ecfe6ea Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 5 Dec 2016 23:41:17 +0100 Subject: [PATCH 2/4] Some more interface foundations --- Connection/TapetiWorker.cs | 2 +- IConnection.cs | 9 +++ ITopology.cs | 20 +++++ Tapeti.csproj | 5 +- TapetiConnection.cs | 31 -------- TapetiConnectionBuilder.cs | 30 +++++++ TapetiConnectionExtensions.cs | 23 ------ TapetiTopologyBuilder.cs | 146 ++++++++++++++++++++++++++++++++++ Test/Program.cs | 15 ++-- Test/TestQueueController.cs | 1 - 10 files changed, 218 insertions(+), 64 deletions(-) create mode 100644 IConnection.cs create mode 100644 ITopology.cs create mode 100644 TapetiConnectionBuilder.cs delete mode 100644 TapetiConnectionExtensions.cs create mode 100644 TapetiTopologyBuilder.cs diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index c4b13ef..99c4b2f 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -16,7 +16,7 @@ namespace Tapeti.Connection private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly Lazy taskQueue = new Lazy(); - private IConnection connection; + private RabbitMQ.Client.IConnection connection; private IModel channel; diff --git a/IConnection.cs b/IConnection.cs new file mode 100644 index 0000000..92f5ce5 --- /dev/null +++ b/IConnection.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti +{ + public interface IConnection : IDisposable + { + ISubscriber Subscribe(); + } +} diff --git a/ITopology.cs b/ITopology.cs new file mode 100644 index 0000000..05c1934 --- /dev/null +++ b/ITopology.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; + +namespace Tapeti +{ + public interface ITopology + { + IEnumerable Queues(); + } + + + public interface IQueue + { + IEnumerable Bindings(); + } + + + public interface IBinding + { + } +} diff --git a/Tapeti.csproj b/Tapeti.csproj index f81dac3..79e761d 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -59,10 +59,13 @@ + + - + + diff --git a/TapetiConnection.cs b/TapetiConnection.cs index a4139c9..7e36e20 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -60,37 +60,6 @@ namespace Tapeti } - public TapetiConnection RegisterController(Type type) - { - var queueAttribute = type.GetCustomAttribute(); - if (queueAttribute == null) - throw new ArgumentException("Queue attribute required on class", nameof(type)); - - if (queueAttribute.Dynamic) - { - if (!string.IsNullOrEmpty(queueAttribute.Name)) - throw new ArgumentException("Dynamic queue attributes must not have a Name"); - - registrations.Value.Add(new ControllerDynamicQueueRegistration( - DependencyResolver.Resolve, - DependencyResolver.Resolve, - type, SubscribeExchange)); - } - else - { - if (string.IsNullOrEmpty(queueAttribute.Name)) - throw new ArgumentException("Non-dynamic queue attribute must have a Name"); - - registrations.Value.Add(new ControllerQueueRegistration( - DependencyResolver.Resolve, - type, SubscribeExchange, queueAttribute.Name)); - } - - (DependencyResolver as IDependencyInjector)?.RegisterController(type); - return this; - } - - public async Task Subscribe() { if (!registrations.IsValueCreated || registrations.Value.Count == 0) diff --git a/TapetiConnectionBuilder.cs b/TapetiConnectionBuilder.cs new file mode 100644 index 0000000..0f71cfe --- /dev/null +++ b/TapetiConnectionBuilder.cs @@ -0,0 +1,30 @@ +using System; + +namespace Tapeti +{ + public class TapetiConnectionBuilder + { + public IConnection Build() + { + throw new NotImplementedException(); + } + + + public TapetiConnectionBuilder SetExchange(string exchange) + { + return this; + } + + + public TapetiConnectionBuilder SetDependencyResolver(IDependencyResolver dependencyResolver) + { + return this; + } + + + public TapetiConnectionBuilder SetTopology(ITopology topology) + { + return this; + } + } +} diff --git a/TapetiConnectionExtensions.cs b/TapetiConnectionExtensions.cs deleted file mode 100644 index 70c8b02..0000000 --- a/TapetiConnectionExtensions.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System.Linq; -using System.Reflection; -using Tapeti.Annotations; - -namespace Tapeti -{ - public static class TapetiConnectionExtensions - { - public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(DynamicQueueAttribute)))) - connection.RegisterController(type); - - return connection; - } - - - public static TapetiConnection RegisterAllControllers(this TapetiConnection connection) - { - return RegisterAllControllers(connection, Assembly.GetCallingAssembly()); - } - } -} diff --git a/TapetiTopologyBuilder.cs b/TapetiTopologyBuilder.cs new file mode 100644 index 0000000..aefe9dd --- /dev/null +++ b/TapetiTopologyBuilder.cs @@ -0,0 +1,146 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Tapeti.Annotations; + + +namespace Tapeti +{ + public class TopologyConfigurationException : Exception + { + public TopologyConfigurationException(string message) : base(message) { } + } + + + public class TapetiTopologyBuilder + { + private readonly List registrations = new List(); + + + public ITopology Build() + { + throw new NotImplementedException(); + } + + + public TapetiTopologyBuilder RegisterController(Type controller) + { + var controllerRegistration = GetAttributesRegistration(controller); + + foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) + .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) + .Select(m => (MethodInfo)m)) + { + } + + /* + if (queueAttribute.Dynamic) + { + if (!string.IsNullOrEmpty(queueAttribute.Name)) + throw new ArgumentException("Dynamic queue attributes must not have a Name"); + + registrations.Value.Add(new ControllerDynamicQueueRegistration( + DependencyResolver.Resolve, + DependencyResolver.Resolve, + type, SubscribeExchange)); + } + else + { + if (string.IsNullOrEmpty(queueAttribute.Name)) + throw new ArgumentException("Non-dynamic queue attribute must have a Name"); + + registrations.Value.Add(new ControllerQueueRegistration( + DependencyResolver.Resolve, + type, SubscribeExchange, queueAttribute.Name)); + } + + (DependencyResolver as IDependencyInjector)?.RegisterController(type); + */ + return this; + } + + + public TapetiTopologyBuilder RegisterAllControllers(Assembly assembly) + { + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute)))) + RegisterController(type); + + return this; + } + + + public TapetiTopologyBuilder RegisterAllControllers() + { + return RegisterAllControllers(Assembly.GetCallingAssembly()); + } + + + protected HandlerRegistration GetAttributesRegistration(MemberInfo member) + { + var registration = new HandlerRegistration(); + + var dynamicQueueAttribute = member.GetCustomAttribute(); + var staticQueueAttribute = member.GetCustomAttribute(); + + if (dynamicQueueAttribute != null && staticQueueAttribute != null) + throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}"); + + if (dynamicQueueAttribute != null) + registration.Dynamic = true; + else if (staticQueueAttribute != null) + { + registration.Dynamic = false; + registration.QueueName = staticQueueAttribute.Name; + } + + return registration; + } + + + protected class HandlerRegistration + { + public bool? Dynamic { get; set; } + public string QueueName { get; set; } + } + + + protected class Topology : ITopology + { + private readonly List queues = new List(); + + + public void Add(Queue queue) + { + queues.Add(queue); + } + + public IEnumerable Queues() + { + return queues; + } + } + + + protected class Queue : IQueue + { + private readonly List bindings = new List(); + + + public void Add(Binding binding) + { + bindings.Add(binding); + } + + public IEnumerable Bindings() + { + return bindings; + } + } + + + protected class Binding : IBinding + { + } + } +} diff --git a/Test/Program.cs b/Test/Program.cs index d27f8c8..e20454d 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -11,13 +11,14 @@ namespace Test { var container = new Container(); - using (var connection = new TapetiConnection - { - PublishExchange = "test", - SubscribeExchange = "test" - } - .WithDependencyResolver(new SimpleInjectorDependencyResolver(container)) - .RegisterAllControllers(typeof(Program).Assembly)) + using (var connection = new TapetiConnectionBuilder() + .SetExchange("test") + .SetDependencyResolver(new SimpleInjectorDependencyResolver(container)) + .SetTopology( + new TapetiTopologyBuilder() + .RegisterAllControllers() + .Build()) + .Build()) { container.Register(); diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs index 09f779b..c408bce 100644 --- a/Test/TestQueueController.cs +++ b/Test/TestQueueController.cs @@ -1,5 +1,4 @@ using System; -using System.Threading.Tasks; using Tapeti; using Tapeti.Annotations; From dfd480444e401410a21cbd1c62bf2c55a668ce73 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 7 Dec 2016 10:19:16 +0100 Subject: [PATCH 3/4] Added Saga interfaces More mockup code --- IConnection.cs | 3 +- Tapeti.Saga/ISaga.cs | 13 ++++ Tapeti.Saga/ISagaProvider.cs | 11 ++++ Tapeti.Saga/ISagaStore.cs | 10 +++ Tapeti.Saga/Properties/AssemblyInfo.cs | 36 ++++++++++ Tapeti.Saga/Tapeti.Saga.csproj | 56 ++++++++++++++++ Tapeti.sln | 6 ++ Test/MarcoController.cs | 91 ++++++++++++++++++++++++++ Test/Program.cs | 15 +++-- Test/Test.csproj | 2 +- Test/TestQueueController.cs | 41 ------------ 11 files changed, 234 insertions(+), 50 deletions(-) create mode 100644 Tapeti.Saga/ISaga.cs create mode 100644 Tapeti.Saga/ISagaProvider.cs create mode 100644 Tapeti.Saga/ISagaStore.cs create mode 100644 Tapeti.Saga/Properties/AssemblyInfo.cs create mode 100644 Tapeti.Saga/Tapeti.Saga.csproj create mode 100644 Test/MarcoController.cs delete mode 100644 Test/TestQueueController.cs diff --git a/IConnection.cs b/IConnection.cs index 92f5ce5..b9671be 100644 --- a/IConnection.cs +++ b/IConnection.cs @@ -1,9 +1,10 @@ using System; +using System.Threading.Tasks; namespace Tapeti { public interface IConnection : IDisposable { - ISubscriber Subscribe(); + Task Subscribe(); } } diff --git a/Tapeti.Saga/ISaga.cs b/Tapeti.Saga/ISaga.cs new file mode 100644 index 0000000..bd9f01a --- /dev/null +++ b/Tapeti.Saga/ISaga.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Saga +{ + public interface ISaga : IDisposable where T : class + { + string Id { get; } + T State { get; } + + void ExpectResponse(string callId); + void ResolveResponse(string callId); + } +} diff --git a/Tapeti.Saga/ISagaProvider.cs b/Tapeti.Saga/ISagaProvider.cs new file mode 100644 index 0000000..b6d3d63 --- /dev/null +++ b/Tapeti.Saga/ISagaProvider.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public interface ISagaProvider + { + Task> Begin() where T : class; + Task> Continue(string sagaId) where T : class; + Task> Current() where T : class; + } +} diff --git a/Tapeti.Saga/ISagaStore.cs b/Tapeti.Saga/ISagaStore.cs new file mode 100644 index 0000000..839be22 --- /dev/null +++ b/Tapeti.Saga/ISagaStore.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public interface ISagaStore + { + Task Read(string sagaId); + Task Update(string sagaId, object state); + } +} diff --git a/Tapeti.Saga/Properties/AssemblyInfo.cs b/Tapeti.Saga/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..785eee1 --- /dev/null +++ b/Tapeti.Saga/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Tapeti.Saga")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyProduct("Tapeti.Saga")] +[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("f84ad920-d5a1-455d-aed5-2542b3a47b85")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tapeti.Saga/Tapeti.Saga.csproj b/Tapeti.Saga/Tapeti.Saga.csproj new file mode 100644 index 0000000..525f86d --- /dev/null +++ b/Tapeti.Saga/Tapeti.Saga.csproj @@ -0,0 +1,56 @@ + + + + + Debug + AnyCPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85} + Library + Properties + Tapeti.Saga + Tapeti.Saga + v4.5.2 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Tapeti.sln b/Tapeti.sln index 49530f0..16ebd8f 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.SimpleInjector", "Ta EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Saga", "Tapeti.Saga\Tapeti.Saga.csproj", "{F84AD920-D5A1-455D-AED5-2542B3A47B85}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -27,6 +29,10 @@ Global {90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.Build.0 = Debug|Any CPU {90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.ActiveCfg = Release|Any CPU {90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.Build.0 = Release|Any CPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F84AD920-D5A1-455D-AED5-2542B3A47B85}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs new file mode 100644 index 0000000..9c955eb --- /dev/null +++ b/Test/MarcoController.cs @@ -0,0 +1,91 @@ +using System; +using Microsoft.SqlServer.Server; +using Tapeti; +using Tapeti.Annotations; + +namespace Test +{ + [DynamicQueue] + public class MarcoController : MessageController + { + private readonly IPublisher publisher; + + + public MarcoController(IPublisher publisher/*, ISagaProvider sagaProvider*/) + { + this.publisher = publisher; + } + + + //[StaticQueue("test")] + public PoloMessage Marco(MarcoMessage message) + { + /* + using (sagaProvider.Begin(new MarcoState + { + ... + })) + { + //publisher.Publish(new PoloColorRequest(), saga, PoloColorResponse1); + //publisher.Publish(new PoloColorRequest(), saga, callID = "tweede"); + + // Saga refcount = 2 + } + */ + + return new PoloMessage(); ; + } + + + /* + [CallID("eerste")] + Implicit: + + using (sagaProvider.Continue(correlatieID)) + { + saga refcount--; + public void PoloColorResponse1(PoloColorResponse message, ISaga saga) + { + + saga.State == MarcoState + + + + state.Color = message.Color; + + if (state.Complete) + { + publisher.Publish(new PoloMessage()); + } + } + */ + + public void Polo(PoloMessage message) + { + Console.WriteLine("Polo!"); + } + } + + + public class MarcoMessage + { + } + + + public class PoloMessage + { + } + + + + + public class PoloColorRequest + { + + } + + public class PoloColorResponse + { + + } +} diff --git a/Test/Program.cs b/Test/Program.cs index e20454d..df7b22f 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -10,18 +10,19 @@ namespace Test private static void Main() { var container = new Container(); + container.Register(); + + + var topology = new TapetiTopologyBuilder() + .RegisterAllControllers() + .Build(); using (var connection = new TapetiConnectionBuilder() .SetExchange("test") .SetDependencyResolver(new SimpleInjectorDependencyResolver(container)) - .SetTopology( - new TapetiTopologyBuilder() - .RegisterAllControllers() - .Build()) + .SetTopology(topology) .Build()) - { - container.Register(); - + { Console.WriteLine("Subscribing..."); connection.Subscribe().Wait(); Console.WriteLine("Done!"); diff --git a/Test/Test.csproj b/Test/Test.csproj index 98eb7c2..a66fa9f 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -51,7 +51,7 @@ - + diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs deleted file mode 100644 index c408bce..0000000 --- a/Test/TestQueueController.cs +++ /dev/null @@ -1,41 +0,0 @@ -using System; -using Tapeti; -using Tapeti.Annotations; - -namespace Test -{ - [DynamicQueue] - public class TestQueueController : MessageController - { - private readonly IPublisher publisher; - - - public TestQueueController(IPublisher publisher) - { - this.publisher = publisher; - } - - - public PoloMessage Marco(MarcoMessage message) - { - Console.WriteLine("Marco!"); - return new PoloMessage(); - } - - - public void Polo(PoloMessage message) - { - Console.WriteLine("Polo!"); - } - } - - - public class MarcoMessage - { - } - - - public class PoloMessage - { - } -} From 8f5160e860a43b843edab35b2843f9a6a2ee4e5e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sun, 11 Dec 2016 15:08:58 +0100 Subject: [PATCH 4/4] Back to a working state --- Config/IBindingContext.cs | 24 ++ Config/IBindingMiddleware.cs | 9 + Config/IConfig.cs | 35 ++ Config/IMessageContext.cs | 11 + Config/IMessageMiddleware.cs | 9 + Config/IMiddlewareBundle.cs | 9 + Connection/TapetiConsumer.cs | 53 ++- Connection/TapetiSubscriber.cs | 5 +- Connection/TapetiWorker.cs | 65 ++- Default/BindingBufferStop.cs | 13 + Default/DefaultControllerFactory.cs | 62 --- Default/DefaultDependencyResolver.cs | 72 ---- Default/DependencyResolverBinding.cs | 26 ++ Default/MessageBinding.cs | 23 + Helpers/ConsoleHelper.cs | 22 + Helpers/MiddlewareHelper.cs | 26 ++ IControllerFactory.cs | 9 - IDependencyResolver.cs | 4 +- IQueueRegistration.cs | 13 - ISubscriber.cs | 4 +- ITopology.cs | 20 - Properties/AssemblyInfo.cs | 1 - .../AbstractControllerRegistration.cs | 142 ------- .../ControllerDynamicQueueRegistration.cs | 33 -- Registration/ControllerQueueRegistration.cs | 21 - Tapeti.Saga/SagaBindingMiddleware.cs | 28 ++ Tapeti.Saga/SagaMemoryStore.cs | 43 ++ Tapeti.Saga/SagaMessageMiddleware.cs | 22 + Tapeti.Saga/SagaMiddleware.cs | 16 + Tapeti.Saga/SagaProvider.cs | 90 ++++ .../Properties/AssemblyInfo.cs | 1 - .../SimpleInjectorControllerFactory.cs | 22 - .../SimpleInjectorDependencyResolver.cs | 59 +-- .../Tapeti.SimpleInjector.csproj | 1 - Tapeti.csproj | 23 +- TapetiConfig.cs | 400 ++++++++++++++++++ TapetiConnection.cs | 55 +-- TapetiConnectionBuilder.cs | 30 -- TapetiTopologyBuilder.cs | 146 ------- Test/Properties/AssemblyInfo.cs | 1 - Test/Visualizer.cs | 17 + 41 files changed, 956 insertions(+), 709 deletions(-) create mode 100644 Config/IBindingContext.cs create mode 100644 Config/IBindingMiddleware.cs create mode 100644 Config/IConfig.cs create mode 100644 Config/IMessageContext.cs create mode 100644 Config/IMessageMiddleware.cs create mode 100644 Config/IMiddlewareBundle.cs create mode 100644 Default/BindingBufferStop.cs delete mode 100644 Default/DefaultControllerFactory.cs delete mode 100644 Default/DefaultDependencyResolver.cs create mode 100644 Default/DependencyResolverBinding.cs create mode 100644 Default/MessageBinding.cs create mode 100644 Helpers/ConsoleHelper.cs create mode 100644 Helpers/MiddlewareHelper.cs delete mode 100644 IControllerFactory.cs delete mode 100644 IQueueRegistration.cs delete mode 100644 ITopology.cs delete mode 100644 Registration/AbstractControllerRegistration.cs delete mode 100644 Registration/ControllerDynamicQueueRegistration.cs delete mode 100644 Registration/ControllerQueueRegistration.cs create mode 100644 Tapeti.Saga/SagaBindingMiddleware.cs create mode 100644 Tapeti.Saga/SagaMemoryStore.cs create mode 100644 Tapeti.Saga/SagaMessageMiddleware.cs create mode 100644 Tapeti.Saga/SagaMiddleware.cs create mode 100644 Tapeti.Saga/SagaProvider.cs delete mode 100644 Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs create mode 100644 TapetiConfig.cs delete mode 100644 TapetiConnectionBuilder.cs delete mode 100644 TapetiTopologyBuilder.cs create mode 100644 Test/Visualizer.cs diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs new file mode 100644 index 0000000..df3444e --- /dev/null +++ b/Config/IBindingContext.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace Tapeti.Config +{ + public delegate object ValueFactory(IMessageContext context); + + + public interface IBindingContext + { + Type MessageClass { get; set; } + IReadOnlyList Parameters { get; } + } + + + public interface IBindingParameter + { + ParameterInfo Info { get; } + bool HasBinding { get; } + + void SetBinding(ValueFactory valueFactory); + } +} diff --git a/Config/IBindingMiddleware.cs b/Config/IBindingMiddleware.cs new file mode 100644 index 0000000..e2d977c --- /dev/null +++ b/Config/IBindingMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IBindingMiddleware + { + void Handle(IBindingContext context, Action next); + } +} diff --git a/Config/IConfig.cs b/Config/IConfig.cs new file mode 100644 index 0000000..8752c97 --- /dev/null +++ b/Config/IConfig.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IConfig + { + string Exchange { get; } + IDependencyResolver DependencyResolver { get; } + IReadOnlyList MessageMiddleware { get; } + IEnumerable Queues { get; } + } + + + public interface IQueue + { + bool Dynamic { get; } + string Name { get; } + + IEnumerable Bindings { get; } + } + + + public interface IBinding + { + Type Controller { get; } + MethodInfo Method { get; } + Type MessageClass { get; } + + bool Accept(object message); + Task Invoke(IMessageContext context, object message); + } +} diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs new file mode 100644 index 0000000..6bf8101 --- /dev/null +++ b/Config/IMessageContext.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMessageContext + { + object Controller { get; } + object Message { get; } + IDictionary Items { get; } + } +} diff --git a/Config/IMessageMiddleware.cs b/Config/IMessageMiddleware.cs new file mode 100644 index 0000000..aaec7eb --- /dev/null +++ b/Config/IMessageMiddleware.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Config +{ + public interface IMessageMiddleware + { + void Handle(IMessageContext context, Action next); + } +} diff --git a/Config/IMiddlewareBundle.cs b/Config/IMiddlewareBundle.cs new file mode 100644 index 0000000..82846f3 --- /dev/null +++ b/Config/IMiddlewareBundle.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface IMiddlewareBundle + { + IEnumerable GetContents(IDependencyResolver dependencyResolver); + } +} diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index afabfd5..1687d24 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -1,21 +1,26 @@ using System; -using System.Diagnostics.Eventing.Reader; +using System.Collections.Generic; +using System.Linq; using RabbitMQ.Client; +using Tapeti.Config; +using Tapeti.Helpers; namespace Tapeti.Connection { public class TapetiConsumer : DefaultBasicConsumer { private readonly TapetiWorker worker; - private readonly IMessageSerializer messageSerializer; - private readonly IQueueRegistration queueRegistration; + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; + private readonly List bindings; - public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration) + public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) { this.worker = worker; - this.messageSerializer = messageSerializer; - this.queueRegistration = queueRegistration; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + this.bindings = bindings.ToList(); } @@ -24,22 +29,46 @@ namespace Tapeti.Connection { try { - var message = messageSerializer.Deserialize(body, properties); + var message = dependencyResolver.Resolve().Deserialize(body, properties); if (message == null) throw new ArgumentException("Empty message"); - if (queueRegistration.Accept(message)) - queueRegistration.Visit(message); - else + var handled = false; + foreach (var binding in bindings.Where(b => b.Accept(message))) + { + var context = new MessageContext + { + Controller = dependencyResolver.Resolve(binding.Controller), + Message = message + }; + + MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next)); + + var result = binding.Invoke(context, message).Result; + if (result != null) + worker.Publish(result); + + handled = true; + } + + if (!handled) throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); worker.Respond(deliveryTag, ConsumeResponse.Ack); } catch (Exception) { - //TODO pluggable exception handling - worker.Respond(deliveryTag, ConsumeResponse.Nack); + worker.Respond(deliveryTag, ConsumeResponse.Requeue); + throw; } } + + + protected class MessageContext : IMessageContext + { + public object Controller { get; set; } + public object Message { get; set; } + public IDictionary Items { get; } = new Dictionary(); + } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index 9d487f3..f6303cb 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti.Connection { @@ -16,9 +17,9 @@ namespace Tapeti.Connection } - public async Task BindQueues(IEnumerable registrations) + public async Task BindQueues(IEnumerable queues) { - await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList()); + await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList()); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index 99c4b2f..f5ee254 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -1,8 +1,10 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using Tapeti.Config; using Tapeti.Tasks; namespace Tapeti.Connection @@ -10,20 +12,23 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string PublishExchange { get; set; } - + public string Exchange { get; set; } + private readonly IDependencyResolver dependencyResolver; + private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly Lazy taskQueue = new Lazy(); private RabbitMQ.Client.IConnection connection; - private IModel channel; + private IModel channelInstance; - public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy) + public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware) { - this.messageSerializer = messageSerializer; - this.routingKeyStrategy = routingKeyStrategy; + this.dependencyResolver = dependencyResolver; + this.messageMiddleware = messageMiddleware; + messageSerializer = dependencyResolver.Resolve(); + routingKeyStrategy = dependencyResolver.Resolve(); } @@ -35,29 +40,45 @@ namespace Tapeti.Connection var body = messageSerializer.Serialize(message, properties); (await GetChannel()) - .BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, + .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, properties, body); }).Unwrap(); } - public Task Subscribe(string queueName, IQueueRegistration queueRegistration) + public Task Consume(string queueName, IEnumerable bindings) { return taskQueue.Value.Add(async () => { - (await GetChannel()) - .BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); }).Unwrap(); } - public async Task Subscribe(IQueueRegistration registration) + public async Task Subscribe(IQueue queue) { - var queueName = await taskQueue.Value.Add(async () => - registration.BindQueue(await GetChannel())) - .Unwrap(); + var queueName = await taskQueue.Value.Add(async () => + { + var channel = await GetChannel(); - await Subscribe(queueName, registration); + if (queue.Dynamic) + { + var dynamicQueue = channel.QueueDeclare(); + + foreach (var binding in queue.Bindings) + { + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); + channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); + } + + return dynamicQueue.QueueName; + } + + channel.QueueDeclarePassive(queue.Name); + return queue.Name; + }).Unwrap(); + + await Consume(queueName, queue.Bindings); } @@ -91,10 +112,10 @@ namespace Tapeti.Connection return taskQueue.Value.Add(() => { - if (channel != null) + if (channelInstance != null) { - channel.Dispose(); - channel = null; + channelInstance.Dispose(); + channelInstance = null; } // ReSharper disable once InvertIf @@ -115,8 +136,8 @@ namespace Tapeti.Connection /// private async Task GetChannel() { - if (channel != null) - return channel; + if (channelInstance != null) + return channelInstance; var connectionFactory = new ConnectionFactory { @@ -134,7 +155,7 @@ namespace Tapeti.Connection try { connection = connectionFactory.CreateConnection(); - channel = connection.CreateModel(); + channelInstance = connection.CreateModel(); break; } @@ -144,7 +165,7 @@ namespace Tapeti.Connection } } - return channel; + return channelInstance; } } } diff --git a/Default/BindingBufferStop.cs b/Default/BindingBufferStop.cs new file mode 100644 index 0000000..8d55a7c --- /dev/null +++ b/Default/BindingBufferStop.cs @@ -0,0 +1,13 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + // End of the line... + public class BindingBufferStop : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + } + } +} diff --git a/Default/DefaultControllerFactory.cs b/Default/DefaultControllerFactory.cs deleted file mode 100644 index 9918c18..0000000 --- a/Default/DefaultControllerFactory.cs +++ /dev/null @@ -1,62 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; - -namespace Tapeti.Default -{ - public class DefaultControllerFactory : IControllerFactory - { - private readonly Dictionary> controllerConstructors = new Dictionary>(); - private readonly Func publisherFactory; - - public DefaultControllerFactory(Func publisherFactory) - { - this.publisherFactory = publisherFactory; - } - - - public object CreateController(Type controllerType) - { - Func constructor; - if (!controllerConstructors.TryGetValue(controllerType, out constructor)) - throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}"); - - return constructor(); - } - - - public void RegisterController(Type type) - { - controllerConstructors.Add(type, GetConstructor(type)); - } - - - protected Func GetConstructor(Type type) - { - var constructors = type.GetConstructors(); - - ConstructorInfo publisherConstructor = null; - ConstructorInfo emptyConstructor = null; - - foreach (var constructor in constructors) - { - var parameters = constructor.GetParameters(); - if (parameters.Length > 0) - { - if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher)) - publisherConstructor = constructor; - } - else - emptyConstructor = constructor; - } - - if (publisherConstructor != null) - return () => publisherConstructor.Invoke(new object[] { publisherFactory() }); - - if (emptyConstructor != null) - return () => emptyConstructor.Invoke(null); - - throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required"); - } - } -} diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs deleted file mode 100644 index 066307e..0000000 --- a/Default/DefaultDependencyResolver.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; - -namespace Tapeti.Default -{ - /** - * !! IoC Container 9000 !! - * - * ...you probably want to replace this one as soon as possible. - * - * A Simple Injector implementation is provided in the Tapeti.SimpleInjector package. - */ - public class DefaultDependencyResolver : IDependencyInjector - { - private readonly Lazy controllerFactory; - private readonly Lazy routingKeyStrategy = new Lazy(); - private readonly Lazy messageSerializer = new Lazy(); - private readonly Lazy logger; - private IPublisher publisher; - - - public DefaultDependencyResolver() - { - controllerFactory = new Lazy(() => new DefaultControllerFactory(() => publisher)); - - logger = new Lazy(() => - { - // http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console - try - { - // ReSharper disable once UnusedVariable - var dummy = Console.WindowHeight; - - return new ConsoleLogger(); - } - catch - { - return new DevNullLogger(); - } - }); - } - - - public T Resolve() where T : class - { - if (typeof(T) == typeof(IControllerFactory)) - return (T)(controllerFactory.Value as IControllerFactory); - - if (typeof(T) == typeof(IRoutingKeyStrategy)) - return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy); - - if (typeof(T) == typeof(IMessageSerializer)) - return (T)(messageSerializer.Value as IMessageSerializer); - - if (typeof(T) == typeof(ILogger)) - return (T)logger.Value; - - return default(T); - } - - - public void RegisterPublisher(IPublisher value) - { - publisher = value; - } - - - public void RegisterController(Type type) - { - controllerFactory.Value.RegisterController(type); - } - } -} diff --git a/Default/DependencyResolverBinding.cs b/Default/DependencyResolverBinding.cs new file mode 100644 index 0000000..d9a8a0d --- /dev/null +++ b/Default/DependencyResolverBinding.cs @@ -0,0 +1,26 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class DependencyResolverBinding : IBindingMiddleware + { + private readonly IDependencyResolver resolver; + + + public DependencyResolverBinding(IDependencyResolver resolver) + { + this.resolver = resolver; + } + + + public void Handle(IBindingContext context, Action next) + { + next(); + + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) + parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); + } + } +} diff --git a/Default/MessageBinding.cs b/Default/MessageBinding.cs new file mode 100644 index 0000000..f21cd6b --- /dev/null +++ b/Default/MessageBinding.cs @@ -0,0 +1,23 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class MessageBinding : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + if (context.Parameters.Count == 0) + throw new TopologyConfigurationException("First parameter must be a message class"); + + var parameter = context.Parameters[0]; + if (!parameter.Info.ParameterType.IsClass) + throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} must be a message class"); + + parameter.SetBinding(messageContext => messageContext.Message); + context.MessageClass = parameter.Info.ParameterType; + + next(); + } + } +} diff --git a/Helpers/ConsoleHelper.cs b/Helpers/ConsoleHelper.cs new file mode 100644 index 0000000..0769de8 --- /dev/null +++ b/Helpers/ConsoleHelper.cs @@ -0,0 +1,22 @@ +using System; + +namespace Tapeti.Helpers +{ + public static class ConsoleHelper + { + // Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + public static bool IsAvailable() + { + try + { + // ReSharper disable once UnusedVariable - that's why it's called dummy + var dummy = Console.WindowHeight; + return true; + } + catch + { + return false; + } + } + } +} diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs new file mode 100644 index 0000000..4e813b1 --- /dev/null +++ b/Helpers/MiddlewareHelper.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; + +namespace Tapeti.Helpers +{ + public static class MiddlewareHelper + { + public static void Go(IReadOnlyList middleware, Action handle) + { + var handlerIndex = middleware.Count - 1; + if (handlerIndex == -1) + return; + + Action handleNext = null; + + handleNext = () => + { + handlerIndex--; + if (handlerIndex >= 0) + handle(middleware[handlerIndex], handleNext); + }; + + handle(middleware[handlerIndex], handleNext); + } + } +} diff --git a/IControllerFactory.cs b/IControllerFactory.cs deleted file mode 100644 index 6522f3f..0000000 --- a/IControllerFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Tapeti -{ - public interface IControllerFactory - { - object CreateController(Type controllerType); - } -} diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index a177d25..6f2641b 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -5,12 +5,14 @@ namespace Tapeti public interface IDependencyResolver { T Resolve() where T : class; + object Resolve(Type type); } public interface IDependencyInjector : IDependencyResolver { - void RegisterPublisher(IPublisher publisher); + void RegisterDefault() where TService : class where TImplementation : class, TService; + void RegisterPublisher(Func publisher); void RegisterController(Type type); } } diff --git a/IQueueRegistration.cs b/IQueueRegistration.cs deleted file mode 100644 index 901af4b..0000000 --- a/IQueueRegistration.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace Tapeti -{ - public interface IQueueRegistration - { - string BindQueue(IModel channel); - - bool Accept(object message); - Task Visit(object message); - } -} diff --git a/ISubscriber.cs b/ISubscriber.cs index 2be1a35..fae4328 100644 --- a/ISubscriber.cs +++ b/ISubscriber.cs @@ -1,6 +1,4 @@ -using System; - -namespace Tapeti +namespace Tapeti { public interface ISubscriber { diff --git a/ITopology.cs b/ITopology.cs deleted file mode 100644 index 05c1934..0000000 --- a/ITopology.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Collections.Generic; - -namespace Tapeti -{ - public interface ITopology - { - IEnumerable Queues(); - } - - - public interface IQueue - { - IEnumerable Bindings(); - } - - - public interface IBinding - { - } -} diff --git a/Properties/AssemblyInfo.cs b/Properties/AssemblyInfo.cs index f02ee9b..3428fd6 100644 --- a/Properties/AssemblyInfo.cs +++ b/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Registration/AbstractControllerRegistration.cs b/Registration/AbstractControllerRegistration.cs deleted file mode 100644 index f0b2c64..0000000 --- a/Registration/AbstractControllerRegistration.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Tapeti.Annotations; - -namespace Tapeti.Registration -{ - using MessageHandlerAction = Func; - - public struct MessageHandler - { - public MessageHandlerAction Action; - public string Exchange; - public string RoutingKey; - } - - - public abstract class AbstractControllerRegistration : IQueueRegistration - { - private readonly Func controllerFactoryFactory; - private readonly Type controllerType; - private readonly string defaultExchange; - private readonly Dictionary> messageHandlers = new Dictionary>(); - - - protected AbstractControllerRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange) - { - this.controllerFactoryFactory = controllerFactoryFactory; - this.controllerType = controllerType; - this.defaultExchange = defaultExchange; - - // ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++? - GetMessageHandlers(controllerType, (type, handler) => - { - if (!messageHandlers.ContainsKey(type)) - messageHandlers.Add(type, new List { handler }); - else - messageHandlers[type].Add(handler); - }); - } - - - protected virtual void GetMessageHandlers(Type type, Action add) - { - foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance) - .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) - .Select(m => (MethodInfo)m)) - { - Type messageType; - var messageHandler = GetMessageHandler(method, out messageType); - - add(messageType, messageHandler); - } - } - - - protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType) - { - var parameters = method.GetParameters(); - - if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) - throw new ArgumentException($"Method {method.Name} does not have a single object parameter"); - - messageType = parameters[0].ParameterType; - var messageHandler = new MessageHandler(); - - if (method.ReturnType == typeof(void)) - messageHandler.Action = CreateSyncMessageHandler(method); - else if (method.ReturnType == typeof(Task)) - messageHandler.Action = CreateAsyncMessageHandler(method); - else - throw new ArgumentException($"Method {method.Name} needs to return void or a Task"); - - var exchangeAttribute = method.GetCustomAttribute() ?? method.DeclaringType.GetCustomAttribute(); - messageHandler.Exchange = exchangeAttribute?.Name; - - return messageHandler; - } - - - protected IEnumerable GetMessageTypes() - { - return messageHandlers.Keys; - } - - - protected IEnumerable GetMessageExchanges(Type type) - { - var exchanges = messageHandlers[type] - .Where(h => h.Exchange != null) - .Select(h => h.Exchange) - .Distinct(StringComparer.InvariantCulture) - .ToArray(); - - return exchanges.Length > 0 ? exchanges : new[] { defaultExchange }; - } - - - public abstract string BindQueue(IModel channel); - - - public bool Accept(object message) - { - return messageHandlers.ContainsKey(message.GetType()); - } - - - public Task Visit(object message) - { - var registeredHandlers = messageHandlers[message.GetType()]; - if (registeredHandlers != null) - return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message))); - - return Task.CompletedTask; - } - - - protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - method.Invoke(controller, new[] { message }); - - return Task.CompletedTask; - }; - } - - - protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method) - { - return message => - { - var controller = controllerFactoryFactory().CreateController(controllerType); - return (Task)method.Invoke(controller, new[] { message }); - }; - } - } -} diff --git a/Registration/ControllerDynamicQueueRegistration.cs b/Registration/ControllerDynamicQueueRegistration.cs deleted file mode 100644 index ad7f787..0000000 --- a/Registration/ControllerDynamicQueueRegistration.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerDynamicQueueRegistration : AbstractControllerRegistration - { - private readonly Func routingKeyStrategyFactory; - - - public ControllerDynamicQueueRegistration(Func controllerFactoryFactory, Func routingKeyStrategyFactory, Type controllerType, string defaultExchange) - : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.routingKeyStrategyFactory = routingKeyStrategyFactory; - } - - - public override string BindQueue(IModel channel) - { - var queue = channel.QueueDeclare(); - - foreach (var messageType in GetMessageTypes()) - { - var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType); - - foreach (var exchange in GetMessageExchanges(messageType)) - channel.QueueBind(queue.QueueName, exchange, routingKey); - } - - return queue.QueueName; - } - } -} diff --git a/Registration/ControllerQueueRegistration.cs b/Registration/ControllerQueueRegistration.cs deleted file mode 100644 index 3be95d5..0000000 --- a/Registration/ControllerQueueRegistration.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using RabbitMQ.Client; - -namespace Tapeti.Registration -{ - public class ControllerQueueRegistration : AbstractControllerRegistration - { - private readonly string queueName; - - public ControllerQueueRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange) - { - this.queueName = queueName; - } - - - public override string BindQueue(IModel channel) - { - return channel.QueueDeclarePassive(queueName).QueueName; - } - } -} diff --git a/Tapeti.Saga/SagaBindingMiddleware.cs b/Tapeti.Saga/SagaBindingMiddleware.cs new file mode 100644 index 0000000..f6a056f --- /dev/null +++ b/Tapeti.Saga/SagaBindingMiddleware.cs @@ -0,0 +1,28 @@ +using System; +using System.Linq; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + foreach (var parameter in context.Parameters.Where(p => + p.Info.ParameterType.IsGenericType && + p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>))) + { + parameter.SetBinding(messageContext => + { + object saga; + if (!messageContext.Items.TryGetValue("Saga", out saga)) + return null; + + return saga.GetType() == typeof(ISaga<>) ? saga : null; + }); + } + + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMemoryStore.cs b/Tapeti.Saga/SagaMemoryStore.cs new file mode 100644 index 0000000..52ee511 --- /dev/null +++ b/Tapeti.Saga/SagaMemoryStore.cs @@ -0,0 +1,43 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaMemoryStore : ISagaStore + { + private ISagaStore decoratedStore; + private readonly Dictionary values = new Dictionary(); + + + // Not a constructor to allow standard injection to work when using only the MemoryStore + public static SagaMemoryStore AsCacheFor(ISagaStore store) + { + return new SagaMemoryStore + { + decoratedStore = store + }; + } + + + public async Task Read(string sagaId) + { + object value; + + // ReSharper disable once InvertIf + if (!values.TryGetValue(sagaId, out value) && decoratedStore != null) + { + value = await decoratedStore.Read(sagaId); + values.Add(sagaId, value); + } + + return value; + } + + public async Task Update(string sagaId, object state) + { + values[sagaId] = state; + if (decoratedStore != null) + await decoratedStore.Update(sagaId, state); + } + } +} diff --git a/Tapeti.Saga/SagaMessageMiddleware.cs b/Tapeti.Saga/SagaMessageMiddleware.cs new file mode 100644 index 0000000..bef101a --- /dev/null +++ b/Tapeti.Saga/SagaMessageMiddleware.cs @@ -0,0 +1,22 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMessageMiddleware : IMessageMiddleware + { + private readonly IDependencyResolver dependencyResolver; + + + public SagaMessageMiddleware(IDependencyResolver dependencyResolver) + { + this.dependencyResolver = dependencyResolver; + } + + public void Handle(IMessageContext context, Action next) + { + context.Items["Saga"] = dependencyResolver.Resolve().Continue(""); + next(); + } + } +} diff --git a/Tapeti.Saga/SagaMiddleware.cs b/Tapeti.Saga/SagaMiddleware.cs new file mode 100644 index 0000000..405a29f --- /dev/null +++ b/Tapeti.Saga/SagaMiddleware.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Saga +{ + public class SagaMiddleware : IMiddlewareBundle + { + public IEnumerable GetContents(IDependencyResolver dependencyResolver) + { + (dependencyResolver as IDependencyInjector)?.RegisterDefault(); + + yield return new SagaBindingMiddleware(); + yield return new SagaMessageMiddleware(dependencyResolver); + } + } +} diff --git a/Tapeti.Saga/SagaProvider.cs b/Tapeti.Saga/SagaProvider.cs new file mode 100644 index 0000000..b21ece8 --- /dev/null +++ b/Tapeti.Saga/SagaProvider.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Saga +{ + public class SagaProvider : ISagaProvider + { + protected static readonly ConcurrentDictionary SagaLocks = new ConcurrentDictionary(); + private readonly ISagaStore store; + + public SagaProvider(ISagaStore store) + { + this.store = store; + } + + + public async Task> Begin(T initialState) where T : class + { + var saga = await Saga.Create(() => Task.FromResult(initialState)); + await store.Update(saga.Id, saga.State); + + return saga; + } + + public async Task> Continue(string sagaId) where T : class + { + return await Saga.Create(async () => await store.Read(sagaId) as T, sagaId); + } + + public async Task Continue(string sagaId) + { + return new Saga + { + Id = sagaId, + State = await store.Read(sagaId) + }; + } + + + protected class Saga : ISaga where T : class + { + private bool disposed; + + public string Id { get; set; } + public T State { get; set; } + + + public static async Task> Create(Func> getState, string id = null) + { + var sagaId = id ?? Guid.NewGuid().ToString(); + await SagaLocks.GetOrAdd(sagaId, new SemaphoreSlim(1)).WaitAsync(); + + var saga = new Saga + { + Id = sagaId, + State = await getState() + }; + + return saga; + } + + + public void Dispose() + { + if (disposed) + return; + + SemaphoreSlim semaphore; + if (SagaLocks.TryGetValue(Id, out semaphore)) + semaphore.Release(); + + disposed = true; + } + + + public void ExpectResponse(string callId) + { + throw new NotImplementedException(); + } + + + public void ResolveResponse(string callId) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs index e01df40..06f8dc0 100644 --- a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs +++ b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs b/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs deleted file mode 100644 index 8f32b3a..0000000 --- a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using SimpleInjector; - -namespace Tapeti.SimpleInjector -{ - public class SimpleInjectorControllerFactory : IControllerFactory - { - private readonly Container container; - - - public SimpleInjectorControllerFactory(Container container) - { - this.container = container; - } - - - public object CreateController(Type controllerType) - { - return container.GetInstance(controllerType); - } - } -} diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index 7dc3f21..6b74af3 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,23 +1,16 @@ using System; using System.Linq; -using System.Reflection; using SimpleInjector; -using Tapeti.Annotations; -using Tapeti.Default; -using System.Collections.Generic; namespace Tapeti.SimpleInjector { - public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector + public class SimpleInjectorDependencyResolver : IDependencyInjector { private readonly Container container; - public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true) + public SimpleInjectorDependencyResolver(Container container) { this.container = container; - - if (registerDefaults) - RegisterDefaults(); } public T Resolve() where T : class @@ -25,10 +18,25 @@ namespace Tapeti.SimpleInjector return container.GetInstance(); } - - public void RegisterPublisher(IPublisher publisher) + public object Resolve(Type type) { - IfUnregistered(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher)); + return container.GetInstance(type); + } + + + public void RegisterDefault() where TService : class where TImplementation : class, TService + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService))) + container.Register(); + } + + + public void RegisterPublisher(Func publisher) + { + // ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates + if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) + container.Register(publisher); } @@ -36,32 +44,5 @@ namespace Tapeti.SimpleInjector { container.Register(type); } - - - public SimpleInjectorDependencyResolver RegisterDefaults() - { - var currentRegistrations = container.GetCurrentRegistrations(); - - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - IfUnregistered(currentRegistrations); - - return this; - } - - - private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService - { - // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) - container.Register(); - } - - private void IfUnregistered(IEnumerable currentRegistrations, Action register) where TService : class - { - // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) - register(); - } } } diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index f91dc29..3addf45 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -45,7 +45,6 @@ - diff --git a/Tapeti.csproj b/Tapeti.csproj index 79e761d..e417e0f 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -59,35 +59,38 @@ + + - + + + + + - + - + - - - - - - - + + + +