From e62aa7d482ac88b7ef9c5a01292c3de8a46f7dc7 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 5 Dec 2016 08:00:09 +0100 Subject: [PATCH] Started refactoring, not in any usable state yet --- Annotations/DynamicQueueAttribute.cs | 13 +++++++ Annotations/MessageControllerAttribute.cs | 9 +++++ Annotations/QueueAttribute.cs | 18 --------- Annotations/StaticQueueAttribute.cs | 25 +++++++++++++ Connection/TapetiPublisher.cs | 11 +++--- Connection/TapetiSubscriber.cs | 11 +++--- Default/DefaultDependencyResolver.cs | 12 ++++-- IDependencyResolver.cs | 1 + MessageController.cs | 16 ++++++++ README.md | 15 +++++++- .../SimpleInjectorDependencyResolver.cs | 33 +++++++++++------ Tapeti.csproj | 5 ++- TapetiConnection.cs | 24 +++++++++--- TapetiConnectionExtensions.cs | 2 +- Test/MarcoEmitter.cs | 37 +++++++++++++++++++ Test/Program.cs | 13 ++----- Test/Test.csproj | 1 + Test/TestQueueController.cs | 10 ++--- 18 files changed, 189 insertions(+), 67 deletions(-) create mode 100644 Annotations/DynamicQueueAttribute.cs create mode 100644 Annotations/MessageControllerAttribute.cs delete mode 100644 Annotations/QueueAttribute.cs create mode 100644 Annotations/StaticQueueAttribute.cs create mode 100644 MessageController.cs create mode 100644 Test/MarcoEmitter.cs diff --git a/Annotations/DynamicQueueAttribute.cs b/Annotations/DynamicQueueAttribute.cs new file mode 100644 index 0000000..3d730c9 --- /dev/null +++ b/Annotations/DynamicQueueAttribute.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Creates a non-durable auto-delete queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class DynamicQueueAttribute : Attribute + { + } +} diff --git a/Annotations/MessageControllerAttribute.cs b/Annotations/MessageControllerAttribute.cs new file mode 100644 index 0000000..1f419f2 --- /dev/null +++ b/Annotations/MessageControllerAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Class)] + public class MessageControllerAttribute : Attribute + { + } +} diff --git a/Annotations/QueueAttribute.cs b/Annotations/QueueAttribute.cs deleted file mode 100644 index 878390d..0000000 --- a/Annotations/QueueAttribute.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; - -namespace Tapeti.Annotations -{ - [AttributeUsage(AttributeTargets.Class)] - public class QueueAttribute : Attribute - { - public string Name { get; set; } - public bool Dynamic { get; set; } - - - public QueueAttribute(string name = null) - { - Name = name; - Dynamic = (name == null); - } - } -} diff --git a/Annotations/StaticQueueAttribute.cs b/Annotations/StaticQueueAttribute.cs new file mode 100644 index 0000000..2a9c6b1 --- /dev/null +++ b/Annotations/StaticQueueAttribute.cs @@ -0,0 +1,25 @@ +using System; + +namespace Tapeti.Annotations +{ + /// + /// Binds to an existing durable queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + /// + /// At the moment there is no support for creating a durable queue and managing the + /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue + /// for deploy-time management of durable queues (shameless plug intended). + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class StaticQueueAttribute : Attribute + { + public string Name { get; set; } + + + public StaticQueueAttribute(string name) + { + Name = name; + } + } +} diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index ca021f9..4143cf5 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,21 +1,22 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; namespace Tapeti.Connection { public class TapetiPublisher : IPublisher { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiPublisher(TapetiWorker worker) + public TapetiPublisher(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } public Task Publish(object message) { - return worker.Publish(message); + return workerFactory().Publish(message); } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index baafe9d..9d487f3 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -6,18 +7,18 @@ namespace Tapeti.Connection { public class TapetiSubscriber : ISubscriber { - private readonly TapetiWorker worker; + private readonly Func workerFactory; - public TapetiSubscriber(TapetiWorker worker) + public TapetiSubscriber(Func workerFactory) { - this.worker = worker; + this.workerFactory = workerFactory; } public async Task BindQueues(IEnumerable registrations) { - await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList()); + await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList()); } } } diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs index 062bab7..066307e 100644 --- a/Default/DefaultDependencyResolver.cs +++ b/Default/DefaultDependencyResolver.cs @@ -15,12 +15,12 @@ namespace Tapeti.Default private readonly Lazy routingKeyStrategy = new Lazy(); private readonly Lazy messageSerializer = new Lazy(); private readonly Lazy logger; + private IPublisher publisher; - - public DefaultDependencyResolver(Func publisherFactory) + public DefaultDependencyResolver() { - controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory)); + controllerFactory = new Lazy(() => new DefaultControllerFactory(() => publisher)); logger = new Lazy(() => { @@ -58,6 +58,12 @@ namespace Tapeti.Default } + public void RegisterPublisher(IPublisher value) + { + publisher = value; + } + + public void RegisterController(Type type) { controllerFactory.Value.RegisterController(type); diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index e04aa84..a177d25 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -10,6 +10,7 @@ namespace Tapeti public interface IDependencyInjector : IDependencyResolver { + void RegisterPublisher(IPublisher publisher); void RegisterController(Type type); } } diff --git a/MessageController.cs b/MessageController.cs new file mode 100644 index 0000000..a046171 --- /dev/null +++ b/MessageController.cs @@ -0,0 +1,16 @@ +using Tapeti.Annotations; + +namespace Tapeti +{ + /// + /// Base class for message controllers + /// + /// + /// Using this base class is not required, you can add the MessageController attribute + /// to any class. + /// + [MessageController] + public abstract class MessageController + { + } +} diff --git a/README.md b/README.md index 3c2b121..2ac4bfc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,14 @@ -'Small to medium-sized and classified as "Least Concern" by the IUCN.' +# Tapeti +> 'Small to medium-sized and classified as "Least Concern" by the IUCN.' +> +> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti) -- Wikipedia \ No newline at end of file +Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals: + +1. Automatic registration of message handlers +2. Publishing without transport details + * Routing key generated based on class name + * One exchange (per service / group of services) to publish them all +3. Attribute based, no base class requirements (only for convenience) +4. Graceful handling of connection issues, even at startup +5. Basic Saga support \ No newline at end of file diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index db5e01e..7dc3f21 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,4 +1,5 @@ -using System.Linq; +using System; +using System.Linq; using System.Reflection; using SimpleInjector; using Tapeti.Annotations; @@ -7,7 +8,7 @@ using System.Collections.Generic; namespace Tapeti.SimpleInjector { - public class SimpleInjectorDependencyResolver : IDependencyResolver + public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector { private readonly Container container; @@ -25,6 +26,18 @@ namespace Tapeti.SimpleInjector } + public void RegisterPublisher(IPublisher publisher) + { + IfUnregistered(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher)); + } + + + public void RegisterController(Type type) + { + container.Register(type); + } + + public SimpleInjectorDependencyResolver RegisterDefaults() { var currentRegistrations = container.GetCurrentRegistrations(); @@ -37,20 +50,18 @@ namespace Tapeti.SimpleInjector } - public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) - container.Register(type); - - return this; - } - - private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService { // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) container.Register(); } + + private void IfUnregistered(IEnumerable currentRegistrations, Action register) where TService : class + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) + register(); + } } } diff --git a/Tapeti.csproj b/Tapeti.csproj index ca4d60b..f81dac3 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -50,7 +50,9 @@ - + + + @@ -58,6 +60,7 @@ + diff --git a/TapetiConnection.cs b/TapetiConnection.cs index 0a7a5cc..a4139c9 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -19,8 +19,20 @@ namespace Tapeti public IDependencyResolver DependencyResolver { - get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); } - set { dependencyResolver = value; } + get + { + if (dependencyResolver == null) + DependencyResolver = new DefaultDependencyResolver(); + + return dependencyResolver; + } + set + { + dependencyResolver = value; + + var dependencyInjector = value as IDependencyInjector; + dependencyInjector?.RegisterPublisher(GetPublisher()); + } } @@ -43,14 +55,14 @@ namespace Tapeti public TapetiConnection WithDependencyResolver(IDependencyResolver resolver) { - dependencyResolver = resolver; + DependencyResolver = resolver; return this; } public TapetiConnection RegisterController(Type type) { - var queueAttribute = type.GetCustomAttribute(); + var queueAttribute = type.GetCustomAttribute(); if (queueAttribute == null) throw new ArgumentException("Queue attribute required on class", nameof(type)); @@ -84,7 +96,7 @@ namespace Tapeti if (!registrations.IsValueCreated || registrations.Value.Count == 0) throw new ArgumentException("No controllers registered"); - var subscriber = new TapetiSubscriber(worker.Value); + var subscriber = new TapetiSubscriber(() => worker.Value); await subscriber.BindQueues(registrations.Value); return subscriber; @@ -93,7 +105,7 @@ namespace Tapeti public IPublisher GetPublisher() { - return new TapetiPublisher(worker.Value); + return new TapetiPublisher(() => worker.Value); } diff --git a/TapetiConnectionExtensions.cs b/TapetiConnectionExtensions.cs index 72a0fe2..70c8b02 100644 --- a/TapetiConnectionExtensions.cs +++ b/TapetiConnectionExtensions.cs @@ -8,7 +8,7 @@ namespace Tapeti { public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly) { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(DynamicQueueAttribute)))) connection.RegisterController(type); return connection; diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs new file mode 100644 index 0000000..4fb8561 --- /dev/null +++ b/Test/MarcoEmitter.cs @@ -0,0 +1,37 @@ +using System.Threading; +using System.Threading.Tasks; +using Tapeti; + +namespace Test +{ + public class MarcoEmitter + { + private readonly IPublisher publisher; + + + public MarcoEmitter(IPublisher publisher) + { + this.publisher = publisher; + } + + + public async Task Run() + { + var concurrent = new SemaphoreSlim(20); + + //for (var x = 0; x < 5000; x++) + while (true) + { + await concurrent.WaitAsync(); + try + { + await publisher.Publish(new MarcoMessage()); + } + finally + { + concurrent.Release(); + } + } + } + } +} diff --git a/Test/Program.cs b/Test/Program.cs index 6e6fe03..d27f8c8 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -19,19 +19,14 @@ namespace Test .WithDependencyResolver(new SimpleInjectorDependencyResolver(container)) .RegisterAllControllers(typeof(Program).Assembly)) { - container.Register(() => connection.GetPublisher()); - + container.Register(); + Console.WriteLine("Subscribing..."); connection.Subscribe().Wait(); Console.WriteLine("Done!"); - var publisher = connection.GetPublisher(); - - //for (var x = 0; x < 5000; x++) - while(true) - publisher.Publish(new MarcoMessage()).Wait(); - - //Console.ReadLine(); + var emitter = container.GetInstance(); + emitter.Run().Wait(); } } } diff --git a/Test/Test.csproj b/Test/Test.csproj index d60e49b..98eb7c2 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -48,6 +48,7 @@ + diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs index 2eed944..09f779b 100644 --- a/Test/TestQueueController.cs +++ b/Test/TestQueueController.cs @@ -5,10 +5,8 @@ using Tapeti.Annotations; namespace Test { - //[Exchange("myexchange")] - //[Queue("staticqueue")] - [Queue] - public class TestQueueController + [DynamicQueue] + public class TestQueueController : MessageController { private readonly IPublisher publisher; @@ -19,10 +17,10 @@ namespace Test } - public async Task Marco(MarcoMessage message) + public PoloMessage Marco(MarcoMessage message) { Console.WriteLine("Marco!"); - await publisher.Publish(new PoloMessage()); + return new PoloMessage(); }