1
0
mirror of synced 2025-01-23 08:23:08 +01:00

Started refactoring, not in any usable state yet

This commit is contained in:
Mark van Renswoude 2016-12-05 08:00:09 +01:00
parent 1f64fea559
commit e62aa7d482
18 changed files with 189 additions and 67 deletions

View File

@ -0,0 +1,13 @@
using System;
namespace Tapeti.Annotations
{
/// <summary>
/// Creates a non-durable auto-delete queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
/// </summary>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class DynamicQueueAttribute : Attribute
{
}
}

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Class)]
public class MessageControllerAttribute : Attribute
{
}
}

View File

@ -1,18 +0,0 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Class)]
public class QueueAttribute : Attribute
{
public string Name { get; set; }
public bool Dynamic { get; set; }
public QueueAttribute(string name = null)
{
Name = name;
Dynamic = (name == null);
}
}
}

View File

@ -0,0 +1,25 @@
using System;
namespace Tapeti.Annotations
{
/// <summary>
/// Binds to an existing durable queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
/// </summary>
/// <remarks>
/// At the moment there is no support for creating a durable queue and managing the
/// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
/// for deploy-time management of durable queues (shameless plug intended).
/// </remarks>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class StaticQueueAttribute : Attribute
{
public string Name { get; set; }
public StaticQueueAttribute(string name)
{
Name = name;
}
}
}

View File

@ -1,21 +1,22 @@
using System.Threading.Tasks; using System;
using System.Threading.Tasks;
namespace Tapeti.Connection namespace Tapeti.Connection
{ {
public class TapetiPublisher : IPublisher public class TapetiPublisher : IPublisher
{ {
private readonly TapetiWorker worker; private readonly Func<TapetiWorker> workerFactory;
public TapetiPublisher(TapetiWorker worker) public TapetiPublisher(Func<TapetiWorker> workerFactory)
{ {
this.worker = worker; this.workerFactory = workerFactory;
} }
public Task Publish(object message) public Task Publish(object message)
{ {
return worker.Publish(message); return workerFactory().Publish(message);
} }
} }
} }

View File

@ -1,4 +1,5 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -6,18 +7,18 @@ namespace Tapeti.Connection
{ {
public class TapetiSubscriber : ISubscriber public class TapetiSubscriber : ISubscriber
{ {
private readonly TapetiWorker worker; private readonly Func<TapetiWorker> workerFactory;
public TapetiSubscriber(TapetiWorker worker) public TapetiSubscriber(Func<TapetiWorker> workerFactory)
{ {
this.worker = worker; this.workerFactory = workerFactory;
} }
public async Task BindQueues(IEnumerable<IQueueRegistration> registrations) public async Task BindQueues(IEnumerable<IQueueRegistration> registrations)
{ {
await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList()); await Task.WhenAll(registrations.Select(registration => workerFactory().Subscribe(registration)).ToList());
} }
} }
} }

View File

@ -15,12 +15,12 @@ namespace Tapeti.Default
private readonly Lazy<DefaultRoutingKeyStrategy> routingKeyStrategy = new Lazy<DefaultRoutingKeyStrategy>(); private readonly Lazy<DefaultRoutingKeyStrategy> routingKeyStrategy = new Lazy<DefaultRoutingKeyStrategy>();
private readonly Lazy<DefaultMessageSerializer> messageSerializer = new Lazy<DefaultMessageSerializer>(); private readonly Lazy<DefaultMessageSerializer> messageSerializer = new Lazy<DefaultMessageSerializer>();
private readonly Lazy<ILogger> logger; private readonly Lazy<ILogger> logger;
private IPublisher publisher;
public DefaultDependencyResolver()
public DefaultDependencyResolver(Func<IPublisher> publisherFactory)
{ {
controllerFactory = new Lazy<DefaultControllerFactory>(() => new DefaultControllerFactory(publisherFactory)); controllerFactory = new Lazy<DefaultControllerFactory>(() => new DefaultControllerFactory(() => publisher));
logger = new Lazy<ILogger>(() => logger = new Lazy<ILogger>(() =>
{ {
@ -58,6 +58,12 @@ namespace Tapeti.Default
} }
public void RegisterPublisher(IPublisher value)
{
publisher = value;
}
public void RegisterController(Type type) public void RegisterController(Type type)
{ {
controllerFactory.Value.RegisterController(type); controllerFactory.Value.RegisterController(type);

View File

@ -10,6 +10,7 @@ namespace Tapeti
public interface IDependencyInjector : IDependencyResolver public interface IDependencyInjector : IDependencyResolver
{ {
void RegisterPublisher(IPublisher publisher);
void RegisterController(Type type); void RegisterController(Type type);
} }
} }

16
MessageController.cs Normal file
View File

@ -0,0 +1,16 @@
using Tapeti.Annotations;
namespace Tapeti
{
/// <summary>
/// Base class for message controllers
/// </summary>
/// <remarks>
/// Using this base class is not required, you can add the MessageController attribute
/// to any class.
/// </remarks>
[MessageController]
public abstract class MessageController
{
}
}

View File

@ -1,3 +1,14 @@
'Small to medium-sized and classified as "Least Concern" by the IUCN.' # Tapeti
> 'Small to medium-sized and classified as "Least Concern" by the IUCN.'
>
> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti)
- Wikipedia Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals:
1. Automatic registration of message handlers
2. Publishing without transport details
* Routing key generated based on class name
* One exchange (per service / group of services) to publish them all
3. Attribute based, no base class requirements (only for convenience)
4. Graceful handling of connection issues, even at startup
5. Basic Saga support

View File

@ -1,4 +1,5 @@
using System.Linq; using System;
using System.Linq;
using System.Reflection; using System.Reflection;
using SimpleInjector; using SimpleInjector;
using Tapeti.Annotations; using Tapeti.Annotations;
@ -7,7 +8,7 @@ using System.Collections.Generic;
namespace Tapeti.SimpleInjector namespace Tapeti.SimpleInjector
{ {
public class SimpleInjectorDependencyResolver : IDependencyResolver public class SimpleInjectorDependencyResolver : IDependencyResolver, IDependencyInjector
{ {
private readonly Container container; private readonly Container container;
@ -25,6 +26,18 @@ namespace Tapeti.SimpleInjector
} }
public void RegisterPublisher(IPublisher publisher)
{
IfUnregistered<IPublisher>(container.GetCurrentRegistrations(), () => container.RegisterSingleton(publisher));
}
public void RegisterController(Type type)
{
container.Register(type);
}
public SimpleInjectorDependencyResolver RegisterDefaults() public SimpleInjectorDependencyResolver RegisterDefaults()
{ {
var currentRegistrations = container.GetCurrentRegistrations(); var currentRegistrations = container.GetCurrentRegistrations();
@ -37,20 +50,18 @@ namespace Tapeti.SimpleInjector
} }
public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly)
{
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
container.Register(type);
return this;
}
private void IfUnregistered<TService, TImplementation>(IEnumerable<InstanceProducer> currentRegistrations) where TService : class where TImplementation: class, TService private void IfUnregistered<TService, TImplementation>(IEnumerable<InstanceProducer> currentRegistrations) where TService : class where TImplementation: class, TService
{ {
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
container.Register<TService, TImplementation>(); container.Register<TService, TImplementation>();
} }
private void IfUnregistered<TService>(IEnumerable<InstanceProducer> currentRegistrations, Action register) where TService : class
{
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
register();
}
} }
} }

View File

@ -50,7 +50,9 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Annotations\ExchangeAttribute.cs" /> <Compile Include="Annotations\ExchangeAttribute.cs" />
<Compile Include="Annotations\QueueAttribute.cs" /> <Compile Include="Annotations\MessageControllerAttribute.cs" />
<Compile Include="Annotations\StaticQueueAttribute.cs" />
<Compile Include="Annotations\DynamicQueueAttribute.cs" />
<Compile Include="Connection\TapetiConsumer.cs" /> <Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" /> <Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" /> <Compile Include="Connection\TapetiSubscriber.cs" />
@ -58,6 +60,7 @@
<Compile Include="Default\ConsoleLogger.cs" /> <Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" /> <Compile Include="Default\DevNullLogger.cs" />
<Compile Include="ILogger.cs" /> <Compile Include="ILogger.cs" />
<Compile Include="MessageController.cs" />
<Compile Include="TapetiConnectionExtensions.cs" /> <Compile Include="TapetiConnectionExtensions.cs" />
<Compile Include="TapetiConnectionParams.cs" /> <Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiTypes.cs" /> <Compile Include="TapetiTypes.cs" />

View File

@ -19,8 +19,20 @@ namespace Tapeti
public IDependencyResolver DependencyResolver public IDependencyResolver DependencyResolver
{ {
get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); } get
set { dependencyResolver = value; } {
if (dependencyResolver == null)
DependencyResolver = new DefaultDependencyResolver();
return dependencyResolver;
}
set
{
dependencyResolver = value;
var dependencyInjector = value as IDependencyInjector;
dependencyInjector?.RegisterPublisher(GetPublisher());
}
} }
@ -43,14 +55,14 @@ namespace Tapeti
public TapetiConnection WithDependencyResolver(IDependencyResolver resolver) public TapetiConnection WithDependencyResolver(IDependencyResolver resolver)
{ {
dependencyResolver = resolver; DependencyResolver = resolver;
return this; return this;
} }
public TapetiConnection RegisterController(Type type) public TapetiConnection RegisterController(Type type)
{ {
var queueAttribute = type.GetCustomAttribute<QueueAttribute>(); var queueAttribute = type.GetCustomAttribute<MessageController>();
if (queueAttribute == null) if (queueAttribute == null)
throw new ArgumentException("Queue attribute required on class", nameof(type)); throw new ArgumentException("Queue attribute required on class", nameof(type));
@ -84,7 +96,7 @@ namespace Tapeti
if (!registrations.IsValueCreated || registrations.Value.Count == 0) if (!registrations.IsValueCreated || registrations.Value.Count == 0)
throw new ArgumentException("No controllers registered"); throw new ArgumentException("No controllers registered");
var subscriber = new TapetiSubscriber(worker.Value); var subscriber = new TapetiSubscriber(() => worker.Value);
await subscriber.BindQueues(registrations.Value); await subscriber.BindQueues(registrations.Value);
return subscriber; return subscriber;
@ -93,7 +105,7 @@ namespace Tapeti
public IPublisher GetPublisher() public IPublisher GetPublisher()
{ {
return new TapetiPublisher(worker.Value); return new TapetiPublisher(() => worker.Value);
} }

View File

@ -8,7 +8,7 @@ namespace Tapeti
{ {
public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly) public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly)
{ {
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(DynamicQueueAttribute))))
connection.RegisterController(type); connection.RegisterController(type);
return connection; return connection;

37
Test/MarcoEmitter.cs Normal file
View File

@ -0,0 +1,37 @@
using System.Threading;
using System.Threading.Tasks;
using Tapeti;
namespace Test
{
public class MarcoEmitter
{
private readonly IPublisher publisher;
public MarcoEmitter(IPublisher publisher)
{
this.publisher = publisher;
}
public async Task Run()
{
var concurrent = new SemaphoreSlim(20);
//for (var x = 0; x < 5000; x++)
while (true)
{
await concurrent.WaitAsync();
try
{
await publisher.Publish(new MarcoMessage());
}
finally
{
concurrent.Release();
}
}
}
}
}

View File

@ -19,19 +19,14 @@ namespace Test
.WithDependencyResolver(new SimpleInjectorDependencyResolver(container)) .WithDependencyResolver(new SimpleInjectorDependencyResolver(container))
.RegisterAllControllers(typeof(Program).Assembly)) .RegisterAllControllers(typeof(Program).Assembly))
{ {
container.Register(() => connection.GetPublisher()); container.Register<MarcoEmitter>();
Console.WriteLine("Subscribing..."); Console.WriteLine("Subscribing...");
connection.Subscribe().Wait(); connection.Subscribe().Wait();
Console.WriteLine("Done!"); Console.WriteLine("Done!");
var publisher = connection.GetPublisher(); var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait();
//for (var x = 0; x < 5000; x++)
while(true)
publisher.Publish(new MarcoMessage()).Wait();
//Console.ReadLine();
} }
} }
} }

View File

@ -48,6 +48,7 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="MarcoEmitter.cs" />
<Compile Include="Program.cs" /> <Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestQueueController.cs" /> <Compile Include="TestQueueController.cs" />

View File

@ -5,10 +5,8 @@ using Tapeti.Annotations;
namespace Test namespace Test
{ {
//[Exchange("myexchange")] [DynamicQueue]
//[Queue("staticqueue")] public class TestQueueController : MessageController
[Queue]
public class TestQueueController
{ {
private readonly IPublisher publisher; private readonly IPublisher publisher;
@ -19,10 +17,10 @@ namespace Test
} }
public async Task Marco(MarcoMessage message) public PoloMessage Marco(MarcoMessage message)
{ {
Console.WriteLine("Marco!"); Console.WriteLine("Marco!");
await publisher.Publish(new PoloMessage()); return new PoloMessage();
} }