From 97b654737a0545e470a9931db2af729a45ae02b6 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 16 Nov 2016 23:11:05 +0100 Subject: [PATCH] Some initial ideas worked out, with a generous amount of todos --- .gitignore | 4 + Tapeti/Annotations/MessageHandlerAttribute.cs | 9 ++ Tapeti/Annotations/QueueAttribute.cs | 11 ++ Tapeti/Connection/TapetiPublisher.cs | 13 ++ Tapeti/Connection/TapetiSubscriber.cs | 25 ++++ Tapeti/Connection/TapetiWorker.cs | 58 ++++++++ Tapeti/Default/DefaultControllerFactory.cs | 12 ++ Tapeti/Default/DefaultRoutingKeyStrategy.cs | 6 + Tapeti/IControllerFactory.cs | 9 ++ Tapeti/IMessageHandlerRegistration.cs | 13 ++ Tapeti/IPublisher.cs | 6 + Tapeti/IRoutingKeyStrategy.cs | 6 + Tapeti/ISubscriber.cs | 6 + Tapeti/Properties/AssemblyInfo.cs | 36 +++++ .../AbstractControllerRegistration.cs | 105 +++++++++++++ .../ControllerDynamicQueueRegistration.cs | 31 ++++ .../ControllerQueueRegistration.cs | 21 +++ Tapeti/Tapeti.csproj | 77 ++++++++++ Tapeti/Tapeti.sln | 22 +++ Tapeti/TapetiConnection.cs | 138 ++++++++++++++++++ Tapeti/packages.config | 4 + 21 files changed, 612 insertions(+) create mode 100644 .gitignore create mode 100644 Tapeti/Annotations/MessageHandlerAttribute.cs create mode 100644 Tapeti/Annotations/QueueAttribute.cs create mode 100644 Tapeti/Connection/TapetiPublisher.cs create mode 100644 Tapeti/Connection/TapetiSubscriber.cs create mode 100644 Tapeti/Connection/TapetiWorker.cs create mode 100644 Tapeti/Default/DefaultControllerFactory.cs create mode 100644 Tapeti/Default/DefaultRoutingKeyStrategy.cs create mode 100644 Tapeti/IControllerFactory.cs create mode 100644 Tapeti/IMessageHandlerRegistration.cs create mode 100644 Tapeti/IPublisher.cs create mode 100644 Tapeti/IRoutingKeyStrategy.cs create mode 100644 Tapeti/ISubscriber.cs create mode 100644 Tapeti/Properties/AssemblyInfo.cs create mode 100644 Tapeti/Registration/AbstractControllerRegistration.cs create mode 100644 Tapeti/Registration/ControllerDynamicQueueRegistration.cs create mode 100644 Tapeti/Registration/ControllerQueueRegistration.cs create mode 100644 Tapeti/Tapeti.csproj create mode 100644 Tapeti/Tapeti.sln create mode 100644 Tapeti/TapetiConnection.cs create mode 100644 Tapeti/packages.config diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e758713 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.vs/ +bin/ +obj/ +packages/ diff --git a/Tapeti/Annotations/MessageHandlerAttribute.cs b/Tapeti/Annotations/MessageHandlerAttribute.cs new file mode 100644 index 0000000..2875c61 --- /dev/null +++ b/Tapeti/Annotations/MessageHandlerAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Method)] + public class MessageHandlerAttribute : Attribute + { + } +} diff --git a/Tapeti/Annotations/QueueAttribute.cs b/Tapeti/Annotations/QueueAttribute.cs new file mode 100644 index 0000000..0f3e629 --- /dev/null +++ b/Tapeti/Annotations/QueueAttribute.cs @@ -0,0 +1,11 @@ +using System; + +namespace Tapeti.Annotations +{ + [AttributeUsage(AttributeTargets.Class)] + public class QueueAttribute : Attribute + { + public string Name { get; set; } = null; + public bool Dynamic { get; set; } = false; + } +} diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs new file mode 100644 index 0000000..60af7c6 --- /dev/null +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -0,0 +1,13 @@ +namespace Tapeti.Connection +{ + public class TapetiPublisher : IPublisher + { + private readonly TapetiWorker worker; + + + public TapetiPublisher(TapetiWorker worker) + { + this.worker = worker; + } + } +} diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs new file mode 100644 index 0000000..c83fc70 --- /dev/null +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; + +namespace Tapeti.Connection +{ + public class TapetiSubscriber : ISubscriber + { + private readonly TapetiWorker worker; + + + public TapetiSubscriber(TapetiWorker worker, IEnumerable registrations) + { + this.worker = worker; + + ApplyTopology(registrations); + + } + + + private void ApplyTopology(IEnumerable registrations) + { + foreach (var registration in registrations) + registration.ApplyTopology(worker.GetChannel()); + } + } +} diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs new file mode 100644 index 0000000..b1719ff --- /dev/null +++ b/Tapeti/Connection/TapetiWorker.cs @@ -0,0 +1,58 @@ +using System.Threading.Tasks; +using RabbitMQ.Client; + +namespace Tapeti.Connection +{ + public class TapetiWorker + { + public string HostName { get; set; } + public int Port { get; set; } + public string VirtualHost { get; set; } + public string Username { get; set; } + public string Password { get; set; } + + + private IConnection connection; + private IModel channel; + + + public Task Close() + { + if (channel != null) + { + channel.Dispose(); + channel = null; + } + + if (connection != null) + { + connection.Dispose(); + connection = null; + } + + return Task.CompletedTask; + } + + + public IModel GetChannel() + { + if (channel != null) + return channel; + + var connectionFactory = new ConnectionFactory + { + HostName = HostName, + Port = Port, + VirtualHost = VirtualHost, + UserName = Username, + Password = Password, + AutomaticRecoveryEnabled = true + }; + + connection = connectionFactory.CreateConnection(); + channel = connection.CreateModel(); + + return channel; + } + } +} diff --git a/Tapeti/Default/DefaultControllerFactory.cs b/Tapeti/Default/DefaultControllerFactory.cs new file mode 100644 index 0000000..7cd0a17 --- /dev/null +++ b/Tapeti/Default/DefaultControllerFactory.cs @@ -0,0 +1,12 @@ +using System; + +namespace Tapeti.Default +{ + public class DefaultControllerFactory : IControllerFactory + { + public object CreateController(Type controllerType) + { + throw new NotImplementedException(); + } + } +} diff --git a/Tapeti/Default/DefaultRoutingKeyStrategy.cs b/Tapeti/Default/DefaultRoutingKeyStrategy.cs new file mode 100644 index 0000000..98c6cd0 --- /dev/null +++ b/Tapeti/Default/DefaultRoutingKeyStrategy.cs @@ -0,0 +1,6 @@ +namespace Tapeti.Default +{ + public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy + { + } +} diff --git a/Tapeti/IControllerFactory.cs b/Tapeti/IControllerFactory.cs new file mode 100644 index 0000000..6522f3f --- /dev/null +++ b/Tapeti/IControllerFactory.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti +{ + public interface IControllerFactory + { + object CreateController(Type controllerType); + } +} diff --git a/Tapeti/IMessageHandlerRegistration.cs b/Tapeti/IMessageHandlerRegistration.cs new file mode 100644 index 0000000..e2ccd51 --- /dev/null +++ b/Tapeti/IMessageHandlerRegistration.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; +using RabbitMQ.Client; + +namespace Tapeti +{ + public interface IMessageHandlerRegistration + { + void ApplyTopology(IModel channel); + + bool Accept(object message); + Task Visit(object message); + } +} diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs new file mode 100644 index 0000000..c92496a --- /dev/null +++ b/Tapeti/IPublisher.cs @@ -0,0 +1,6 @@ +namespace Tapeti +{ + public interface IPublisher + { + } +} diff --git a/Tapeti/IRoutingKeyStrategy.cs b/Tapeti/IRoutingKeyStrategy.cs new file mode 100644 index 0000000..3bb3dfe --- /dev/null +++ b/Tapeti/IRoutingKeyStrategy.cs @@ -0,0 +1,6 @@ +namespace Tapeti +{ + public interface IRoutingKeyStrategy + { + } +} diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs new file mode 100644 index 0000000..fae4328 --- /dev/null +++ b/Tapeti/ISubscriber.cs @@ -0,0 +1,6 @@ +namespace Tapeti +{ + public interface ISubscriber + { + } +} diff --git a/Tapeti/Properties/AssemblyInfo.cs b/Tapeti/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..f02ee9b --- /dev/null +++ b/Tapeti/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Tapeti")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyProduct("Tapeti")] +[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("8ab4fd33-4aaa-465c-8579-9db3f3b23813")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Tapeti/Registration/AbstractControllerRegistration.cs b/Tapeti/Registration/AbstractControllerRegistration.cs new file mode 100644 index 0000000..b0f4725 --- /dev/null +++ b/Tapeti/Registration/AbstractControllerRegistration.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using RabbitMQ.Client; +using Tapeti.Annotations; + +namespace Tapeti.Registration +{ + using MessageHandlerAction = Func; + + public abstract class AbstractControllerRegistration : IMessageHandlerRegistration + { + private readonly IControllerFactory controllerFactory; + private readonly Type controllerType; + private readonly Dictionary> messageHandlers = new Dictionary>(); + + + protected AbstractControllerRegistration(IControllerFactory controllerFactory, Type controllerType) + { + this.controllerFactory = controllerFactory; + this.controllerType = controllerType; + + // ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++? + GetMessageHandlers((type, handler) => + { + if (!messageHandlers.ContainsKey(type)) + messageHandlers.Add(type, new List { handler }); + else + messageHandlers[type].Add(handler); + }); + } + + + protected virtual void GetMessageHandlers(Action add) + { + foreach (var method in GetType().GetMembers() + .Where(m => m.MemberType == MemberTypes.Method && m.IsDefined(typeof(MessageHandlerAttribute), true)) + .Select(m => (MethodInfo)m)) + { + var parameters = method.GetParameters(); + + if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass) + throw new ArgumentException($"Method {0} does not have a single object parameter", method.Name); + + var messageType = parameters[0].ParameterType; + + if (method.ReturnType == typeof(void)) + add(messageType, CreateSyncMessageHandler(method)); + else if (method.ReturnType == typeof(Task)) + add(messageType, CreateAsyncMessageHandler(method)); + else + throw new ArgumentException($"Method {0} needs to return void or a Task", method.Name); + } + } + + + protected IEnumerable GetMessageTypes() + { + return messageHandlers.Keys; + } + + + public abstract void ApplyTopology(IModel channel); + + + public bool Accept(object message) + { + return messageHandlers.ContainsKey(message.GetType()); + } + + + public Task Visit(object message) + { + var registeredHandlers = messageHandlers[message.GetType()]; + if (registeredHandlers != null) + return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler(message))); + + return Task.CompletedTask; + } + + + protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method) + { + return message => + { + var controller = controllerFactory.CreateController(controllerType); + method.Invoke(controller, new[] { message }); + + return Task.CompletedTask; + }; + } + + + protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method) + { + return message => + { + var controller = controllerFactory.CreateController(controllerType); + return (Task)method.Invoke(controller, new[] { message }); + }; + } + } +} diff --git a/Tapeti/Registration/ControllerDynamicQueueRegistration.cs b/Tapeti/Registration/ControllerDynamicQueueRegistration.cs new file mode 100644 index 0000000..a9aa5f1 --- /dev/null +++ b/Tapeti/Registration/ControllerDynamicQueueRegistration.cs @@ -0,0 +1,31 @@ +using System; +using RabbitMQ.Client; + +namespace Tapeti.Registration +{ + public class ControllerDynamicQueueRegistration : AbstractControllerRegistration + { + private readonly IRoutingKeyStrategy routingKeyStrategy; + + + public ControllerDynamicQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType) + : base(controllerFactory, controllerType) + { + this.routingKeyStrategy = routingKeyStrategy; + } + + + public override void ApplyTopology(IModel channel) + { + var queue = channel.QueueDeclare(); + + foreach (var messageType in GetMessageTypes()) + { + //TODO use routing key attribute(s) for method or use strategy + //TODO use exchange attribute or default setting + + //channel.QueueBind(queue.QueueName, ); + } + } + } +} diff --git a/Tapeti/Registration/ControllerQueueRegistration.cs b/Tapeti/Registration/ControllerQueueRegistration.cs new file mode 100644 index 0000000..eb8d96c --- /dev/null +++ b/Tapeti/Registration/ControllerQueueRegistration.cs @@ -0,0 +1,21 @@ +using System; +using RabbitMQ.Client; + +namespace Tapeti.Registration +{ + public class ControllerQueueRegistration : AbstractControllerRegistration + { + private readonly string queueName; + + public ControllerQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType, string queueName) : base(controllerFactory, controllerType) + { + this.queueName = queueName; + } + + + public override void ApplyTopology(IModel channel) + { + channel.QueueDeclarePassive(queueName); + } + } +} diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj new file mode 100644 index 0000000..aa5aedc --- /dev/null +++ b/Tapeti/Tapeti.csproj @@ -0,0 +1,77 @@ + + + + + Debug + AnyCPU + {8AB4FD33-4AAA-465C-8579-9DB3F3B23813} + Library + Properties + Tapeti + Tapeti + v4.6.1 + 512 + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll + True + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Tapeti/Tapeti.sln b/Tapeti/Tapeti.sln new file mode 100644 index 0000000..e895061 --- /dev/null +++ b/Tapeti/Tapeti.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 14 +VisualStudioVersion = 14.0.25420.1 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti", "Tapeti.csproj", "{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs new file mode 100644 index 0000000..7e01be1 --- /dev/null +++ b/Tapeti/TapetiConnection.cs @@ -0,0 +1,138 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Annotations; +using Tapeti.Connection; +using Tapeti.Default; +using Tapeti.Registration; + +namespace Tapeti +{ + public class TapetiConnection : IDisposable + { + public string HostName { get; set; } = "localhost"; + public int Port { get; set; } = 5672; + public string VirtualHost { get; set; } = "/"; + public string Username { get; set; } = "guest"; + public string Password { get; set; } = "guest"; + + public IControllerFactory ControllerFactory + { + get { return controllerFactory ?? (controllerFactory = new DefaultControllerFactory()); } + set { controllerFactory = value; } + } + + + public IRoutingKeyStrategy RoutingKeyStrategy + { + get { return routingKeyStrategy ?? (routingKeyStrategy = new DefaultRoutingKeyStrategy()); } + set { routingKeyStrategy = value; } + } + + + private IControllerFactory controllerFactory; + private IRoutingKeyStrategy routingKeyStrategy; + private List registrations; + private TapetiWorker worker; + + + + public TapetiConnection WithControllerFactory(IControllerFactory factory) + { + controllerFactory = factory; + return this; + } + + + public TapetiConnection RegisterController(Type type) + { + var queueAttribute = type.GetCustomAttribute(); + if (queueAttribute == null) + throw new ArgumentException("Queue attribute required on class", nameof(type)); + + if (queueAttribute.Dynamic) + { + if (!string.IsNullOrEmpty(queueAttribute.Name)) + throw new ArgumentException("Dynamic queue attributes must not have a Name"); + + GetRegistrations().Add(new ControllerDynamicQueueRegistration(controllerFactory, routingKeyStrategy, type)); + } + else + { + if (string.IsNullOrEmpty(queueAttribute.Name)) + throw new ArgumentException("Non-dynamic queue attribute must have a Name"); + + GetRegistrations().Add(new ControllerQueueRegistration(controllerFactory, routingKeyStrategy, type, queueAttribute.Name)); + } + + return this; + } + + + public TapetiConnection RegisterAllControllers(Assembly assembly) + { + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) + RegisterController(type); + + return this; + } + + + public TapetiConnection RegisterAllControllers() + { + return RegisterAllControllers(Assembly.GetCallingAssembly()); + } + + + public ISubscriber Subscribe() + { + if (registrations == null || registrations.Count == 0) + throw new ArgumentException("No controllers registered"); + + return new TapetiSubscriber(GetWorker(), registrations); + } + + + public IPublisher GetPublisher() + { + return new TapetiPublisher(GetWorker()); + } + + + public async Task Close() + { + if (worker != null) + { + await worker.Close(); + worker = null; + } + } + + + public void Dispose() + { + Close().Wait(); + } + + + protected List GetRegistrations() + { + return registrations ?? (registrations = new List()); + } + + + protected TapetiWorker GetWorker() + { + return worker ?? (worker = new TapetiWorker + { + HostName = HostName, + Port = Port, + VirtualHost = VirtualHost, + Username = Username, + Password = Password + }); + } + } +} diff --git a/Tapeti/packages.config b/Tapeti/packages.config new file mode 100644 index 0000000..0a2eac0 --- /dev/null +++ b/Tapeti/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file