Merge branch 'feature/multiqueuecontrollers' into develop
This commit is contained in:
commit
5871c8b7f0
13
Annotations/DynamicQueueAttribute.cs
Normal file
13
Annotations/DynamicQueueAttribute.cs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Annotations
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a non-durable auto-delete queue to receive messages. Can be used
|
||||||
|
/// on an entire MessageController class or on individual methods.
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
|
||||||
|
public class DynamicQueueAttribute : Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
9
Annotations/MessageControllerAttribute.cs
Normal file
9
Annotations/MessageControllerAttribute.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Annotations
|
||||||
|
{
|
||||||
|
[AttributeUsage(AttributeTargets.Class)]
|
||||||
|
public class MessageControllerAttribute : Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
@ -1,18 +0,0 @@
|
|||||||
using System;
|
|
||||||
|
|
||||||
namespace Tapeti.Annotations
|
|
||||||
{
|
|
||||||
[AttributeUsage(AttributeTargets.Class)]
|
|
||||||
public class QueueAttribute : Attribute
|
|
||||||
{
|
|
||||||
public string Name { get; set; }
|
|
||||||
public bool Dynamic { get; set; }
|
|
||||||
|
|
||||||
|
|
||||||
public QueueAttribute(string name = null)
|
|
||||||
{
|
|
||||||
Name = name;
|
|
||||||
Dynamic = (name == null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
25
Annotations/StaticQueueAttribute.cs
Normal file
25
Annotations/StaticQueueAttribute.cs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Annotations
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Binds to an existing durable queue to receive messages. Can be used
|
||||||
|
/// on an entire MessageController class or on individual methods.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// At the moment there is no support for creating a durable queue and managing the
|
||||||
|
/// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
|
||||||
|
/// for deploy-time management of durable queues (shameless plug intended).
|
||||||
|
/// </remarks>
|
||||||
|
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
|
||||||
|
public class StaticQueueAttribute : Attribute
|
||||||
|
{
|
||||||
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
public StaticQueueAttribute(string name)
|
||||||
|
{
|
||||||
|
Name = name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
24
Config/IBindingContext.cs
Normal file
24
Config/IBindingContext.cs
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Reflection;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public delegate object ValueFactory(IMessageContext context);
|
||||||
|
|
||||||
|
|
||||||
|
public interface IBindingContext
|
||||||
|
{
|
||||||
|
Type MessageClass { get; set; }
|
||||||
|
IReadOnlyList<IBindingParameter> Parameters { get; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public interface IBindingParameter
|
||||||
|
{
|
||||||
|
ParameterInfo Info { get; }
|
||||||
|
bool HasBinding { get; }
|
||||||
|
|
||||||
|
void SetBinding(ValueFactory valueFactory);
|
||||||
|
}
|
||||||
|
}
|
9
Config/IBindingMiddleware.cs
Normal file
9
Config/IBindingMiddleware.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IBindingMiddleware
|
||||||
|
{
|
||||||
|
void Handle(IBindingContext context, Action next);
|
||||||
|
}
|
||||||
|
}
|
35
Config/IConfig.cs
Normal file
35
Config/IConfig.cs
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Reflection;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IConfig
|
||||||
|
{
|
||||||
|
string Exchange { get; }
|
||||||
|
IDependencyResolver DependencyResolver { get; }
|
||||||
|
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
||||||
|
IEnumerable<IQueue> Queues { get; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public interface IQueue
|
||||||
|
{
|
||||||
|
bool Dynamic { get; }
|
||||||
|
string Name { get; }
|
||||||
|
|
||||||
|
IEnumerable<IBinding> Bindings { get; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public interface IBinding
|
||||||
|
{
|
||||||
|
Type Controller { get; }
|
||||||
|
MethodInfo Method { get; }
|
||||||
|
Type MessageClass { get; }
|
||||||
|
|
||||||
|
bool Accept(object message);
|
||||||
|
Task<object> Invoke(IMessageContext context, object message);
|
||||||
|
}
|
||||||
|
}
|
11
Config/IMessageContext.cs
Normal file
11
Config/IMessageContext.cs
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IMessageContext
|
||||||
|
{
|
||||||
|
object Controller { get; }
|
||||||
|
object Message { get; }
|
||||||
|
IDictionary<string, object> Items { get; }
|
||||||
|
}
|
||||||
|
}
|
9
Config/IMessageMiddleware.cs
Normal file
9
Config/IMessageMiddleware.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IMessageMiddleware
|
||||||
|
{
|
||||||
|
void Handle(IMessageContext context, Action next);
|
||||||
|
}
|
||||||
|
}
|
9
Config/IMiddlewareBundle.cs
Normal file
9
Config/IMiddlewareBundle.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IMiddlewareBundle
|
||||||
|
{
|
||||||
|
IEnumerable<object> GetContents(IDependencyResolver dependencyResolver);
|
||||||
|
}
|
||||||
|
}
|
@ -1,21 +1,26 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Diagnostics.Eventing.Reader;
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
using Tapeti.Config;
|
||||||
|
using Tapeti.Helpers;
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
public class TapetiConsumer : DefaultBasicConsumer
|
public class TapetiConsumer : DefaultBasicConsumer
|
||||||
{
|
{
|
||||||
private readonly TapetiWorker worker;
|
private readonly TapetiWorker worker;
|
||||||
private readonly IMessageSerializer messageSerializer;
|
private readonly IDependencyResolver dependencyResolver;
|
||||||
private readonly IQueueRegistration queueRegistration;
|
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
|
||||||
|
private readonly List<IBinding> bindings;
|
||||||
|
|
||||||
|
|
||||||
public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration)
|
public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware)
|
||||||
{
|
{
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
this.messageSerializer = messageSerializer;
|
this.dependencyResolver = dependencyResolver;
|
||||||
this.queueRegistration = queueRegistration;
|
this.messageMiddleware = messageMiddleware;
|
||||||
|
this.bindings = bindings.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -24,22 +29,46 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var message = messageSerializer.Deserialize(body, properties);
|
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties);
|
||||||
if (message == null)
|
if (message == null)
|
||||||
throw new ArgumentException("Empty message");
|
throw new ArgumentException("Empty message");
|
||||||
|
|
||||||
if (queueRegistration.Accept(message))
|
var handled = false;
|
||||||
queueRegistration.Visit(message);
|
foreach (var binding in bindings.Where(b => b.Accept(message)))
|
||||||
else
|
{
|
||||||
|
var context = new MessageContext
|
||||||
|
{
|
||||||
|
Controller = dependencyResolver.Resolve(binding.Controller),
|
||||||
|
Message = message
|
||||||
|
};
|
||||||
|
|
||||||
|
MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next));
|
||||||
|
|
||||||
|
var result = binding.Invoke(context, message).Result;
|
||||||
|
if (result != null)
|
||||||
|
worker.Publish(result);
|
||||||
|
|
||||||
|
handled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!handled)
|
||||||
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
|
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
|
||||||
|
|
||||||
worker.Respond(deliveryTag, ConsumeResponse.Ack);
|
worker.Respond(deliveryTag, ConsumeResponse.Ack);
|
||||||
}
|
}
|
||||||
catch (Exception)
|
catch (Exception)
|
||||||
{
|
{
|
||||||
//TODO pluggable exception handling
|
worker.Respond(deliveryTag, ConsumeResponse.Requeue);
|
||||||
worker.Respond(deliveryTag, ConsumeResponse.Nack);
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class MessageContext : IMessageContext
|
||||||
|
{
|
||||||
|
public object Controller { get; set; }
|
||||||
|
public object Message { get; set; }
|
||||||
|
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,21 +1,22 @@
|
|||||||
using System.Threading.Tasks;
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
public class TapetiPublisher : IPublisher
|
public class TapetiPublisher : IPublisher
|
||||||
{
|
{
|
||||||
private readonly TapetiWorker worker;
|
private readonly Func<TapetiWorker> workerFactory;
|
||||||
|
|
||||||
|
|
||||||
public TapetiPublisher(TapetiWorker worker)
|
public TapetiPublisher(Func<TapetiWorker> workerFactory)
|
||||||
{
|
{
|
||||||
this.worker = worker;
|
this.workerFactory = workerFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Publish(object message)
|
public Task Publish(object message)
|
||||||
{
|
{
|
||||||
return worker.Publish(message);
|
return workerFactory().Publish(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,23 +1,25 @@
|
|||||||
using System.Collections.Generic;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
public class TapetiSubscriber : ISubscriber
|
public class TapetiSubscriber : ISubscriber
|
||||||
{
|
{
|
||||||
private readonly TapetiWorker worker;
|
private readonly Func<TapetiWorker> workerFactory;
|
||||||
|
|
||||||
|
|
||||||
public TapetiSubscriber(TapetiWorker worker)
|
public TapetiSubscriber(Func<TapetiWorker> workerFactory)
|
||||||
{
|
{
|
||||||
this.worker = worker;
|
this.workerFactory = workerFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public async Task BindQueues(IEnumerable<IQueueRegistration> registrations)
|
public async Task BindQueues(IEnumerable<IQueue> queues)
|
||||||
{
|
{
|
||||||
await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList());
|
await Task.WhenAll(queues.Select(queue => workerFactory().Subscribe(queue)).ToList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
using RabbitMQ.Client.Exceptions;
|
using RabbitMQ.Client.Exceptions;
|
||||||
using RabbitMQ.Client.Framing;
|
using RabbitMQ.Client.Framing;
|
||||||
|
using Tapeti.Config;
|
||||||
using Tapeti.Tasks;
|
using Tapeti.Tasks;
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
@ -10,20 +12,23 @@ namespace Tapeti.Connection
|
|||||||
public class TapetiWorker
|
public class TapetiWorker
|
||||||
{
|
{
|
||||||
public TapetiConnectionParams ConnectionParams { get; set; }
|
public TapetiConnectionParams ConnectionParams { get; set; }
|
||||||
public string PublishExchange { get; set; }
|
public string Exchange { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
private readonly IDependencyResolver dependencyResolver;
|
||||||
|
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
|
||||||
private readonly IMessageSerializer messageSerializer;
|
private readonly IMessageSerializer messageSerializer;
|
||||||
private readonly IRoutingKeyStrategy routingKeyStrategy;
|
private readonly IRoutingKeyStrategy routingKeyStrategy;
|
||||||
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
|
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
|
||||||
private IConnection connection;
|
private RabbitMQ.Client.IConnection connection;
|
||||||
private IModel channel;
|
private IModel channelInstance;
|
||||||
|
|
||||||
|
|
||||||
public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy)
|
public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware)
|
||||||
{
|
{
|
||||||
this.messageSerializer = messageSerializer;
|
this.dependencyResolver = dependencyResolver;
|
||||||
this.routingKeyStrategy = routingKeyStrategy;
|
this.messageMiddleware = messageMiddleware;
|
||||||
|
messageSerializer = dependencyResolver.Resolve<IMessageSerializer>();
|
||||||
|
routingKeyStrategy = dependencyResolver.Resolve<IRoutingKeyStrategy>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -35,29 +40,45 @@ namespace Tapeti.Connection
|
|||||||
var body = messageSerializer.Serialize(message, properties);
|
var body = messageSerializer.Serialize(message, properties);
|
||||||
|
|
||||||
(await GetChannel())
|
(await GetChannel())
|
||||||
.BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false,
|
.BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false,
|
||||||
properties, body);
|
properties, body);
|
||||||
}).Unwrap();
|
}).Unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Subscribe(string queueName, IQueueRegistration queueRegistration)
|
public Task Consume(string queueName, IEnumerable<IBinding> bindings)
|
||||||
{
|
{
|
||||||
return taskQueue.Value.Add(async () =>
|
return taskQueue.Value.Add(async () =>
|
||||||
{
|
{
|
||||||
(await GetChannel())
|
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware));
|
||||||
.BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration));
|
|
||||||
}).Unwrap();
|
}).Unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public async Task Subscribe(IQueueRegistration registration)
|
public async Task Subscribe(IQueue queue)
|
||||||
{
|
{
|
||||||
var queueName = await taskQueue.Value.Add(async () =>
|
var queueName = await taskQueue.Value.Add(async () =>
|
||||||
registration.BindQueue(await GetChannel()))
|
{
|
||||||
.Unwrap();
|
var channel = await GetChannel();
|
||||||
|
|
||||||
await Subscribe(queueName, registration);
|
if (queue.Dynamic)
|
||||||
|
{
|
||||||
|
var dynamicQueue = channel.QueueDeclare();
|
||||||
|
|
||||||
|
foreach (var binding in queue.Bindings)
|
||||||
|
{
|
||||||
|
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
|
||||||
|
channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
return dynamicQueue.QueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
channel.QueueDeclarePassive(queue.Name);
|
||||||
|
return queue.Name;
|
||||||
|
}).Unwrap();
|
||||||
|
|
||||||
|
await Consume(queueName, queue.Bindings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -91,10 +112,10 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
return taskQueue.Value.Add(() =>
|
return taskQueue.Value.Add(() =>
|
||||||
{
|
{
|
||||||
if (channel != null)
|
if (channelInstance != null)
|
||||||
{
|
{
|
||||||
channel.Dispose();
|
channelInstance.Dispose();
|
||||||
channel = null;
|
channelInstance = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReSharper disable once InvertIf
|
// ReSharper disable once InvertIf
|
||||||
@ -115,8 +136,8 @@ namespace Tapeti.Connection
|
|||||||
/// </remarks>
|
/// </remarks>
|
||||||
private async Task<IModel> GetChannel()
|
private async Task<IModel> GetChannel()
|
||||||
{
|
{
|
||||||
if (channel != null)
|
if (channelInstance != null)
|
||||||
return channel;
|
return channelInstance;
|
||||||
|
|
||||||
var connectionFactory = new ConnectionFactory
|
var connectionFactory = new ConnectionFactory
|
||||||
{
|
{
|
||||||
@ -134,7 +155,7 @@ namespace Tapeti.Connection
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
connection = connectionFactory.CreateConnection();
|
connection = connectionFactory.CreateConnection();
|
||||||
channel = connection.CreateModel();
|
channelInstance = connection.CreateModel();
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -144,7 +165,7 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return channel;
|
return channelInstance;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
13
Default/BindingBufferStop.cs
Normal file
13
Default/BindingBufferStop.cs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
using System;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Default
|
||||||
|
{
|
||||||
|
// End of the line...
|
||||||
|
public class BindingBufferStop : IBindingMiddleware
|
||||||
|
{
|
||||||
|
public void Handle(IBindingContext context, Action next)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,62 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Reflection;
|
|
||||||
|
|
||||||
namespace Tapeti.Default
|
|
||||||
{
|
|
||||||
public class DefaultControllerFactory : IControllerFactory
|
|
||||||
{
|
|
||||||
private readonly Dictionary<Type, Func<object>> controllerConstructors = new Dictionary<Type, Func<object>>();
|
|
||||||
private readonly Func<IPublisher> publisherFactory;
|
|
||||||
|
|
||||||
public DefaultControllerFactory(Func<IPublisher> publisherFactory)
|
|
||||||
{
|
|
||||||
this.publisherFactory = publisherFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public object CreateController(Type controllerType)
|
|
||||||
{
|
|
||||||
Func<object> constructor;
|
|
||||||
if (!controllerConstructors.TryGetValue(controllerType, out constructor))
|
|
||||||
throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}");
|
|
||||||
|
|
||||||
return constructor();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void RegisterController(Type type)
|
|
||||||
{
|
|
||||||
controllerConstructors.Add(type, GetConstructor(type));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected Func<object> GetConstructor(Type type)
|
|
||||||
{
|
|
||||||
var constructors = type.GetConstructors();
|
|
||||||
|
|
||||||
ConstructorInfo publisherConstructor = null;
|
|
||||||
ConstructorInfo emptyConstructor = null;
|
|
||||||
|
|
||||||
foreach (var constructor in constructors)
|
|
||||||
{
|
|
||||||
var parameters = constructor.GetParameters();
|
|
||||||
if (parameters.Length > 0)
|
|
||||||
{
|
|
||||||
if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher))
|
|
||||||
publisherConstructor = constructor;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
emptyConstructor = constructor;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (publisherConstructor != null)
|
|
||||||
return () => publisherConstructor.Invoke(new object[] { publisherFactory() });
|
|
||||||
|
|
||||||
if (emptyConstructor != null)
|
|
||||||
return () => emptyConstructor.Invoke(null);
|
|
||||||
|
|
||||||
throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,66 +0,0 @@
|
|||||||
using System;
|
|
||||||
|
|
||||||
namespace Tapeti.Default
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* !! IoC Container 9000 !!
|
|
||||||
*
|
|
||||||
* ...you probably want to replace this one as soon as possible.
|
|
||||||
*
|
|
||||||
* A Simple Injector implementation is provided in the Tapeti.SimpleInjector package.
|
|
||||||
*/
|
|
||||||
public class DefaultDependencyResolver : IDependencyInjector
|
|
||||||
{
|
|
||||||
private readonly Lazy<DefaultControllerFactory> controllerFactory;
|
|
||||||
private readonly Lazy<DefaultRoutingKeyStrategy> routingKeyStrategy = new Lazy<DefaultRoutingKeyStrategy>();
|
|
||||||
private readonly Lazy<DefaultMessageSerializer> messageSerializer = new Lazy<DefaultMessageSerializer>();
|
|
||||||
private readonly Lazy<ILogger> logger;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public DefaultDependencyResolver(Func<IPublisher> publisherFactory)
|
|
||||||
{
|
|
||||||
controllerFactory = new Lazy<DefaultControllerFactory>(() => new DefaultControllerFactory(publisherFactory));
|
|
||||||
|
|
||||||
logger = new Lazy<ILogger>(() =>
|
|
||||||
{
|
|
||||||
// http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// ReSharper disable once UnusedVariable
|
|
||||||
var dummy = Console.WindowHeight;
|
|
||||||
|
|
||||||
return new ConsoleLogger();
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
return new DevNullLogger();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public T Resolve<T>() where T : class
|
|
||||||
{
|
|
||||||
if (typeof(T) == typeof(IControllerFactory))
|
|
||||||
return (T)(controllerFactory.Value as IControllerFactory);
|
|
||||||
|
|
||||||
if (typeof(T) == typeof(IRoutingKeyStrategy))
|
|
||||||
return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy);
|
|
||||||
|
|
||||||
if (typeof(T) == typeof(IMessageSerializer))
|
|
||||||
return (T)(messageSerializer.Value as IMessageSerializer);
|
|
||||||
|
|
||||||
if (typeof(T) == typeof(ILogger))
|
|
||||||
return (T)logger.Value;
|
|
||||||
|
|
||||||
return default(T);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void RegisterController(Type type)
|
|
||||||
{
|
|
||||||
controllerFactory.Value.RegisterController(type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
26
Default/DependencyResolverBinding.cs
Normal file
26
Default/DependencyResolverBinding.cs
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Default
|
||||||
|
{
|
||||||
|
public class DependencyResolverBinding : IBindingMiddleware
|
||||||
|
{
|
||||||
|
private readonly IDependencyResolver resolver;
|
||||||
|
|
||||||
|
|
||||||
|
public DependencyResolverBinding(IDependencyResolver resolver)
|
||||||
|
{
|
||||||
|
this.resolver = resolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void Handle(IBindingContext context, Action next)
|
||||||
|
{
|
||||||
|
next();
|
||||||
|
|
||||||
|
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass))
|
||||||
|
parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
23
Default/MessageBinding.cs
Normal file
23
Default/MessageBinding.cs
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
using System;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Default
|
||||||
|
{
|
||||||
|
public class MessageBinding : IBindingMiddleware
|
||||||
|
{
|
||||||
|
public void Handle(IBindingContext context, Action next)
|
||||||
|
{
|
||||||
|
if (context.Parameters.Count == 0)
|
||||||
|
throw new TopologyConfigurationException("First parameter must be a message class");
|
||||||
|
|
||||||
|
var parameter = context.Parameters[0];
|
||||||
|
if (!parameter.Info.ParameterType.IsClass)
|
||||||
|
throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} must be a message class");
|
||||||
|
|
||||||
|
parameter.SetBinding(messageContext => messageContext.Message);
|
||||||
|
context.MessageClass = parameter.Info.ParameterType;
|
||||||
|
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
22
Helpers/ConsoleHelper.cs
Normal file
22
Helpers/ConsoleHelper.cs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Helpers
|
||||||
|
{
|
||||||
|
public static class ConsoleHelper
|
||||||
|
{
|
||||||
|
// Source: http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console
|
||||||
|
public static bool IsAvailable()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// ReSharper disable once UnusedVariable - that's why it's called dummy
|
||||||
|
var dummy = Console.WindowHeight;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
26
Helpers/MiddlewareHelper.cs
Normal file
26
Helpers/MiddlewareHelper.cs
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Tapeti.Helpers
|
||||||
|
{
|
||||||
|
public static class MiddlewareHelper
|
||||||
|
{
|
||||||
|
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle)
|
||||||
|
{
|
||||||
|
var handlerIndex = middleware.Count - 1;
|
||||||
|
if (handlerIndex == -1)
|
||||||
|
return;
|
||||||
|
|
||||||
|
Action handleNext = null;
|
||||||
|
|
||||||
|
handleNext = () =>
|
||||||
|
{
|
||||||
|
handlerIndex--;
|
||||||
|
if (handlerIndex >= 0)
|
||||||
|
handle(middleware[handlerIndex], handleNext);
|
||||||
|
};
|
||||||
|
|
||||||
|
handle(middleware[handlerIndex], handleNext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
10
IConnection.cs
Normal file
10
IConnection.cs
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti
|
||||||
|
{
|
||||||
|
public interface IConnection : IDisposable
|
||||||
|
{
|
||||||
|
Task<ISubscriber> Subscribe();
|
||||||
|
}
|
||||||
|
}
|
@ -1,9 +0,0 @@
|
|||||||
using System;
|
|
||||||
|
|
||||||
namespace Tapeti
|
|
||||||
{
|
|
||||||
public interface IControllerFactory
|
|
||||||
{
|
|
||||||
object CreateController(Type controllerType);
|
|
||||||
}
|
|
||||||
}
|
|
@ -5,11 +5,14 @@ namespace Tapeti
|
|||||||
public interface IDependencyResolver
|
public interface IDependencyResolver
|
||||||
{
|
{
|
||||||
T Resolve<T>() where T : class;
|
T Resolve<T>() where T : class;
|
||||||
|
object Resolve(Type type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public interface IDependencyInjector : IDependencyResolver
|
public interface IDependencyInjector : IDependencyResolver
|
||||||
{
|
{
|
||||||
|
void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService;
|
||||||
|
void RegisterPublisher(Func<IPublisher> publisher);
|
||||||
void RegisterController(Type type);
|
void RegisterController(Type type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,13 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace Tapeti
|
|
||||||
{
|
|
||||||
public interface IQueueRegistration
|
|
||||||
{
|
|
||||||
string BindQueue(IModel channel);
|
|
||||||
|
|
||||||
bool Accept(object message);
|
|
||||||
Task Visit(object message);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,4 @@
|
|||||||
using System;
|
namespace Tapeti
|
||||||
|
|
||||||
namespace Tapeti
|
|
||||||
{
|
{
|
||||||
public interface ISubscriber
|
public interface ISubscriber
|
||||||
{
|
{
|
||||||
|
16
MessageController.cs
Normal file
16
MessageController.cs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
using Tapeti.Annotations;
|
||||||
|
|
||||||
|
namespace Tapeti
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Base class for message controllers
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Using this base class is not required, you can add the MessageController attribute
|
||||||
|
/// to any class.
|
||||||
|
/// </remarks>
|
||||||
|
[MessageController]
|
||||||
|
public abstract class MessageController
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,4 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Runtime.CompilerServices;
|
|
||||||
using System.Runtime.InteropServices;
|
using System.Runtime.InteropServices;
|
||||||
|
|
||||||
// General Information about an assembly is controlled through the following
|
// General Information about an assembly is controlled through the following
|
||||||
|
15
README.md
15
README.md
@ -1,3 +1,14 @@
|
|||||||
'Small to medium-sized and classified as "Least Concern" by the IUCN.'
|
# Tapeti
|
||||||
|
> 'Small to medium-sized and classified as "Least Concern" by the IUCN.'
|
||||||
|
>
|
||||||
|
> [_Wikipedia_](https://en.wikipedia.org/wiki/Tapeti)
|
||||||
|
|
||||||
- Wikipedia
|
Tapeti is a wrapper for the RabbitMQ .NET client designed for long-running microservices with a few specific goals:
|
||||||
|
|
||||||
|
1. Automatic registration of message handlers
|
||||||
|
2. Publishing without transport details
|
||||||
|
* Routing key generated based on class name
|
||||||
|
* One exchange (per service / group of services) to publish them all
|
||||||
|
3. Attribute based, no base class requirements (only for convenience)
|
||||||
|
4. Graceful handling of connection issues, even at startup
|
||||||
|
5. Basic Saga support
|
@ -1,142 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
using Tapeti.Annotations;
|
|
||||||
|
|
||||||
namespace Tapeti.Registration
|
|
||||||
{
|
|
||||||
using MessageHandlerAction = Func<object, Task>;
|
|
||||||
|
|
||||||
public struct MessageHandler
|
|
||||||
{
|
|
||||||
public MessageHandlerAction Action;
|
|
||||||
public string Exchange;
|
|
||||||
public string RoutingKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public abstract class AbstractControllerRegistration : IQueueRegistration
|
|
||||||
{
|
|
||||||
private readonly Func<IControllerFactory> controllerFactoryFactory;
|
|
||||||
private readonly Type controllerType;
|
|
||||||
private readonly string defaultExchange;
|
|
||||||
private readonly Dictionary<Type, List<MessageHandler>> messageHandlers = new Dictionary<Type, List<MessageHandler>>();
|
|
||||||
|
|
||||||
|
|
||||||
protected AbstractControllerRegistration(Func<IControllerFactory> controllerFactoryFactory, Type controllerType, string defaultExchange)
|
|
||||||
{
|
|
||||||
this.controllerFactoryFactory = controllerFactoryFactory;
|
|
||||||
this.controllerType = controllerType;
|
|
||||||
this.defaultExchange = defaultExchange;
|
|
||||||
|
|
||||||
// ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++?
|
|
||||||
GetMessageHandlers(controllerType, (type, handler) =>
|
|
||||||
{
|
|
||||||
if (!messageHandlers.ContainsKey(type))
|
|
||||||
messageHandlers.Add(type, new List<MessageHandler> { handler });
|
|
||||||
else
|
|
||||||
messageHandlers[type].Add(handler);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected virtual void GetMessageHandlers(Type type, Action<Type, MessageHandler> add)
|
|
||||||
{
|
|
||||||
foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance)
|
|
||||||
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object))
|
|
||||||
.Select(m => (MethodInfo)m))
|
|
||||||
{
|
|
||||||
Type messageType;
|
|
||||||
var messageHandler = GetMessageHandler(method, out messageType);
|
|
||||||
|
|
||||||
add(messageType, messageHandler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType)
|
|
||||||
{
|
|
||||||
var parameters = method.GetParameters();
|
|
||||||
|
|
||||||
if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass)
|
|
||||||
throw new ArgumentException($"Method {method.Name} does not have a single object parameter");
|
|
||||||
|
|
||||||
messageType = parameters[0].ParameterType;
|
|
||||||
var messageHandler = new MessageHandler();
|
|
||||||
|
|
||||||
if (method.ReturnType == typeof(void))
|
|
||||||
messageHandler.Action = CreateSyncMessageHandler(method);
|
|
||||||
else if (method.ReturnType == typeof(Task))
|
|
||||||
messageHandler.Action = CreateAsyncMessageHandler(method);
|
|
||||||
else
|
|
||||||
throw new ArgumentException($"Method {method.Name} needs to return void or a Task");
|
|
||||||
|
|
||||||
var exchangeAttribute = method.GetCustomAttribute<ExchangeAttribute>() ?? method.DeclaringType.GetCustomAttribute<ExchangeAttribute>();
|
|
||||||
messageHandler.Exchange = exchangeAttribute?.Name;
|
|
||||||
|
|
||||||
return messageHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected IEnumerable<Type> GetMessageTypes()
|
|
||||||
{
|
|
||||||
return messageHandlers.Keys;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected IEnumerable<string> GetMessageExchanges(Type type)
|
|
||||||
{
|
|
||||||
var exchanges = messageHandlers[type]
|
|
||||||
.Where(h => h.Exchange != null)
|
|
||||||
.Select(h => h.Exchange)
|
|
||||||
.Distinct(StringComparer.InvariantCulture)
|
|
||||||
.ToArray();
|
|
||||||
|
|
||||||
return exchanges.Length > 0 ? exchanges : new[] { defaultExchange };
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public abstract string BindQueue(IModel channel);
|
|
||||||
|
|
||||||
|
|
||||||
public bool Accept(object message)
|
|
||||||
{
|
|
||||||
return messageHandlers.ContainsKey(message.GetType());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public Task Visit(object message)
|
|
||||||
{
|
|
||||||
var registeredHandlers = messageHandlers[message.GetType()];
|
|
||||||
if (registeredHandlers != null)
|
|
||||||
return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message)));
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method)
|
|
||||||
{
|
|
||||||
return message =>
|
|
||||||
{
|
|
||||||
var controller = controllerFactoryFactory().CreateController(controllerType);
|
|
||||||
method.Invoke(controller, new[] { message });
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method)
|
|
||||||
{
|
|
||||||
return message =>
|
|
||||||
{
|
|
||||||
var controller = controllerFactoryFactory().CreateController(controllerType);
|
|
||||||
return (Task)method.Invoke(controller, new[] { message });
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,33 +0,0 @@
|
|||||||
using System;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace Tapeti.Registration
|
|
||||||
{
|
|
||||||
public class ControllerDynamicQueueRegistration : AbstractControllerRegistration
|
|
||||||
{
|
|
||||||
private readonly Func<IRoutingKeyStrategy> routingKeyStrategyFactory;
|
|
||||||
|
|
||||||
|
|
||||||
public ControllerDynamicQueueRegistration(Func<IControllerFactory> controllerFactoryFactory, Func<IRoutingKeyStrategy> routingKeyStrategyFactory, Type controllerType, string defaultExchange)
|
|
||||||
: base(controllerFactoryFactory, controllerType, defaultExchange)
|
|
||||||
{
|
|
||||||
this.routingKeyStrategyFactory = routingKeyStrategyFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public override string BindQueue(IModel channel)
|
|
||||||
{
|
|
||||||
var queue = channel.QueueDeclare();
|
|
||||||
|
|
||||||
foreach (var messageType in GetMessageTypes())
|
|
||||||
{
|
|
||||||
var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType);
|
|
||||||
|
|
||||||
foreach (var exchange in GetMessageExchanges(messageType))
|
|
||||||
channel.QueueBind(queue.QueueName, exchange, routingKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
return queue.QueueName;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,21 +0,0 @@
|
|||||||
using System;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace Tapeti.Registration
|
|
||||||
{
|
|
||||||
public class ControllerQueueRegistration : AbstractControllerRegistration
|
|
||||||
{
|
|
||||||
private readonly string queueName;
|
|
||||||
|
|
||||||
public ControllerQueueRegistration(Func<IControllerFactory> controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange)
|
|
||||||
{
|
|
||||||
this.queueName = queueName;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public override string BindQueue(IModel channel)
|
|
||||||
{
|
|
||||||
return channel.QueueDeclarePassive(queueName).QueueName;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
13
Tapeti.Saga/ISaga.cs
Normal file
13
Tapeti.Saga/ISaga.cs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public interface ISaga<out T> : IDisposable where T : class
|
||||||
|
{
|
||||||
|
string Id { get; }
|
||||||
|
T State { get; }
|
||||||
|
|
||||||
|
void ExpectResponse(string callId);
|
||||||
|
void ResolveResponse(string callId);
|
||||||
|
}
|
||||||
|
}
|
11
Tapeti.Saga/ISagaProvider.cs
Normal file
11
Tapeti.Saga/ISagaProvider.cs
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public interface ISagaProvider
|
||||||
|
{
|
||||||
|
Task<ISaga<T>> Begin<T>() where T : class;
|
||||||
|
Task<ISaga<T>> Continue<T>(string sagaId) where T : class;
|
||||||
|
Task<ISaga<T>> Current<T>() where T : class;
|
||||||
|
}
|
||||||
|
}
|
10
Tapeti.Saga/ISagaStore.cs
Normal file
10
Tapeti.Saga/ISagaStore.cs
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public interface ISagaStore
|
||||||
|
{
|
||||||
|
Task<object> Read(string sagaId);
|
||||||
|
Task Update(string sagaId, object state);
|
||||||
|
}
|
||||||
|
}
|
36
Tapeti.Saga/Properties/AssemblyInfo.cs
Normal file
36
Tapeti.Saga/Properties/AssemblyInfo.cs
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
using System.Reflection;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Runtime.InteropServices;
|
||||||
|
|
||||||
|
// General Information about an assembly is controlled through the following
|
||||||
|
// set of attributes. Change these attribute values to modify the information
|
||||||
|
// associated with an assembly.
|
||||||
|
[assembly: AssemblyTitle("Tapeti.Saga")]
|
||||||
|
[assembly: AssemblyDescription("")]
|
||||||
|
[assembly: AssemblyConfiguration("")]
|
||||||
|
[assembly: AssemblyCompany("Hewlett-Packard Company")]
|
||||||
|
[assembly: AssemblyProduct("Tapeti.Saga")]
|
||||||
|
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")]
|
||||||
|
[assembly: AssemblyTrademark("")]
|
||||||
|
[assembly: AssemblyCulture("")]
|
||||||
|
|
||||||
|
// Setting ComVisible to false makes the types in this assembly not visible
|
||||||
|
// to COM components. If you need to access a type in this assembly from
|
||||||
|
// COM, set the ComVisible attribute to true on that type.
|
||||||
|
[assembly: ComVisible(false)]
|
||||||
|
|
||||||
|
// The following GUID is for the ID of the typelib if this project is exposed to COM
|
||||||
|
[assembly: Guid("f84ad920-d5a1-455d-aed5-2542b3a47b85")]
|
||||||
|
|
||||||
|
// Version information for an assembly consists of the following four values:
|
||||||
|
//
|
||||||
|
// Major Version
|
||||||
|
// Minor Version
|
||||||
|
// Build Number
|
||||||
|
// Revision
|
||||||
|
//
|
||||||
|
// You can specify all the values or you can default the Build and Revision Numbers
|
||||||
|
// by using the '*' as shown below:
|
||||||
|
// [assembly: AssemblyVersion("1.0.*")]
|
||||||
|
[assembly: AssemblyVersion("1.0.0.0")]
|
||||||
|
[assembly: AssemblyFileVersion("1.0.0.0")]
|
28
Tapeti.Saga/SagaBindingMiddleware.cs
Normal file
28
Tapeti.Saga/SagaBindingMiddleware.cs
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public class SagaBindingMiddleware : IBindingMiddleware
|
||||||
|
{
|
||||||
|
public void Handle(IBindingContext context, Action next)
|
||||||
|
{
|
||||||
|
foreach (var parameter in context.Parameters.Where(p =>
|
||||||
|
p.Info.ParameterType.IsGenericType &&
|
||||||
|
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
|
||||||
|
{
|
||||||
|
parameter.SetBinding(messageContext =>
|
||||||
|
{
|
||||||
|
object saga;
|
||||||
|
if (!messageContext.Items.TryGetValue("Saga", out saga))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return saga.GetType() == typeof(ISaga<>) ? saga : null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
43
Tapeti.Saga/SagaMemoryStore.cs
Normal file
43
Tapeti.Saga/SagaMemoryStore.cs
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public class SagaMemoryStore : ISagaStore
|
||||||
|
{
|
||||||
|
private ISagaStore decoratedStore;
|
||||||
|
private readonly Dictionary<string, object> values = new Dictionary<string, object>();
|
||||||
|
|
||||||
|
|
||||||
|
// Not a constructor to allow standard injection to work when using only the MemoryStore
|
||||||
|
public static SagaMemoryStore AsCacheFor(ISagaStore store)
|
||||||
|
{
|
||||||
|
return new SagaMemoryStore
|
||||||
|
{
|
||||||
|
decoratedStore = store
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public async Task<object> Read(string sagaId)
|
||||||
|
{
|
||||||
|
object value;
|
||||||
|
|
||||||
|
// ReSharper disable once InvertIf
|
||||||
|
if (!values.TryGetValue(sagaId, out value) && decoratedStore != null)
|
||||||
|
{
|
||||||
|
value = await decoratedStore.Read(sagaId);
|
||||||
|
values.Add(sagaId, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task Update(string sagaId, object state)
|
||||||
|
{
|
||||||
|
values[sagaId] = state;
|
||||||
|
if (decoratedStore != null)
|
||||||
|
await decoratedStore.Update(sagaId, state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
22
Tapeti.Saga/SagaMessageMiddleware.cs
Normal file
22
Tapeti.Saga/SagaMessageMiddleware.cs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
using System;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public class SagaMessageMiddleware : IMessageMiddleware
|
||||||
|
{
|
||||||
|
private readonly IDependencyResolver dependencyResolver;
|
||||||
|
|
||||||
|
|
||||||
|
public SagaMessageMiddleware(IDependencyResolver dependencyResolver)
|
||||||
|
{
|
||||||
|
this.dependencyResolver = dependencyResolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Handle(IMessageContext context, Action next)
|
||||||
|
{
|
||||||
|
context.Items["Saga"] = dependencyResolver.Resolve<ISagaProvider>().Continue("");
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
16
Tapeti.Saga/SagaMiddleware.cs
Normal file
16
Tapeti.Saga/SagaMiddleware.cs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using Tapeti.Config;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public class SagaMiddleware : IMiddlewareBundle
|
||||||
|
{
|
||||||
|
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
|
||||||
|
{
|
||||||
|
(dependencyResolver as IDependencyInjector)?.RegisterDefault<ISagaProvider, SagaProvider>();
|
||||||
|
|
||||||
|
yield return new SagaBindingMiddleware();
|
||||||
|
yield return new SagaMessageMiddleware(dependencyResolver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
90
Tapeti.Saga/SagaProvider.cs
Normal file
90
Tapeti.Saga/SagaProvider.cs
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Saga
|
||||||
|
{
|
||||||
|
public class SagaProvider : ISagaProvider
|
||||||
|
{
|
||||||
|
protected static readonly ConcurrentDictionary<string, SemaphoreSlim> SagaLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
|
||||||
|
private readonly ISagaStore store;
|
||||||
|
|
||||||
|
public SagaProvider(ISagaStore store)
|
||||||
|
{
|
||||||
|
this.store = store;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public async Task<ISaga<T>> Begin<T>(T initialState) where T : class
|
||||||
|
{
|
||||||
|
var saga = await Saga<T>.Create(() => Task.FromResult(initialState));
|
||||||
|
await store.Update(saga.Id, saga.State);
|
||||||
|
|
||||||
|
return saga;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<ISaga<T>> Continue<T>(string sagaId) where T : class
|
||||||
|
{
|
||||||
|
return await Saga<T>.Create(async () => await store.Read(sagaId) as T, sagaId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<object> Continue(string sagaId)
|
||||||
|
{
|
||||||
|
return new Saga<object>
|
||||||
|
{
|
||||||
|
Id = sagaId,
|
||||||
|
State = await store.Read(sagaId)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class Saga<T> : ISaga<T> where T : class
|
||||||
|
{
|
||||||
|
private bool disposed;
|
||||||
|
|
||||||
|
public string Id { get; set; }
|
||||||
|
public T State { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
public static async Task<Saga<T>> Create(Func<Task<T>> getState, string id = null)
|
||||||
|
{
|
||||||
|
var sagaId = id ?? Guid.NewGuid().ToString();
|
||||||
|
await SagaLocks.GetOrAdd(sagaId, new SemaphoreSlim(1)).WaitAsync();
|
||||||
|
|
||||||
|
var saga = new Saga<T>
|
||||||
|
{
|
||||||
|
Id = sagaId,
|
||||||
|
State = await getState()
|
||||||
|
};
|
||||||
|
|
||||||
|
return saga;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (disposed)
|
||||||
|
return;
|
||||||
|
|
||||||
|
SemaphoreSlim semaphore;
|
||||||
|
if (SagaLocks.TryGetValue(Id, out semaphore))
|
||||||
|
semaphore.Release();
|
||||||
|
|
||||||
|
disposed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void ExpectResponse(string callId)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void ResolveResponse(string callId)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
56
Tapeti.Saga/Tapeti.Saga.csproj
Normal file
56
Tapeti.Saga/Tapeti.Saga.csproj
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||||
|
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
|
||||||
|
<PropertyGroup>
|
||||||
|
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
|
||||||
|
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
|
||||||
|
<ProjectGuid>{F84AD920-D5A1-455D-AED5-2542B3A47B85}</ProjectGuid>
|
||||||
|
<OutputType>Library</OutputType>
|
||||||
|
<AppDesignerFolder>Properties</AppDesignerFolder>
|
||||||
|
<RootNamespace>Tapeti.Saga</RootNamespace>
|
||||||
|
<AssemblyName>Tapeti.Saga</AssemblyName>
|
||||||
|
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
|
||||||
|
<FileAlignment>512</FileAlignment>
|
||||||
|
</PropertyGroup>
|
||||||
|
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<DebugType>full</DebugType>
|
||||||
|
<Optimize>false</Optimize>
|
||||||
|
<OutputPath>bin\Debug\</OutputPath>
|
||||||
|
<DefineConstants>DEBUG;TRACE</DefineConstants>
|
||||||
|
<ErrorReport>prompt</ErrorReport>
|
||||||
|
<WarningLevel>4</WarningLevel>
|
||||||
|
</PropertyGroup>
|
||||||
|
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
|
||||||
|
<DebugType>pdbonly</DebugType>
|
||||||
|
<Optimize>true</Optimize>
|
||||||
|
<OutputPath>bin\Release\</OutputPath>
|
||||||
|
<DefineConstants>TRACE</DefineConstants>
|
||||||
|
<ErrorReport>prompt</ErrorReport>
|
||||||
|
<WarningLevel>4</WarningLevel>
|
||||||
|
</PropertyGroup>
|
||||||
|
<ItemGroup>
|
||||||
|
<Reference Include="System" />
|
||||||
|
<Reference Include="System.Core" />
|
||||||
|
<Reference Include="System.Xml.Linq" />
|
||||||
|
<Reference Include="System.Data.DataSetExtensions" />
|
||||||
|
<Reference Include="Microsoft.CSharp" />
|
||||||
|
<Reference Include="System.Data" />
|
||||||
|
<Reference Include="System.Net.Http" />
|
||||||
|
<Reference Include="System.Xml" />
|
||||||
|
</ItemGroup>
|
||||||
|
<ItemGroup>
|
||||||
|
<Compile Include="ISaga.cs" />
|
||||||
|
<Compile Include="ISagaProvider.cs" />
|
||||||
|
<Compile Include="ISagaStore.cs" />
|
||||||
|
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||||
|
</ItemGroup>
|
||||||
|
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
|
||||||
|
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
|
||||||
|
Other similar extension points exist, see Microsoft.Common.targets.
|
||||||
|
<Target Name="BeforeBuild">
|
||||||
|
</Target>
|
||||||
|
<Target Name="AfterBuild">
|
||||||
|
</Target>
|
||||||
|
-->
|
||||||
|
</Project>
|
@ -1,5 +1,4 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Runtime.CompilerServices;
|
|
||||||
using System.Runtime.InteropServices;
|
using System.Runtime.InteropServices;
|
||||||
|
|
||||||
// General Information about an assembly is controlled through the following
|
// General Information about an assembly is controlled through the following
|
||||||
|
@ -1,22 +0,0 @@
|
|||||||
using System;
|
|
||||||
using SimpleInjector;
|
|
||||||
|
|
||||||
namespace Tapeti.SimpleInjector
|
|
||||||
{
|
|
||||||
public class SimpleInjectorControllerFactory : IControllerFactory
|
|
||||||
{
|
|
||||||
private readonly Container container;
|
|
||||||
|
|
||||||
|
|
||||||
public SimpleInjectorControllerFactory(Container container)
|
|
||||||
{
|
|
||||||
this.container = container;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public object CreateController(Type controllerType)
|
|
||||||
{
|
|
||||||
return container.GetInstance(controllerType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,22 +1,16 @@
|
|||||||
using System.Linq;
|
using System;
|
||||||
using System.Reflection;
|
using System.Linq;
|
||||||
using SimpleInjector;
|
using SimpleInjector;
|
||||||
using Tapeti.Annotations;
|
|
||||||
using Tapeti.Default;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
|
|
||||||
namespace Tapeti.SimpleInjector
|
namespace Tapeti.SimpleInjector
|
||||||
{
|
{
|
||||||
public class SimpleInjectorDependencyResolver : IDependencyResolver
|
public class SimpleInjectorDependencyResolver : IDependencyInjector
|
||||||
{
|
{
|
||||||
private readonly Container container;
|
private readonly Container container;
|
||||||
|
|
||||||
public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true)
|
public SimpleInjectorDependencyResolver(Container container)
|
||||||
{
|
{
|
||||||
this.container = container;
|
this.container = container;
|
||||||
|
|
||||||
if (registerDefaults)
|
|
||||||
RegisterDefaults();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public T Resolve<T>() where T : class
|
public T Resolve<T>() where T : class
|
||||||
@ -24,33 +18,31 @@ namespace Tapeti.SimpleInjector
|
|||||||
return container.GetInstance<T>();
|
return container.GetInstance<T>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public object Resolve(Type type)
|
||||||
public SimpleInjectorDependencyResolver RegisterDefaults()
|
|
||||||
{
|
{
|
||||||
var currentRegistrations = container.GetCurrentRegistrations();
|
return container.GetInstance(type);
|
||||||
|
|
||||||
IfUnregistered<IControllerFactory, SimpleInjectorControllerFactory>(currentRegistrations);
|
|
||||||
IfUnregistered<IMessageSerializer, DefaultMessageSerializer>(currentRegistrations);
|
|
||||||
IfUnregistered<IRoutingKeyStrategy, DefaultRoutingKeyStrategy>(currentRegistrations);
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly)
|
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
|
||||||
{
|
|
||||||
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
|
|
||||||
container.Register(type);
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void IfUnregistered<TService, TImplementation>(IEnumerable<InstanceProducer> currentRegistrations) where TService : class where TImplementation: class, TService
|
|
||||||
{
|
{
|
||||||
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
|
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
|
||||||
if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
|
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService)))
|
||||||
container.Register<TService, TImplementation>();
|
container.Register<TService, TImplementation>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void RegisterPublisher(Func<IPublisher> publisher)
|
||||||
|
{
|
||||||
|
// ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates
|
||||||
|
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher)))
|
||||||
|
container.Register(publisher);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void RegisterController(Type type)
|
||||||
|
{
|
||||||
|
container.Register(type);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,6 @@
|
|||||||
<Reference Include="System.Xml" />
|
<Reference Include="System.Xml" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Compile Include="SimpleInjectorControllerFactory.cs" />
|
|
||||||
<Compile Include="SimpleInjectorDependencyResolver.cs" />
|
<Compile Include="SimpleInjectorDependencyResolver.cs" />
|
||||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
@ -50,38 +50,47 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Compile Include="Annotations\ExchangeAttribute.cs" />
|
<Compile Include="Annotations\ExchangeAttribute.cs" />
|
||||||
<Compile Include="Annotations\QueueAttribute.cs" />
|
<Compile Include="Annotations\MessageControllerAttribute.cs" />
|
||||||
|
<Compile Include="Annotations\StaticQueueAttribute.cs" />
|
||||||
|
<Compile Include="Annotations\DynamicQueueAttribute.cs" />
|
||||||
<Compile Include="Connection\TapetiConsumer.cs" />
|
<Compile Include="Connection\TapetiConsumer.cs" />
|
||||||
<Compile Include="Connection\TapetiPublisher.cs" />
|
<Compile Include="Connection\TapetiPublisher.cs" />
|
||||||
<Compile Include="Connection\TapetiSubscriber.cs" />
|
<Compile Include="Connection\TapetiSubscriber.cs" />
|
||||||
<Compile Include="Connection\TapetiWorker.cs" />
|
<Compile Include="Connection\TapetiWorker.cs" />
|
||||||
<Compile Include="Default\ConsoleLogger.cs" />
|
<Compile Include="Default\ConsoleLogger.cs" />
|
||||||
<Compile Include="Default\DevNullLogger.cs" />
|
<Compile Include="Default\DevNullLogger.cs" />
|
||||||
|
<Compile Include="Helpers\ConsoleHelper.cs" />
|
||||||
|
<Compile Include="Helpers\MiddlewareHelper.cs" />
|
||||||
|
<Compile Include="IConnection.cs" />
|
||||||
<Compile Include="ILogger.cs" />
|
<Compile Include="ILogger.cs" />
|
||||||
<Compile Include="TapetiConnectionExtensions.cs" />
|
<Compile Include="Config\IMessageContext.cs" />
|
||||||
|
<Compile Include="Default\BindingBufferStop.cs" />
|
||||||
|
<Compile Include="Config\IMessageMiddleware.cs" />
|
||||||
|
<Compile Include="Config\IMiddlewareBundle.cs" />
|
||||||
|
<Compile Include="Config\IConfig.cs" />
|
||||||
|
<Compile Include="MessageController.cs" />
|
||||||
|
<Compile Include="Config\IBindingMiddleware.cs" />
|
||||||
<Compile Include="TapetiConnectionParams.cs" />
|
<Compile Include="TapetiConnectionParams.cs" />
|
||||||
|
<Compile Include="TapetiConfig.cs" />
|
||||||
<Compile Include="TapetiTypes.cs" />
|
<Compile Include="TapetiTypes.cs" />
|
||||||
<Compile Include="Tasks\SingleThreadTaskQueue.cs" />
|
<Compile Include="Tasks\SingleThreadTaskQueue.cs" />
|
||||||
<Compile Include="Default\DefaultControllerFactory.cs" />
|
|
||||||
<Compile Include="Default\DefaultDependencyResolver.cs" />
|
|
||||||
<Compile Include="Default\DefaultMessageSerializer.cs" />
|
<Compile Include="Default\DefaultMessageSerializer.cs" />
|
||||||
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
|
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
|
||||||
<Compile Include="IControllerFactory.cs" />
|
|
||||||
<Compile Include="IDependencyResolver.cs" />
|
<Compile Include="IDependencyResolver.cs" />
|
||||||
<Compile Include="IMessageSerializer.cs" />
|
<Compile Include="IMessageSerializer.cs" />
|
||||||
<Compile Include="IPublisher.cs" />
|
<Compile Include="IPublisher.cs" />
|
||||||
<Compile Include="IRoutingKeyStrategy.cs" />
|
<Compile Include="IRoutingKeyStrategy.cs" />
|
||||||
<Compile Include="IQueueRegistration.cs" />
|
|
||||||
<Compile Include="ISubscriber.cs" />
|
<Compile Include="ISubscriber.cs" />
|
||||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||||
<Compile Include="Registration\AbstractControllerRegistration.cs" />
|
|
||||||
<Compile Include="Registration\ControllerDynamicQueueRegistration.cs" />
|
|
||||||
<Compile Include="Registration\ControllerQueueRegistration.cs" />
|
|
||||||
<Compile Include="TapetiConnection.cs" />
|
<Compile Include="TapetiConnection.cs" />
|
||||||
|
<Compile Include="Config\IBindingContext.cs" />
|
||||||
|
<Compile Include="Default\DependencyResolverBinding.cs" />
|
||||||
|
<Compile Include="Default\MessageBinding.cs" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<None Include="packages.config" />
|
<None Include="packages.config" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
<ItemGroup />
|
||||||
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
|
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
|
||||||
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
|
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
|
||||||
Other similar extension points exist, see Microsoft.Common.targets.
|
Other similar extension points exist, see Microsoft.Common.targets.
|
||||||
|
@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.SimpleInjector", "Ta
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Saga", "Tapeti.Saga\Tapeti.Saga.csproj", "{F84AD920-D5A1-455D-AED5-2542B3A47B85}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -27,6 +29,10 @@ Global
|
|||||||
{90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.Build.0 = Release|Any CPU
|
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{F84AD920-D5A1-455D-AED5-2542B3A47B85}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{F84AD920-D5A1-455D-AED5-2542B3A47B85}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{F84AD920-D5A1-455D-AED5-2542B3A47B85}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{F84AD920-D5A1-455D-AED5-2542B3A47B85}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
|
400
TapetiConfig.cs
Normal file
400
TapetiConfig.cs
Normal file
@ -0,0 +1,400 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Reflection;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Tapeti.Annotations;
|
||||||
|
using Tapeti.Config;
|
||||||
|
using Tapeti.Default;
|
||||||
|
using Tapeti.Helpers;
|
||||||
|
|
||||||
|
|
||||||
|
namespace Tapeti
|
||||||
|
{
|
||||||
|
public class TopologyConfigurationException : Exception
|
||||||
|
{
|
||||||
|
public TopologyConfigurationException(string message) : base(message) { }
|
||||||
|
}
|
||||||
|
|
||||||
|
public delegate Task<object> MessageHandlerFunc(IMessageContext context, object message);
|
||||||
|
|
||||||
|
|
||||||
|
public class TapetiConfig
|
||||||
|
{
|
||||||
|
private readonly Dictionary<string, List<Binding>> staticRegistrations = new Dictionary<string, List<Binding>>();
|
||||||
|
private readonly Dictionary<Type, List<Binding>> dynamicRegistrations = new Dictionary<Type, List<Binding>>();
|
||||||
|
|
||||||
|
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
|
||||||
|
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
|
||||||
|
|
||||||
|
private readonly string exchange;
|
||||||
|
private readonly IDependencyResolver dependencyResolver;
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig(string exchange, IDependencyResolver dependencyResolver)
|
||||||
|
{
|
||||||
|
this.exchange = exchange;
|
||||||
|
this.dependencyResolver = dependencyResolver;
|
||||||
|
|
||||||
|
Use(new BindingBufferStop());
|
||||||
|
Use(new DependencyResolverBinding(dependencyResolver));
|
||||||
|
Use(new MessageBinding());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public IConfig Build()
|
||||||
|
{
|
||||||
|
var dependencyInjector = dependencyResolver as IDependencyInjector;
|
||||||
|
if (dependencyInjector != null)
|
||||||
|
{
|
||||||
|
if (ConsoleHelper.IsAvailable())
|
||||||
|
dependencyInjector.RegisterDefault<ILogger, ConsoleLogger>();
|
||||||
|
else
|
||||||
|
dependencyInjector.RegisterDefault<ILogger, DevNullLogger>();
|
||||||
|
|
||||||
|
dependencyInjector.RegisterDefault<IMessageSerializer, DefaultMessageSerializer>();
|
||||||
|
dependencyInjector.RegisterDefault<IRoutingKeyStrategy, DefaultRoutingKeyStrategy>();
|
||||||
|
}
|
||||||
|
|
||||||
|
var queues = new List<IQueue>();
|
||||||
|
queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value)));
|
||||||
|
|
||||||
|
// Group all bindings with the same index into queues, this will
|
||||||
|
// ensure each message type is unique on their queue
|
||||||
|
var dynamicBindings = new List<List<Binding>>();
|
||||||
|
foreach (var bindings in dynamicRegistrations.Values)
|
||||||
|
{
|
||||||
|
while (dynamicBindings.Count < bindings.Count)
|
||||||
|
dynamicBindings.Add(new List<Binding>());
|
||||||
|
|
||||||
|
for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++)
|
||||||
|
dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
|
||||||
|
}
|
||||||
|
|
||||||
|
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
|
||||||
|
|
||||||
|
return new Config(exchange, dependencyResolver, messageMiddleware, queues);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig Use(IBindingMiddleware handler)
|
||||||
|
{
|
||||||
|
bindingMiddleware.Add(handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig Use(IMessageMiddleware handler)
|
||||||
|
{
|
||||||
|
messageMiddleware.Add(handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig Use(IMiddlewareBundle bundle)
|
||||||
|
{
|
||||||
|
foreach (var middleware in bundle.GetContents(dependencyResolver))
|
||||||
|
{
|
||||||
|
// ReSharper disable once CanBeReplacedWithTryCastAndCheckForNull
|
||||||
|
if (middleware is IBindingMiddleware)
|
||||||
|
Use((IBindingMiddleware) middleware);
|
||||||
|
else if (middleware is IMessageMiddleware)
|
||||||
|
Use((IMessageMiddleware)middleware);
|
||||||
|
else
|
||||||
|
throw new ArgumentException($"Unsupported middleware implementation: {middleware.GetType().Name}");
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig RegisterController(Type controller)
|
||||||
|
{
|
||||||
|
var controllerQueueInfo = GetQueueInfo(controller);
|
||||||
|
|
||||||
|
foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance)
|
||||||
|
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object))
|
||||||
|
.Select(m => (MethodInfo)m))
|
||||||
|
{
|
||||||
|
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
|
||||||
|
if (!methodQueueInfo.IsValid)
|
||||||
|
throw new TopologyConfigurationException($"Method {method.Name} or controller {controller.Name} requires a queue attribute");
|
||||||
|
|
||||||
|
var context = new BindingContext(method.GetParameters().Select(p => new BindingParameter(p)).ToList());
|
||||||
|
var messageHandler = GetMessageHandler(context, method);
|
||||||
|
|
||||||
|
var handlerInfo = new Binding
|
||||||
|
{
|
||||||
|
Controller = controller,
|
||||||
|
Method = method,
|
||||||
|
QueueInfo = methodQueueInfo,
|
||||||
|
MessageClass = context.MessageClass,
|
||||||
|
MessageHandler = messageHandler
|
||||||
|
};
|
||||||
|
|
||||||
|
if (methodQueueInfo.Dynamic.GetValueOrDefault())
|
||||||
|
AddDynamicRegistration(context, handlerInfo);
|
||||||
|
else
|
||||||
|
AddStaticRegistration(context, handlerInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig RegisterAllControllers(Assembly assembly)
|
||||||
|
{
|
||||||
|
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute))))
|
||||||
|
RegisterController(type);
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiConfig RegisterAllControllers()
|
||||||
|
{
|
||||||
|
return RegisterAllControllers(Assembly.GetCallingAssembly());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
|
||||||
|
{
|
||||||
|
MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next));
|
||||||
|
|
||||||
|
if (context.MessageClass == null)
|
||||||
|
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class");
|
||||||
|
|
||||||
|
|
||||||
|
var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
|
||||||
|
|
||||||
|
// ReSharper disable once InvertIf - doesn't make the flow clearer imo
|
||||||
|
if (invalidBindings.Count > 0)
|
||||||
|
{
|
||||||
|
var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
|
||||||
|
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
|
||||||
|
}
|
||||||
|
|
||||||
|
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
|
||||||
|
{
|
||||||
|
if (method.ReturnType == typeof(void))
|
||||||
|
return WrapNullMethod(method, parameters);
|
||||||
|
|
||||||
|
if (method.ReturnType == typeof(Task))
|
||||||
|
return WrapTaskMethod(method, parameters);
|
||||||
|
|
||||||
|
if (method.ReturnType == typeof(Task<>))
|
||||||
|
{
|
||||||
|
var genericArguments = method.GetGenericArguments();
|
||||||
|
if (genericArguments.Length != 1)
|
||||||
|
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have exactly one generic argument to Task<>");
|
||||||
|
|
||||||
|
if (!genericArguments[0].IsClass)
|
||||||
|
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have an object generic argument to Task<>");
|
||||||
|
|
||||||
|
return WrapGenericTaskMethod(method, parameters);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (method.ReturnType.IsClass)
|
||||||
|
return WrapObjectMethod(method, parameters);
|
||||||
|
|
||||||
|
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} has an invalid return type");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
|
||||||
|
{
|
||||||
|
return (context, message) =>
|
||||||
|
{
|
||||||
|
method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
|
||||||
|
return Task.FromResult<object>(null);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
|
||||||
|
{
|
||||||
|
return async (context, message) =>
|
||||||
|
{
|
||||||
|
await (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
|
||||||
|
return Task.FromResult<object>(null);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc WrapGenericTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
|
||||||
|
{
|
||||||
|
return (context, message) =>
|
||||||
|
{
|
||||||
|
return (Task<object>)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected MessageHandlerFunc WrapObjectMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
|
||||||
|
{
|
||||||
|
return (context, message) =>
|
||||||
|
{
|
||||||
|
return Task.FromResult(method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void AddStaticRegistration(IBindingContext context, Binding binding)
|
||||||
|
{
|
||||||
|
if (staticRegistrations.ContainsKey(binding.QueueInfo.Name))
|
||||||
|
{
|
||||||
|
var existing = staticRegistrations[binding.QueueInfo.Name];
|
||||||
|
|
||||||
|
// Technically we could easily do multicasting, but it complicates exception handling and requeueing
|
||||||
|
if (existing.Any(h => h.MessageClass == binding.MessageClass))
|
||||||
|
throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
|
||||||
|
|
||||||
|
existing.Add(binding);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
staticRegistrations.Add(binding.QueueInfo.Name, new List<Binding> { binding });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void AddDynamicRegistration(IBindingContext context, Binding binding)
|
||||||
|
{
|
||||||
|
if (dynamicRegistrations.ContainsKey(context.MessageClass))
|
||||||
|
dynamicRegistrations[context.MessageClass].Add(binding);
|
||||||
|
else
|
||||||
|
dynamicRegistrations.Add(context.MessageClass, new List<Binding> { binding });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected QueueInfo GetQueueInfo(MemberInfo member)
|
||||||
|
{
|
||||||
|
var dynamicQueueAttribute = member.GetCustomAttribute<DynamicQueueAttribute>();
|
||||||
|
var staticQueueAttribute = member.GetCustomAttribute<StaticQueueAttribute>();
|
||||||
|
|
||||||
|
if (dynamicQueueAttribute != null && staticQueueAttribute != null)
|
||||||
|
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}");
|
||||||
|
|
||||||
|
if (dynamicQueueAttribute != null)
|
||||||
|
return new QueueInfo { Dynamic = true };
|
||||||
|
|
||||||
|
if (staticQueueAttribute != null)
|
||||||
|
return new QueueInfo { Dynamic = false, Name = staticQueueAttribute.Name };
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class QueueInfo
|
||||||
|
{
|
||||||
|
public bool? Dynamic { get; set; }
|
||||||
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
public bool IsValid => Dynamic.HasValue || !string.IsNullOrEmpty(Name);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class Config : IConfig
|
||||||
|
{
|
||||||
|
public string Exchange { get; }
|
||||||
|
public IDependencyResolver DependencyResolver { get; }
|
||||||
|
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
||||||
|
public IEnumerable<IQueue> Queues { get; }
|
||||||
|
|
||||||
|
|
||||||
|
public Config(string exchange, IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues)
|
||||||
|
{
|
||||||
|
Exchange = exchange;
|
||||||
|
DependencyResolver = dependencyResolver;
|
||||||
|
MessageMiddleware = messageMiddleware;
|
||||||
|
Queues = queues;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class Queue : IQueue
|
||||||
|
{
|
||||||
|
public bool Dynamic { get; }
|
||||||
|
public string Name { get; }
|
||||||
|
public IEnumerable<IBinding> Bindings { get; }
|
||||||
|
|
||||||
|
|
||||||
|
public Queue(QueueInfo queue, IEnumerable<IBinding> bindings)
|
||||||
|
{
|
||||||
|
Dynamic = queue.Dynamic.GetValueOrDefault();
|
||||||
|
Name = queue.Name;
|
||||||
|
Bindings = bindings;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected class Binding : IBinding
|
||||||
|
{
|
||||||
|
public Type Controller { get; set; }
|
||||||
|
public MethodInfo Method { get; set; }
|
||||||
|
public Type MessageClass { get; set; }
|
||||||
|
|
||||||
|
public QueueInfo QueueInfo { get; set; }
|
||||||
|
public MessageHandlerFunc MessageHandler { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
public bool Accept(object message)
|
||||||
|
{
|
||||||
|
return message.GetType() == MessageClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Task<object> Invoke(IMessageContext context, object message)
|
||||||
|
{
|
||||||
|
return MessageHandler(context, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
internal interface IBindingParameterAccess
|
||||||
|
{
|
||||||
|
ValueFactory GetBinding();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
internal class BindingContext : IBindingContext
|
||||||
|
{
|
||||||
|
public Type MessageClass { get; set; }
|
||||||
|
public IReadOnlyList<IBindingParameter> Parameters { get; }
|
||||||
|
|
||||||
|
|
||||||
|
public BindingContext(IReadOnlyList<IBindingParameter> parameters)
|
||||||
|
{
|
||||||
|
Parameters = parameters;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
internal class BindingParameter : IBindingParameter, IBindingParameterAccess
|
||||||
|
{
|
||||||
|
private ValueFactory binding;
|
||||||
|
|
||||||
|
public ParameterInfo Info { get; }
|
||||||
|
public bool HasBinding => binding != null;
|
||||||
|
|
||||||
|
|
||||||
|
public BindingParameter(ParameterInfo parameter)
|
||||||
|
{
|
||||||
|
Info = parameter;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueFactory GetBinding()
|
||||||
|
{
|
||||||
|
return binding;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void SetBinding(ValueFactory valueFactory)
|
||||||
|
{
|
||||||
|
binding = valueFactory;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,91 +1,35 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Reflection;
|
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Tapeti.Annotations;
|
using Tapeti.Config;
|
||||||
using Tapeti.Connection;
|
using Tapeti.Connection;
|
||||||
using Tapeti.Default;
|
|
||||||
using Tapeti.Registration;
|
|
||||||
|
|
||||||
namespace Tapeti
|
namespace Tapeti
|
||||||
{
|
{
|
||||||
public class TapetiConnection : IDisposable
|
public class TapetiConnection : IDisposable
|
||||||
{
|
{
|
||||||
|
private readonly IConfig config;
|
||||||
public TapetiConnectionParams Params { get; set; }
|
public TapetiConnectionParams Params { get; set; }
|
||||||
|
|
||||||
public string PublishExchange { get; set; } = "";
|
|
||||||
public string SubscribeExchange { get; set; } = "";
|
|
||||||
|
|
||||||
|
|
||||||
public IDependencyResolver DependencyResolver
|
|
||||||
{
|
|
||||||
get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); }
|
|
||||||
set { dependencyResolver = value; }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private IDependencyResolver dependencyResolver;
|
|
||||||
private readonly Lazy<List<IQueueRegistration>> registrations = new Lazy<List<IQueueRegistration>>();
|
|
||||||
private readonly Lazy<TapetiWorker> worker;
|
private readonly Lazy<TapetiWorker> worker;
|
||||||
|
|
||||||
|
|
||||||
public TapetiConnection()
|
public TapetiConnection(IConfig config)
|
||||||
{
|
{
|
||||||
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(
|
this.config = config;
|
||||||
DependencyResolver.Resolve<IMessageSerializer>(),
|
(config.DependencyResolver as IDependencyInjector)?.RegisterPublisher(GetPublisher);
|
||||||
DependencyResolver.Resolve<IRoutingKeyStrategy>())
|
|
||||||
|
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware)
|
||||||
{
|
{
|
||||||
ConnectionParams = Params ?? new TapetiConnectionParams(),
|
ConnectionParams = Params ?? new TapetiConnectionParams(),
|
||||||
PublishExchange = PublishExchange
|
Exchange = config.Exchange
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public TapetiConnection WithDependencyResolver(IDependencyResolver resolver)
|
|
||||||
{
|
|
||||||
dependencyResolver = resolver;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public TapetiConnection RegisterController(Type type)
|
|
||||||
{
|
|
||||||
var queueAttribute = type.GetCustomAttribute<QueueAttribute>();
|
|
||||||
if (queueAttribute == null)
|
|
||||||
throw new ArgumentException("Queue attribute required on class", nameof(type));
|
|
||||||
|
|
||||||
if (queueAttribute.Dynamic)
|
|
||||||
{
|
|
||||||
if (!string.IsNullOrEmpty(queueAttribute.Name))
|
|
||||||
throw new ArgumentException("Dynamic queue attributes must not have a Name");
|
|
||||||
|
|
||||||
registrations.Value.Add(new ControllerDynamicQueueRegistration(
|
|
||||||
DependencyResolver.Resolve<IControllerFactory>,
|
|
||||||
DependencyResolver.Resolve<IRoutingKeyStrategy>,
|
|
||||||
type, SubscribeExchange));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (string.IsNullOrEmpty(queueAttribute.Name))
|
|
||||||
throw new ArgumentException("Non-dynamic queue attribute must have a Name");
|
|
||||||
|
|
||||||
registrations.Value.Add(new ControllerQueueRegistration(
|
|
||||||
DependencyResolver.Resolve<IControllerFactory>,
|
|
||||||
type, SubscribeExchange, queueAttribute.Name));
|
|
||||||
}
|
|
||||||
|
|
||||||
(DependencyResolver as IDependencyInjector)?.RegisterController(type);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public async Task<ISubscriber> Subscribe()
|
public async Task<ISubscriber> Subscribe()
|
||||||
{
|
{
|
||||||
if (!registrations.IsValueCreated || registrations.Value.Count == 0)
|
var subscriber = new TapetiSubscriber(() => worker.Value);
|
||||||
throw new ArgumentException("No controllers registered");
|
await subscriber.BindQueues(config.Queues);
|
||||||
|
|
||||||
var subscriber = new TapetiSubscriber(worker.Value);
|
|
||||||
await subscriber.BindQueues(registrations.Value);
|
|
||||||
|
|
||||||
return subscriber;
|
return subscriber;
|
||||||
}
|
}
|
||||||
@ -93,7 +37,7 @@ namespace Tapeti
|
|||||||
|
|
||||||
public IPublisher GetPublisher()
|
public IPublisher GetPublisher()
|
||||||
{
|
{
|
||||||
return new TapetiPublisher(worker.Value);
|
return new TapetiPublisher(() => worker.Value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,23 +0,0 @@
|
|||||||
using System.Linq;
|
|
||||||
using System.Reflection;
|
|
||||||
using Tapeti.Annotations;
|
|
||||||
|
|
||||||
namespace Tapeti
|
|
||||||
{
|
|
||||||
public static class TapetiConnectionExtensions
|
|
||||||
{
|
|
||||||
public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly)
|
|
||||||
{
|
|
||||||
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
|
|
||||||
connection.RegisterController(type);
|
|
||||||
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public static TapetiConnection RegisterAllControllers(this TapetiConnection connection)
|
|
||||||
{
|
|
||||||
return RegisterAllControllers(connection, Assembly.GetCallingAssembly());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
91
Test/MarcoController.cs
Normal file
91
Test/MarcoController.cs
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.SqlServer.Server;
|
||||||
|
using Tapeti;
|
||||||
|
using Tapeti.Annotations;
|
||||||
|
|
||||||
|
namespace Test
|
||||||
|
{
|
||||||
|
[DynamicQueue]
|
||||||
|
public class MarcoController : MessageController
|
||||||
|
{
|
||||||
|
private readonly IPublisher publisher;
|
||||||
|
|
||||||
|
|
||||||
|
public MarcoController(IPublisher publisher/*, ISagaProvider sagaProvider*/)
|
||||||
|
{
|
||||||
|
this.publisher = publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//[StaticQueue("test")]
|
||||||
|
public PoloMessage Marco(MarcoMessage message)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
using (sagaProvider.Begin<MarcoState>(new MarcoState
|
||||||
|
{
|
||||||
|
...
|
||||||
|
}))
|
||||||
|
{
|
||||||
|
//publisher.Publish(new PoloColorRequest(), saga, PoloColorResponse1);
|
||||||
|
//publisher.Publish(new PoloColorRequest(), saga, callID = "tweede");
|
||||||
|
|
||||||
|
// Saga refcount = 2
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
return new PoloMessage(); ;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
[CallID("eerste")]
|
||||||
|
Implicit:
|
||||||
|
|
||||||
|
using (sagaProvider.Continue(correlatieID))
|
||||||
|
{
|
||||||
|
saga refcount--;
|
||||||
|
public void PoloColorResponse1(PoloColorResponse message, ISaga<MarcoState> saga)
|
||||||
|
{
|
||||||
|
|
||||||
|
saga.State == MarcoState
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
state.Color = message.Color;
|
||||||
|
|
||||||
|
if (state.Complete)
|
||||||
|
{
|
||||||
|
publisher.Publish(new PoloMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
public void Polo(PoloMessage message)
|
||||||
|
{
|
||||||
|
Console.WriteLine("Polo!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public class MarcoMessage
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public class PoloMessage
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public class PoloColorRequest
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public class PoloColorResponse
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
37
Test/MarcoEmitter.cs
Normal file
37
Test/MarcoEmitter.cs
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Tapeti;
|
||||||
|
|
||||||
|
namespace Test
|
||||||
|
{
|
||||||
|
public class MarcoEmitter
|
||||||
|
{
|
||||||
|
private readonly IPublisher publisher;
|
||||||
|
|
||||||
|
|
||||||
|
public MarcoEmitter(IPublisher publisher)
|
||||||
|
{
|
||||||
|
this.publisher = publisher;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public async Task Run()
|
||||||
|
{
|
||||||
|
var concurrent = new SemaphoreSlim(20);
|
||||||
|
|
||||||
|
//for (var x = 0; x < 5000; x++)
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
await concurrent.WaitAsync();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await publisher.Publish(new MarcoMessage());
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
concurrent.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -10,28 +10,25 @@ namespace Test
|
|||||||
private static void Main()
|
private static void Main()
|
||||||
{
|
{
|
||||||
var container = new Container();
|
var container = new Container();
|
||||||
|
container.Register<MarcoEmitter>();
|
||||||
|
|
||||||
using (var connection = new TapetiConnection
|
|
||||||
{
|
|
||||||
PublishExchange = "test",
|
|
||||||
SubscribeExchange = "test"
|
|
||||||
}
|
|
||||||
.WithDependencyResolver(new SimpleInjectorDependencyResolver(container))
|
|
||||||
.RegisterAllControllers(typeof(Program).Assembly))
|
|
||||||
{
|
|
||||||
container.Register(() => connection.GetPublisher());
|
|
||||||
|
|
||||||
|
var topology = new TapetiTopologyBuilder()
|
||||||
|
.RegisterAllControllers()
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
using (var connection = new TapetiConnectionBuilder()
|
||||||
|
.SetExchange("test")
|
||||||
|
.SetDependencyResolver(new SimpleInjectorDependencyResolver(container))
|
||||||
|
.SetTopology(topology)
|
||||||
|
.Build())
|
||||||
|
{
|
||||||
Console.WriteLine("Subscribing...");
|
Console.WriteLine("Subscribing...");
|
||||||
connection.Subscribe().Wait();
|
connection.Subscribe().Wait();
|
||||||
Console.WriteLine("Done!");
|
Console.WriteLine("Done!");
|
||||||
|
|
||||||
var publisher = connection.GetPublisher();
|
var emitter = container.GetInstance<MarcoEmitter>();
|
||||||
|
emitter.Run().Wait();
|
||||||
//for (var x = 0; x < 5000; x++)
|
|
||||||
while(true)
|
|
||||||
publisher.Publish(new MarcoMessage()).Wait();
|
|
||||||
|
|
||||||
//Console.ReadLine();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Runtime.CompilerServices;
|
|
||||||
using System.Runtime.InteropServices;
|
using System.Runtime.InteropServices;
|
||||||
|
|
||||||
// General Information about an assembly is controlled through the following
|
// General Information about an assembly is controlled through the following
|
||||||
|
@ -48,9 +48,10 @@
|
|||||||
<Reference Include="System.Xml" />
|
<Reference Include="System.Xml" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<Compile Include="MarcoEmitter.cs" />
|
||||||
<Compile Include="Program.cs" />
|
<Compile Include="Program.cs" />
|
||||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||||
<Compile Include="TestQueueController.cs" />
|
<Compile Include="MarcoController.cs" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<None Include="App.config" />
|
<None Include="App.config" />
|
||||||
|
@ -1,44 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Tapeti;
|
|
||||||
using Tapeti.Annotations;
|
|
||||||
|
|
||||||
namespace Test
|
|
||||||
{
|
|
||||||
//[Exchange("myexchange")]
|
|
||||||
//[Queue("staticqueue")]
|
|
||||||
[Queue]
|
|
||||||
public class TestQueueController
|
|
||||||
{
|
|
||||||
private readonly IPublisher publisher;
|
|
||||||
|
|
||||||
|
|
||||||
public TestQueueController(IPublisher publisher)
|
|
||||||
{
|
|
||||||
this.publisher = publisher;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public async Task Marco(MarcoMessage message)
|
|
||||||
{
|
|
||||||
Console.WriteLine("Marco!");
|
|
||||||
await publisher.Publish(new PoloMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void Polo(PoloMessage message)
|
|
||||||
{
|
|
||||||
Console.WriteLine("Polo!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public class MarcoMessage
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public class PoloMessage
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
17
Test/Visualizer.cs
Normal file
17
Test/Visualizer.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Test
|
||||||
|
{
|
||||||
|
public class Visualizer
|
||||||
|
{
|
||||||
|
public void VisualizeMarco()
|
||||||
|
{
|
||||||
|
Console.WriteLine("Marco!");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void VisualizePolo()
|
||||||
|
{
|
||||||
|
Console.WriteLine("Polo!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user