diff --git a/Annotations/DynamicQueueAttribute.cs b/Annotations/DynamicQueueAttribute.cs
new file mode 100644
index 0000000..3d730c9
--- /dev/null
+++ b/Annotations/DynamicQueueAttribute.cs
@@ -0,0 +1,13 @@
+using System;
+
+namespace Tapeti.Annotations
+{
+ ///
+ /// Creates a non-durable auto-delete queue to receive messages. Can be used
+ /// on an entire MessageController class or on individual methods.
+ ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
+ public class DynamicQueueAttribute : Attribute
+ {
+ }
+}
diff --git a/Annotations/MessageControllerAttribute.cs b/Annotations/MessageControllerAttribute.cs
new file mode 100644
index 0000000..1f419f2
--- /dev/null
+++ b/Annotations/MessageControllerAttribute.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Tapeti.Annotations
+{
+ [AttributeUsage(AttributeTargets.Class)]
+ public class MessageControllerAttribute : Attribute
+ {
+ }
+}
diff --git a/Annotations/QueueAttribute.cs b/Annotations/QueueAttribute.cs
deleted file mode 100644
index 878390d..0000000
--- a/Annotations/QueueAttribute.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using System;
-
-namespace Tapeti.Annotations
-{
- [AttributeUsage(AttributeTargets.Class)]
- public class QueueAttribute : Attribute
- {
- public string Name { get; set; }
- public bool Dynamic { get; set; }
-
-
- public QueueAttribute(string name = null)
- {
- Name = name;
- Dynamic = (name == null);
- }
- }
-}
diff --git a/Annotations/StaticQueueAttribute.cs b/Annotations/StaticQueueAttribute.cs
new file mode 100644
index 0000000..2a9c6b1
--- /dev/null
+++ b/Annotations/StaticQueueAttribute.cs
@@ -0,0 +1,25 @@
+using System;
+
+namespace Tapeti.Annotations
+{
+ ///
+ /// Binds to an existing durable queue to receive messages. Can be used
+ /// on an entire MessageController class or on individual methods.
+ ///
+ ///
+ /// At the moment there is no support for creating a durable queue and managing the
+ /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
+ /// for deploy-time management of durable queues (shameless plug intended).
+ ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
+ public class StaticQueueAttribute : Attribute
+ {
+ public string Name { get; set; }
+
+
+ public StaticQueueAttribute(string name)
+ {
+ Name = name;
+ }
+ }
+}
diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs
index ca021f9..4143cf5 100644
--- a/Connection/TapetiPublisher.cs
+++ b/Connection/TapetiPublisher.cs
@@ -1,21 +1,22 @@
-using System.Threading.Tasks;
+using System;
+using System.Threading.Tasks;
namespace Tapeti.Connection
{
public class TapetiPublisher : IPublisher
{
- private readonly TapetiWorker worker;
+ private readonly Func workerFactory;
- public TapetiPublisher(TapetiWorker worker)
+ public TapetiPublisher(Func workerFactory)
{
- this.worker = worker;
+ this.workerFactory = workerFactory;
}
public Task Publish(object message)
{
- return worker.Publish(message);
+ return workerFactory().Publish(message);
}
}
}
diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs
index baafe9d..9d487f3 100644
--- a/Connection/TapetiSubscriber.cs
+++ b/Connection/TapetiSubscriber.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@@ -6,18 +7,18 @@ namespace Tapeti.Connection
{
public class TapetiSubscriber : ISubscriber
{
- private readonly TapetiWorker worker;
+ private readonly Func workerFactory;
- public TapetiSubscriber(TapetiWorker worker)
+ public TapetiSubscriber(Func workerFactory)
{
- this.worker = worker;
+ this.workerFactory = workerFactory;
}
public async Task BindQueues(IEnumerable registrations)
{
- await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList());
+ await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList());
}
}
}
diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs
index 062bab7..066307e 100644
--- a/Default/DefaultDependencyResolver.cs
+++ b/Default/DefaultDependencyResolver.cs
@@ -15,12 +15,12 @@ namespace Tapeti.Default
private readonly Lazy routingKeyStrategy = new Lazy();
private readonly Lazy messageSerializer = new Lazy();
private readonly Lazy logger;
+ private IPublisher publisher;
-
- public DefaultDependencyResolver(Func publisherFactory)
+ public DefaultDependencyResolver()
{
- controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory));
+ controllerFactory = new Lazy(() => new DefaultControllerFactory(() => publisher));
logger = new Lazy(() =>
{
@@ -58,6 +58,12 @@ namespace Tapeti.Default
}
+ public void RegisterPublisher(IPublisher value)
+ {
+ publisher = value;
+ }
+
+
public void RegisterController(Type type)
{
controllerFactory.Value.RegisterController(type);
diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs
index e04aa84..a177d25 100644
--- a/IDependencyResolver.cs
+++ b/IDependencyResolver.cs
@@ -10,6 +10,7 @@ namespace Tapeti
public interface IDependencyInjector : IDependencyResolver
{
+ void RegisterPublisher(IPublisher publisher);
void RegisterController(Type type);
}
}
diff --git a/MessageController.cs b/MessageController.cs
new file mode 100644
index 0000000..a046171
--- /dev/null
+++ b/MessageController.cs
@@ -0,0 +1,16 @@
+using Tapeti.Annotations;
+
+namespace Tapeti
+{
+ ///
+ /// Base class for message controllers
+ ///
+ ///
+ /// Using this base class is not required, you can add the MessageController attribute
+ /// to any class.
+ ///
+ [MessageController]
+ public abstract class MessageController
+ {
+ }
+}
diff --git a/README.md b/README.md
index 3c2b121..2ac4bfc 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,14 @@
-'Small to medium-sized and classified as "Least Concern" by the IUCN.'
+# Tapeti
+> 'Small to medium-sized and classified as "Least Concern" by the IUCN.'
+>
+> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti)
-- Wikipedia
\ No newline at end of file
+Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals:
+
+1. Automatic registration of message handlers
+2. Publishing without transport details
+ * Routing key generated based on class name
+ * One exchange (per service / group of services) to publish them all
+3. Attribute based, no base class requirements (only for convenience)
+4. Graceful handling of connection issues, even at startup
+5. Basic Saga support
\ No newline at end of file
diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs
index db5e01e..7dc3f21 100644
--- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs
+++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs
@@ -1,4 +1,5 @@
-using System.Linq;
+using System;
+using System.Linq;
using System.Reflection;
using SimpleInjector;
using Tapeti.Annotations;
@@ -7,7 +8,7 @@ using System.Collections.Generic;
namespace Tapeti.SimpleInjector
{
- public class SimpleInjectorDependencyResolver : IDependencyResolver
+ public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector
{
private readonly Container container;
@@ -25,6 +26,18 @@ namespace Tapeti.SimpleInjector
}
+ public void RegisterPublisher(IPublisher publisher)
+ {
+ IfUnregistered(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher));
+ }
+
+
+ public void RegisterController(Type type)
+ {
+ container.Register(type);
+ }
+
+
public SimpleInjectorDependencyResolver RegisterDefaults()
{
var currentRegistrations = container.GetCurrentRegistrations();
@@ -37,20 +50,18 @@ namespace Tapeti.SimpleInjector
}
- public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly)
- {
- foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
- container.Register(type);
-
- return this;
- }
-
-
private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService
{
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
container.Register();
}
+
+ private void IfUnregistered(IEnumerable currentRegistrations, Action register) where TService : class
+ {
+ // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
+ if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
+ register();
+ }
}
}
diff --git a/Tapeti.csproj b/Tapeti.csproj
index ca4d60b..f81dac3 100644
--- a/Tapeti.csproj
+++ b/Tapeti.csproj
@@ -50,7 +50,9 @@
-
+
+
+
@@ -58,6 +60,7 @@
+
diff --git a/TapetiConnection.cs b/TapetiConnection.cs
index 0a7a5cc..a4139c9 100644
--- a/TapetiConnection.cs
+++ b/TapetiConnection.cs
@@ -19,8 +19,20 @@ namespace Tapeti
public IDependencyResolver DependencyResolver
{
- get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); }
- set { dependencyResolver = value; }
+ get
+ {
+ if (dependencyResolver == null)
+ DependencyResolver = new DefaultDependencyResolver();
+
+ return dependencyResolver;
+ }
+ set
+ {
+ dependencyResolver = value;
+
+ var dependencyInjector = value as IDependencyInjector;
+ dependencyInjector?.RegisterPublisher(GetPublisher());
+ }
}
@@ -43,14 +55,14 @@ namespace Tapeti
public TapetiConnection WithDependencyResolver(IDependencyResolver resolver)
{
- dependencyResolver = resolver;
+ DependencyResolver = resolver;
return this;
}
public TapetiConnection RegisterController(Type type)
{
- var queueAttribute = type.GetCustomAttribute();
+ var queueAttribute = type.GetCustomAttribute();
if (queueAttribute == null)
throw new ArgumentException("Queue attribute required on class", nameof(type));
@@ -84,7 +96,7 @@ namespace Tapeti
if (!registrations.IsValueCreated || registrations.Value.Count == 0)
throw new ArgumentException("No controllers registered");
- var subscriber = new TapetiSubscriber(worker.Value);
+ var subscriber = new TapetiSubscriber(() => worker.Value);
await subscriber.BindQueues(registrations.Value);
return subscriber;
@@ -93,7 +105,7 @@ namespace Tapeti
public IPublisher GetPublisher()
{
- return new TapetiPublisher(worker.Value);
+ return new TapetiPublisher(() => worker.Value);
}
diff --git a/TapetiConnectionExtensions.cs b/TapetiConnectionExtensions.cs
index 72a0fe2..70c8b02 100644
--- a/TapetiConnectionExtensions.cs
+++ b/TapetiConnectionExtensions.cs
@@ -8,7 +8,7 @@ namespace Tapeti
{
public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly)
{
- foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
+ foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(DynamicQueueAttribute))))
connection.RegisterController(type);
return connection;
diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs
new file mode 100644
index 0000000..4fb8561
--- /dev/null
+++ b/Test/MarcoEmitter.cs
@@ -0,0 +1,37 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Tapeti;
+
+namespace Test
+{
+ public class MarcoEmitter
+ {
+ private readonly IPublisher publisher;
+
+
+ public MarcoEmitter(IPublisher publisher)
+ {
+ this.publisher = publisher;
+ }
+
+
+ public async Task Run()
+ {
+ var concurrent = new SemaphoreSlim(20);
+
+ //for (var x = 0; x < 5000; x++)
+ while (true)
+ {
+ await concurrent.WaitAsync();
+ try
+ {
+ await publisher.Publish(new MarcoMessage());
+ }
+ finally
+ {
+ concurrent.Release();
+ }
+ }
+ }
+ }
+}
diff --git a/Test/Program.cs b/Test/Program.cs
index 6e6fe03..d27f8c8 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -19,19 +19,14 @@ namespace Test
.WithDependencyResolver(new SimpleInjectorDependencyResolver(container))
.RegisterAllControllers(typeof(Program).Assembly))
{
- container.Register(() => connection.GetPublisher());
-
+ container.Register();
+
Console.WriteLine("Subscribing...");
connection.Subscribe().Wait();
Console.WriteLine("Done!");
- var publisher = connection.GetPublisher();
-
- //for (var x = 0; x < 5000; x++)
- while(true)
- publisher.Publish(new MarcoMessage()).Wait();
-
- //Console.ReadLine();
+ var emitter = container.GetInstance();
+ emitter.Run().Wait();
}
}
}
diff --git a/Test/Test.csproj b/Test/Test.csproj
index d60e49b..98eb7c2 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -48,6 +48,7 @@
+
diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs
index 2eed944..09f779b 100644
--- a/Test/TestQueueController.cs
+++ b/Test/TestQueueController.cs
@@ -5,10 +5,8 @@ using Tapeti.Annotations;
namespace Test
{
- //[Exchange("myexchange")]
- //[Queue("staticqueue")]
- [Queue]
- public class TestQueueController
+ [DynamicQueue]
+ public class TestQueueController : MessageController
{
private readonly IPublisher publisher;
@@ -19,10 +17,10 @@ namespace Test
}
- public async Task Marco(MarcoMessage message)
+ public PoloMessage Marco(MarcoMessage message)
{
Console.WriteLine("Marco!");
- await publisher.Publish(new PoloMessage());
+ return new PoloMessage();
}