diff --git a/Annotations/ExchangeAttribute.cs b/Annotations/ExchangeAttribute.cs new file mode 100644 index 0000000..13be56f --- /dev/null +++ b/Annotations/ExchangeAttribute.cs @@ -0,0 +1,15 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + public class ExchangeAttribute : Attribute + { + public string Name { get; set; } + + public ExchangeAttribute(string name) + { + Name = name; + } + } +} diff --git a/Annotations/MessageHandlerAttribute.cs b/Annotations/MessageHandlerAttribute.cs deleted file mode 100644 index 2875c61..0000000 --- a/Annotations/MessageHandlerAttribute.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Tapeti.Annotations -{ - [AttributeUsage(AttributeTargets.Method)] - public class MessageHandlerAttribute : Attribute - { - } -} diff --git a/Annotations/QueueAttribute.cs b/Annotations/QueueAttribute.cs index 0f3e629..878390d 100644 --- a/Annotations/QueueAttribute.cs +++ b/Annotations/QueueAttribute.cs @@ -5,7 +5,14 @@ namespace Tapeti.Annotations [AttributeUsage(AttributeTargets.Class)] public class QueueAttribute : Attribute { - public string Name { get; set; } = null; - public bool Dynamic { get; set; } = false; + public string Name { get; set; } + public bool Dynamic { get; set; } + + + public QueueAttribute(string name = null) + { + Name = name; + Dynamic = (name == null); + } } } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs new file mode 100644 index 0000000..afabfd5 --- /dev/null +++ b/Connection/TapetiConsumer.cs @@ -0,0 +1,45 @@ +using System; +using System.Diagnostics.Eventing.Reader; +using RabbitMQ.Client; + +namespace Tapeti.Connection +{ + public class TapetiConsumer : DefaultBasicConsumer + { + private readonly TapetiWorker worker; + private readonly IMessageSerializer messageSerializer; + private readonly IQueueRegistration queueRegistration; + + + public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration) + { + this.worker = worker; + this.messageSerializer = messageSerializer; + this.queueRegistration = queueRegistration; + } + + + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + IBasicProperties properties, byte[] body) + { + try + { + var message = messageSerializer.Deserialize(body, properties); + if (message == null) + throw new ArgumentException("Empty message"); + + if (queueRegistration.Accept(message)) + queueRegistration.Visit(message); + else + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + + worker.Respond(deliveryTag, ConsumeResponse.Ack); + } + catch (Exception) + { + //TODO pluggable exception handling + worker.Respond(deliveryTag, ConsumeResponse.Nack); + } + } + } +} diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index 7adb01e..baafe9d 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace Tapeti.Connection @@ -8,18 +9,15 @@ namespace Tapeti.Connection private readonly TapetiWorker worker; - public TapetiSubscriber(TapetiWorker worker, IEnumerable registrations) + public TapetiSubscriber(TapetiWorker worker) { this.worker = worker; - - ApplyTopology(registrations); } - private void ApplyTopology(IEnumerable registrations) + public async Task BindQueues(IEnumerable registrations) { - foreach (var registration in registrations) - worker.ApplyTopology(registration); + await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList()); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index 7cbe533..10ea446 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -1,6 +1,9 @@ using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing; +using Tapeti.Tasks; namespace Tapeti.Connection { @@ -11,48 +14,110 @@ namespace Tapeti.Connection public string VirtualHost { get; set; } public string Username { get; set; } public string Password { get; set; } + public string PublishExchange { get; set; } + private readonly IMessageSerializer messageSerializer; + private readonly IRoutingKeyStrategy routingKeyStrategy; + private readonly Lazy taskQueue = new Lazy(); private IConnection connection; private IModel channel; - private readonly Lazy taskQueue = new Lazy(); + + + public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy) + { + this.messageSerializer = messageSerializer; + this.routingKeyStrategy = routingKeyStrategy; + } public Task Publish(object message) { - return taskQueue.Value.Add(() => + return taskQueue.Value.Add(async () => { - //GetChannel().BasicPublish(); - }); + var properties = new BasicProperties(); + var body = messageSerializer.Serialize(message, properties); + + (await GetChannel()) + .BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, + properties, body); + }).Unwrap(); } - public void ApplyTopology(IMessageHandlerRegistration registration) + public Task Subscribe(string queueName, IQueueRegistration queueRegistration) { - registration.ApplyTopology(GetChannel()); + return taskQueue.Value.Add(async () => + { + (await GetChannel()) + .BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration)); + }).Unwrap(); + } + + + public async Task Subscribe(IQueueRegistration registration) + { + var queueName = await taskQueue.Value.Add(async () => + registration.BindQueue(await GetChannel())) + .Unwrap(); + + await Subscribe(queueName, registration); + } + + + public Task Respond(ulong deliveryTag, ConsumeResponse response) + { + return taskQueue.Value.Add(async () => + { + switch (response) + { + case ConsumeResponse.Ack: + (await GetChannel()).BasicAck(deliveryTag, false); + break; + + case ConsumeResponse.Nack: + (await GetChannel()).BasicNack(deliveryTag, false, false); + break; + + case ConsumeResponse.Requeue: + (await GetChannel()).BasicNack(deliveryTag, false, true); + break; + } + + }).Unwrap(); } public Task Close() { - if (channel != null) - { - channel.Dispose(); - channel = null; - } + if (!taskQueue.IsValueCreated) + return Task.CompletedTask; - // ReSharper disable once InvertIf - if (connection != null) + return taskQueue.Value.Add(() => { - connection.Dispose(); - connection = null; - } + if (channel != null) + { + channel.Dispose(); + channel = null; + } - return Task.CompletedTask; + // ReSharper disable once InvertIf + if (connection != null) + { + connection.Dispose(); + connection = null; + } + + taskQueue.Value.Dispose(); + }); } - private IModel GetChannel() + /// + /// Only call this from a task in the taskQueue to ensure IModel is only used + /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. + /// + private async Task GetChannel() { if (channel != null) return channel; @@ -64,19 +129,26 @@ namespace Tapeti.Connection VirtualHost = VirtualHost, UserName = Username, Password = Password, - AutomaticRecoveryEnabled = true + AutomaticRecoveryEnabled = true, + RequestedHeartbeat = 30 }; - connection = connectionFactory.CreateConnection(); - channel = connection.CreateModel(); + while (true) + { + try + { + connection = connectionFactory.CreateConnection(); + channel = connection.CreateModel(); + + break; + } + catch (BrokerUnreachableException) + { + await Task.Delay(5000); + } + } return channel; } - - - private class ScheduledWorkItem - { - - } } } diff --git a/Connection/TaskQueue.cs b/Connection/TaskQueue.cs deleted file mode 100644 index 0dae6b5..0000000 --- a/Connection/TaskQueue.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Tapeti.Connection -{ - public class TaskQueue - { - private readonly object previousTaskLock = new object(); - private Task previousTask = Task.CompletedTask; - - - public Task Add(Action action) - { - lock (previousTaskLock) - { - previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None - , TaskContinuationOptions.None - , TaskScheduler.Default); - return previousTask; - } - } - } -} diff --git a/Default/DefaultControllerFactory.cs b/Default/DefaultControllerFactory.cs index 7cd0a17..9918c18 100644 --- a/Default/DefaultControllerFactory.cs +++ b/Default/DefaultControllerFactory.cs @@ -1,12 +1,62 @@ using System; +using System.Collections.Generic; +using System.Reflection; namespace Tapeti.Default { public class DefaultControllerFactory : IControllerFactory { + private readonly Dictionary> controllerConstructors = new Dictionary>(); + private readonly Func publisherFactory; + + public DefaultControllerFactory(Func publisherFactory) + { + this.publisherFactory = publisherFactory; + } + + public object CreateController(Type controllerType) { - throw new NotImplementedException(); + Func constructor; + if (!controllerConstructors.TryGetValue(controllerType, out constructor)) + throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}"); + + return constructor(); + } + + + public void RegisterController(Type type) + { + controllerConstructors.Add(type, GetConstructor(type)); + } + + + protected Func GetConstructor(Type type) + { + var constructors = type.GetConstructors(); + + ConstructorInfo publisherConstructor = null; + ConstructorInfo emptyConstructor = null; + + foreach (var constructor in constructors) + { + var parameters = constructor.GetParameters(); + if (parameters.Length > 0) + { + if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher)) + publisherConstructor = constructor; + } + else + emptyConstructor = constructor; + } + + if (publisherConstructor != null) + return () => publisherConstructor.Invoke(new object[] { publisherFactory() }); + + if (emptyConstructor != null) + return () => emptyConstructor.Invoke(null); + + throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required"); } } } diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs new file mode 100644 index 0000000..f9d987c --- /dev/null +++ b/Default/DefaultDependencyResolver.cs @@ -0,0 +1,46 @@ +using System; + +namespace Tapeti.Default +{ + /** + * !! IoC Container 9000 !! + * + * ...you probably want to replace this one as soon as possible. + * + * A Simple Injector implementation is provided in the Tapeti.SimpleInjector package. + */ + public class DefaultDependencyResolver : IDependencyInjector + { + private readonly Lazy controllerFactory; + private readonly Lazy routingKeyStrategy = new Lazy(); + private readonly Lazy messageSerializer = new Lazy(); + + + + public DefaultDependencyResolver(Func publisherFactory) + { + controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory)); + } + + + public T Resolve() where T : class + { + if (typeof(T) == typeof(IControllerFactory)) + return (T)(controllerFactory.Value as IControllerFactory); + + if (typeof(T) == typeof(IRoutingKeyStrategy)) + return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy); + + if (typeof(T) == typeof(IMessageSerializer)) + return (T)(messageSerializer.Value as IMessageSerializer); + + return default(T); + } + + + public void RegisterController(Type type) + { + controllerFactory.Value.RegisterController(type); + } + } +} diff --git a/Default/DefaultMessageSerializer.cs b/Default/DefaultMessageSerializer.cs new file mode 100644 index 0000000..37fde89 --- /dev/null +++ b/Default/DefaultMessageSerializer.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; +using RabbitMQ.Client; + +namespace Tapeti.Default +{ + public class DefaultMessageSerializer : IMessageSerializer + { + protected const string ContentType = "application/json"; + protected const string ClassTypeHeader = "classType"; + + + private readonly ConcurrentDictionary deserializedTypeNames = new ConcurrentDictionary(); + private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); + private readonly JsonSerializerSettings serializerSettings; + + public DefaultMessageSerializer() + { + serializerSettings = new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore + }; + + serializerSettings.Converters.Add(new StringEnumConverter()); + } + + + public byte[] Serialize(object message, IBasicProperties properties) + { + if (properties.Headers == null) + properties.Headers = new Dictionary(); + + var typeName = serializedTypeNames.GetOrAdd(message.GetType(), SerializeTypeName); + + properties.Headers.Add(ClassTypeHeader, Encoding.UTF8.GetBytes(typeName)); + properties.ContentType = ContentType; + + return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message, serializerSettings)); + } + + + public object Deserialize(byte[] body, IBasicProperties properties) + { + object typeName; + + if (!properties.ContentType.Equals(ContentType)) + throw new ArgumentException("content_type must be {ContentType}"); + + if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName)) + throw new ArgumentException($"{ClassTypeHeader} header not present"); + + var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName); + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType); + } + + + + public virtual Type DeserializeTypeName(string typeName) + { + var parts = typeName.Split(':'); + if (parts.Length != 2) + throw new ArgumentException($"Invalid type name {typeName}"); + + var type = Type.GetType(parts[0] + "," + parts[1]); + if (type == null) + throw new ArgumentException($"Unable to resolve type {typeName}"); + + return type; + } + + public virtual string SerializeTypeName(Type type) + { + var typeName = type.FullName + ":" + type.Assembly.GetName().Name; + if (typeName.Length > 255) + throw new ArgumentException($"Type name {typeName} exceeds AMQP 255 character limit"); + + return typeName; + } + } +} diff --git a/Default/DefaultRoutingKeyStrategy.cs b/Default/DefaultRoutingKeyStrategy.cs index 98c6cd0..48e9260 100644 --- a/Default/DefaultRoutingKeyStrategy.cs +++ b/Default/DefaultRoutingKeyStrategy.cs @@ -1,6 +1,59 @@ -namespace Tapeti.Default +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; + +namespace Tapeti.Default { public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy { + private readonly ConcurrentDictionary routingKeyCache = new ConcurrentDictionary(); + + + public string GetRoutingKey(Type messageType) + { + return routingKeyCache.GetOrAdd(messageType, BuildRoutingKey); + } + + + protected virtual string BuildRoutingKey(Type messageType) + { + // Split PascalCase into dot-separated parts. If the class name ends in "Message" leave that out. + var words = SplitUpperCase(messageType.Name); + + if (words.Count > 1 && words.Last().Equals("Message", StringComparison.InvariantCulture)) + words.RemoveAt(words.Count - 1); + + return string.Join(".", words.Select(s => s.ToLower())); + } + + + protected static List SplitUpperCase(string source) + { + var words = new List(); + + if (string.IsNullOrEmpty(source)) + return words; + + var wordStartIndex = 0; + + var letters = source.ToCharArray(); + var previousChar = char.MinValue; + + // Intentionally skip the first character + for (var charIndex = 1; charIndex < letters.Length; charIndex++) + { + if (char.IsUpper(letters[charIndex]) && !char.IsWhiteSpace(previousChar)) + { + words.Add(new string(letters, wordStartIndex, charIndex - wordStartIndex)); + wordStartIndex = charIndex; + } + + previousChar = letters[charIndex]; + } + + words.Add(new string(letters, wordStartIndex, letters.Length - wordStartIndex)); + return words; + } } } diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs new file mode 100644 index 0000000..e04aa84 --- /dev/null +++ b/IDependencyResolver.cs @@ -0,0 +1,15 @@ +using System; + +namespace Tapeti +{ + public interface IDependencyResolver + { + T Resolve() where T : class; + } + + + public interface IDependencyInjector : IDependencyResolver + { + void RegisterController(Type type); + } +} diff --git a/IMessageSerializer.cs b/IMessageSerializer.cs new file mode 100644 index 0000000..ada89c6 --- /dev/null +++ b/IMessageSerializer.cs @@ -0,0 +1,10 @@ +using RabbitMQ.Client; + +namespace Tapeti +{ + public interface IMessageSerializer + { + byte[] Serialize(object message, IBasicProperties properties); + object Deserialize(byte[] body, IBasicProperties properties); + } +} diff --git a/IMessageHandlerRegistration.cs b/IQueueRegistration.cs similarity index 63% rename from IMessageHandlerRegistration.cs rename to IQueueRegistration.cs index e2ccd51..901af4b 100644 --- a/IMessageHandlerRegistration.cs +++ b/IQueueRegistration.cs @@ -3,9 +3,9 @@ using RabbitMQ.Client; namespace Tapeti { - public interface IMessageHandlerRegistration + public interface IQueueRegistration { - void ApplyTopology(IModel channel); + string BindQueue(IModel channel); bool Accept(object message); Task Visit(object message); diff --git a/IRoutingKeyStrategy.cs b/IRoutingKeyStrategy.cs index 3bb3dfe..e13f287 100644 --- a/IRoutingKeyStrategy.cs +++ b/IRoutingKeyStrategy.cs @@ -1,6 +1,9 @@ -namespace Tapeti +using System; + +namespace Tapeti { public interface IRoutingKeyStrategy { + string GetRoutingKey(Type messageType); } } diff --git a/ISubscriber.cs b/ISubscriber.cs index fae4328..2be1a35 100644 --- a/ISubscriber.cs +++ b/ISubscriber.cs @@ -1,4 +1,6 @@ -namespace Tapeti +using System; + +namespace Tapeti { public interface ISubscriber { diff --git a/Registration/AbstractControllerRegistration.cs b/Registration/AbstractControllerRegistration.cs index b0f4725..f0b2c64 100644 --- a/Registration/AbstractControllerRegistration.cs +++ b/Registration/AbstractControllerRegistration.cs @@ -10,59 +10,96 @@ namespace Tapeti.Registration { using MessageHandlerAction = Func; - public abstract class AbstractControllerRegistration : IMessageHandlerRegistration + public struct MessageHandler { - private readonly IControllerFactory controllerFactory; + public MessageHandlerAction Action; + public string Exchange; + public string RoutingKey; + } + + + public abstract class AbstractControllerRegistration : IQueueRegistration + { + private readonly Func controllerFactoryFactory; private readonly Type controllerType; - private readonly Dictionary> messageHandlers = new Dictionary>(); + private readonly string defaultExchange; + private readonly Dictionary> messageHandlers = new Dictionary>(); - protected AbstractControllerRegistration(IControllerFactory controllerFactory, Type controllerType) + protected AbstractControllerRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange) { - this.controllerFactory = controllerFactory; + this.controllerFactoryFactory = controllerFactoryFactory; this.controllerType = controllerType; + this.defaultExchange = defaultExchange; // ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++? - GetMessageHandlers((type, handler) => + GetMessageHandlers(controllerType, (type, handler) => { if (!messageHandlers.ContainsKey(type)) - messageHandlers.Add(type, new List { handler }); + messageHandlers.Add(type, new List { handler }); else messageHandlers[type].Add(handler); }); } - protected virtual void GetMessageHandlers(Action add) + protected virtual void GetMessageHandlers(Type type, Action add) { - foreach (var method in GetType().GetMembers() - .Where(m => m.MemberType == MemberTypes.Method && m.IsDefined(typeof(MessageHandlerAttribute), true)) + foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance) + .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object)) .Select(m => (MethodInfo)m)) { - var parameters = method.GetParameters(); + Type messageType; + var messageHandler = GetMessageHandler(method, out messageType); - if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) - throw new ArgumentException($"Method {0} does not have a single object parameter", method.Name); - - var messageType = parameters[0].ParameterType; - - if (method.ReturnType == typeof(void)) - add(messageType, CreateSyncMessageHandler(method)); - else if (method.ReturnType == typeof(Task)) - add(messageType, CreateAsyncMessageHandler(method)); - else - throw new ArgumentException($"Method {0} needs to return void or a Task", method.Name); + add(messageType, messageHandler); } } + protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType) + { + var parameters = method.GetParameters(); + + if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) + throw new ArgumentException($"Method {method.Name} does not have a single object parameter"); + + messageType = parameters[0].ParameterType; + var messageHandler = new MessageHandler(); + + if (method.ReturnType == typeof(void)) + messageHandler.Action = CreateSyncMessageHandler(method); + else if (method.ReturnType == typeof(Task)) + messageHandler.Action = CreateAsyncMessageHandler(method); + else + throw new ArgumentException($"Method {method.Name} needs to return void or a Task"); + + var exchangeAttribute = method.GetCustomAttribute() ?? method.DeclaringType.GetCustomAttribute(); + messageHandler.Exchange = exchangeAttribute?.Name; + + return messageHandler; + } + + protected IEnumerable GetMessageTypes() { return messageHandlers.Keys; } - public abstract void ApplyTopology(IModel channel); + protected IEnumerable GetMessageExchanges(Type type) + { + var exchanges = messageHandlers[type] + .Where(h => h.Exchange != null) + .Select(h => h.Exchange) + .Distinct(StringComparer.InvariantCulture) + .ToArray(); + + return exchanges.Length > 0 ? exchanges : new[] { defaultExchange }; + } + + + public abstract string BindQueue(IModel channel); public bool Accept(object message) @@ -75,7 +112,7 @@ namespace Tapeti.Registration { var registeredHandlers = messageHandlers[message.GetType()]; if (registeredHandlers != null) - return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler(message))); + return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message))); return Task.CompletedTask; } @@ -85,7 +122,7 @@ namespace Tapeti.Registration { return message => { - var controller = controllerFactory.CreateController(controllerType); + var controller = controllerFactoryFactory().CreateController(controllerType); method.Invoke(controller, new[] { message }); return Task.CompletedTask; @@ -97,7 +134,7 @@ namespace Tapeti.Registration { return message => { - var controller = controllerFactory.CreateController(controllerType); + var controller = controllerFactoryFactory().CreateController(controllerType); return (Task)method.Invoke(controller, new[] { message }); }; } diff --git a/Registration/ControllerDynamicQueueRegistration.cs b/Registration/ControllerDynamicQueueRegistration.cs index a9aa5f1..ad7f787 100644 --- a/Registration/ControllerDynamicQueueRegistration.cs +++ b/Registration/ControllerDynamicQueueRegistration.cs @@ -5,27 +5,29 @@ namespace Tapeti.Registration { public class ControllerDynamicQueueRegistration : AbstractControllerRegistration { - private readonly IRoutingKeyStrategy routingKeyStrategy; + private readonly Func routingKeyStrategyFactory; - public ControllerDynamicQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType) - : base(controllerFactory, controllerType) + public ControllerDynamicQueueRegistration(Func controllerFactoryFactory, Func routingKeyStrategyFactory, Type controllerType, string defaultExchange) + : base(controllerFactoryFactory, controllerType, defaultExchange) { - this.routingKeyStrategy = routingKeyStrategy; + this.routingKeyStrategyFactory = routingKeyStrategyFactory; } - public override void ApplyTopology(IModel channel) + public override string BindQueue(IModel channel) { var queue = channel.QueueDeclare(); foreach (var messageType in GetMessageTypes()) { - //TODO use routing key attribute(s) for method or use strategy - //TODO use exchange attribute or default setting + var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType); - //channel.QueueBind(queue.QueueName, ); + foreach (var exchange in GetMessageExchanges(messageType)) + channel.QueueBind(queue.QueueName, exchange, routingKey); } + + return queue.QueueName; } } } diff --git a/Registration/ControllerQueueRegistration.cs b/Registration/ControllerQueueRegistration.cs index eb8d96c..3be95d5 100644 --- a/Registration/ControllerQueueRegistration.cs +++ b/Registration/ControllerQueueRegistration.cs @@ -7,15 +7,15 @@ namespace Tapeti.Registration { private readonly string queueName; - public ControllerQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType, string queueName) : base(controllerFactory, controllerType) + public ControllerQueueRegistration(Func controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange) { this.queueName = queueName; } - public override void ApplyTopology(IModel channel) + public override string BindQueue(IModel channel) { - channel.QueueDeclarePassive(queueName); + return channel.QueueDeclarePassive(queueName).QueueName; } } } diff --git a/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..e01df40 --- /dev/null +++ b/Tapeti.SimpleInjector/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Tapeti.SimpleInjector")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyProduct("Tapeti.SimpleInjector")] +[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs b/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs new file mode 100644 index 0000000..8f32b3a --- /dev/null +++ b/Tapeti.SimpleInjector/SimpleInjectorControllerFactory.cs @@ -0,0 +1,22 @@ +using System; +using SimpleInjector; + +namespace Tapeti.SimpleInjector +{ + public class SimpleInjectorControllerFactory : IControllerFactory + { + private readonly Container container; + + + public SimpleInjectorControllerFactory(Container container) + { + this.container = container; + } + + + public object CreateController(Type controllerType) + { + return container.GetInstance(controllerType); + } + } +} diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs new file mode 100644 index 0000000..db5e01e --- /dev/null +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -0,0 +1,56 @@ +using System.Linq; +using System.Reflection; +using SimpleInjector; +using Tapeti.Annotations; +using Tapeti.Default; +using System.Collections.Generic; + +namespace Tapeti.SimpleInjector +{ + public class SimpleInjectorDependencyResolver : IDependencyResolver + { + private readonly Container container; + + public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true) + { + this.container = container; + + if (registerDefaults) + RegisterDefaults(); + } + + public T Resolve() where T : class + { + return container.GetInstance(); + } + + + public SimpleInjectorDependencyResolver RegisterDefaults() + { + var currentRegistrations = container.GetCurrentRegistrations(); + + IfUnregistered(currentRegistrations); + IfUnregistered(currentRegistrations); + IfUnregistered(currentRegistrations); + + return this; + } + + + public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly) + { + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) + container.Register(type); + + return this; + } + + + private void IfUnregistered(IEnumerable currentRegistrations) where TService : class where TImplementation: class, TService + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService))) + container.Register(); + } + } +} diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj new file mode 100644 index 0000000..f91dc29 --- /dev/null +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -0,0 +1,69 @@ + + + + + Debug + AnyCPU + {D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6} + Library + Properties + Tapeti.SimpleInjector + Tapeti.SimpleInjector + v4.6.1 + 512 + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\SimpleInjector.3.2.7\lib\net45\SimpleInjector.dll + True + + + + + + + + + + + + + + + + + + {8ab4fd33-4aaa-465c-8579-9db3f3b23813} + Tapeti + + + + + + + + \ No newline at end of file diff --git a/Tapeti.SimpleInjector/packages.config b/Tapeti.SimpleInjector/packages.config new file mode 100644 index 0000000..40ad547 --- /dev/null +++ b/Tapeti.SimpleInjector/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/Tapeti.csproj b/Tapeti.csproj index dae1b5d..a32def8 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -31,6 +31,10 @@ 4 + + packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + True + packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll True @@ -45,18 +49,24 @@ - + + - + + + + + + - + diff --git a/Tapeti.sln b/Tapeti.sln index e895061..49530f0 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -5,6 +5,10 @@ VisualStudioVersion = 14.0.25420.1 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti", "Tapeti.csproj", "{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.SimpleInjector", "Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj", "{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -15,6 +19,14 @@ Global {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.Build.0 = Debug|Any CPU {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.ActiveCfg = Release|Any CPU {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.Build.0 = Release|Any CPU + {D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Release|Any CPU.Build.0 = Release|Any CPU + {90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.Build.0 = Debug|Any CPU + {90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.ActiveCfg = Release|Any CPU + {90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/TapetiConnection.cs b/TapetiConnection.cs index 7e01be1..f0c1111 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -17,31 +17,26 @@ namespace Tapeti public string VirtualHost { get; set; } = "/"; public string Username { get; set; } = "guest"; public string Password { get; set; } = "guest"; + public string PublishExchange { get; set; } = ""; + public string SubscribeExchange { get; set; } = ""; - public IControllerFactory ControllerFactory + + public IDependencyResolver DependencyResolver { - get { return controllerFactory ?? (controllerFactory = new DefaultControllerFactory()); } - set { controllerFactory = value; } + get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); } + set { dependencyResolver = value; } } - public IRoutingKeyStrategy RoutingKeyStrategy - { - get { return routingKeyStrategy ?? (routingKeyStrategy = new DefaultRoutingKeyStrategy()); } - set { routingKeyStrategy = value; } - } - - - private IControllerFactory controllerFactory; - private IRoutingKeyStrategy routingKeyStrategy; - private List registrations; + private IDependencyResolver dependencyResolver; + private List registrations; private TapetiWorker worker; - public TapetiConnection WithControllerFactory(IControllerFactory factory) + public TapetiConnection WithDependencyResolver(IDependencyResolver resolver) { - controllerFactory = factory; + dependencyResolver = resolver; return this; } @@ -57,16 +52,22 @@ namespace Tapeti if (!string.IsNullOrEmpty(queueAttribute.Name)) throw new ArgumentException("Dynamic queue attributes must not have a Name"); - GetRegistrations().Add(new ControllerDynamicQueueRegistration(controllerFactory, routingKeyStrategy, type)); + GetRegistrations().Add(new ControllerDynamicQueueRegistration( + DependencyResolver.Resolve, + DependencyResolver.Resolve, + type, SubscribeExchange)); } else { if (string.IsNullOrEmpty(queueAttribute.Name)) throw new ArgumentException("Non-dynamic queue attribute must have a Name"); - GetRegistrations().Add(new ControllerQueueRegistration(controllerFactory, routingKeyStrategy, type, queueAttribute.Name)); + GetRegistrations().Add(new ControllerQueueRegistration( + DependencyResolver.Resolve, + type, SubscribeExchange, queueAttribute.Name)); } + (DependencyResolver as IDependencyInjector)?.RegisterController(type); return this; } @@ -86,12 +87,15 @@ namespace Tapeti } - public ISubscriber Subscribe() + public async Task Subscribe() { if (registrations == null || registrations.Count == 0) throw new ArgumentException("No controllers registered"); - return new TapetiSubscriber(GetWorker(), registrations); + var subscriber = new TapetiSubscriber(GetWorker()); + await subscriber.BindQueues(registrations); + + return subscriber; } @@ -117,21 +121,24 @@ namespace Tapeti } - protected List GetRegistrations() + protected List GetRegistrations() { - return registrations ?? (registrations = new List()); + return registrations ?? (registrations = new List()); } protected TapetiWorker GetWorker() { - return worker ?? (worker = new TapetiWorker + return worker ?? (worker = new TapetiWorker( + DependencyResolver.Resolve(), + DependencyResolver.Resolve()) { HostName = HostName, Port = Port, VirtualHost = VirtualHost, Username = Username, - Password = Password + Password = Password, + PublishExchange = PublishExchange }); } } diff --git a/TapetiTypes.cs b/TapetiTypes.cs new file mode 100644 index 0000000..2997930 --- /dev/null +++ b/TapetiTypes.cs @@ -0,0 +1,9 @@ +namespace Tapeti +{ + public enum ConsumeResponse + { + Ack, + Nack, + Requeue + } +} diff --git a/Tasks/SingleThreadTaskQueue.cs b/Tasks/SingleThreadTaskQueue.cs new file mode 100644 index 0000000..fa28949 --- /dev/null +++ b/Tasks/SingleThreadTaskQueue.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Tasks +{ + public class SingleThreadTaskQueue : IDisposable + { + private readonly object previousTaskLock = new object(); + private Task previousTask = Task.CompletedTask; + + private readonly Lazy singleThreadScheduler = new Lazy(); + + + public Task Add(Action action) + { + lock (previousTaskLock) + { + previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None + , TaskContinuationOptions.None + , singleThreadScheduler.Value); + + return previousTask; + } + } + + + public Task Add(Func func) + { + lock (previousTaskLock) + { + var task = previousTask.ContinueWith(t => func(), CancellationToken.None + , TaskContinuationOptions.None + , singleThreadScheduler.Value); + + previousTask = task; + return task; + } + } + + + public void Dispose() + { + if (singleThreadScheduler.IsValueCreated) + singleThreadScheduler.Value.Dispose(); + } + } + + + public class SingleThreadTaskScheduler : TaskScheduler, IDisposable + { + public override int MaximumConcurrencyLevel => 1; + + + private readonly Queue scheduledTasks = new Queue(); + private bool disposed; + + + public SingleThreadTaskScheduler() + { + // ReSharper disable once ObjectCreationAsStatement - fire and forget! + new Thread(WorkerThread).Start(); + } + + + public void Dispose() + { + lock (scheduledTasks) + { + disposed = true; + Monitor.PulseAll(scheduledTasks); + } + } + + + protected override void QueueTask(Task task) + { + if (disposed) return; + + lock (scheduledTasks) + { + scheduledTasks.Enqueue(task); + Monitor.Pulse(scheduledTasks); + } + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + return false; + } + + + protected override IEnumerable GetScheduledTasks() + { + lock (scheduledTasks) + { + return scheduledTasks.ToList(); + } + } + + + private void WorkerThread() + { + while(true) + { + Task task; + lock (scheduledTasks) + { + task = WaitAndDequeueTask(); + } + + if (task == null) + break; + + TryExecuteTask(task); + } + } + + private Task WaitAndDequeueTask() + { + while (!scheduledTasks.Any() && !disposed) + Monitor.Wait(scheduledTasks); + + return disposed ? null : scheduledTasks.Dequeue(); + } + } +} diff --git a/Test/App.config b/Test/App.config new file mode 100644 index 0000000..bae5d6d --- /dev/null +++ b/Test/App.config @@ -0,0 +1,6 @@ + + + + + + diff --git a/Test/Program.cs b/Test/Program.cs new file mode 100644 index 0000000..37858a9 --- /dev/null +++ b/Test/Program.cs @@ -0,0 +1,38 @@ +using System; +using SimpleInjector; +using Tapeti; +using Tapeti.SimpleInjector; + +namespace Test +{ + internal class Program + { + private static void Main() + { + var container = new Container(); + + using (var connection = new TapetiConnection + { + PublishExchange = "test", + SubscribeExchange = "test" + } + .WithDependencyResolver(new SimpleInjectorDependencyResolver(container)) + .RegisterAllControllers(typeof(Program).Assembly)) + { + container.Register(() => connection.GetPublisher()); + + Console.WriteLine("Subscribing..."); + connection.Subscribe().Wait(); + Console.WriteLine("Done!"); + + var publisher = connection.GetPublisher(); + + //for (var x = 0; x < 5000; x++) + while(true) + publisher.Publish(new MarcoMessage()).Wait(); + + Console.ReadLine(); + } + } + } +} diff --git a/Test/Properties/AssemblyInfo.cs b/Test/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..5d677b1 --- /dev/null +++ b/Test/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Test")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyProduct("Test")] +[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("90559950-1b32-4119-a78e-517e2c71ee23")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Test/Test.csproj b/Test/Test.csproj new file mode 100644 index 0000000..d60e49b --- /dev/null +++ b/Test/Test.csproj @@ -0,0 +1,77 @@ + + + + + Debug + AnyCPU + {90559950-1B32-4119-A78E-517E2C71EE23} + Exe + Properties + Test + Test + v4.6.1 + 512 + true + + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\packages\SimpleInjector.3.2.7\lib\net45\SimpleInjector.dll + True + + + + + + + + + + + + + + + + + + + + + + {8ab4fd33-4aaa-465c-8579-9db3f3b23813} + Tapeti + + + {d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6} + Tapeti.SimpleInjector + + + + + \ No newline at end of file diff --git a/Test/TestQueueController.cs b/Test/TestQueueController.cs new file mode 100644 index 0000000..2eed944 --- /dev/null +++ b/Test/TestQueueController.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading.Tasks; +using Tapeti; +using Tapeti.Annotations; + +namespace Test +{ + //[Exchange("myexchange")] + //[Queue("staticqueue")] + [Queue] + public class TestQueueController + { + private readonly IPublisher publisher; + + + public TestQueueController(IPublisher publisher) + { + this.publisher = publisher; + } + + + public async Task Marco(MarcoMessage message) + { + Console.WriteLine("Marco!"); + await publisher.Publish(new PoloMessage()); + } + + + public void Polo(PoloMessage message) + { + Console.WriteLine("Polo!"); + } + } + + + public class MarcoMessage + { + } + + + public class PoloMessage + { + } +} diff --git a/Test/packages.config b/Test/packages.config new file mode 100644 index 0000000..5f610fe --- /dev/null +++ b/Test/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/packages.config b/packages.config index 0a2eac0..05f5cb9 100644 --- a/packages.config +++ b/packages.config @@ -1,4 +1,5 @@  + \ No newline at end of file