1
0
mirror of synced 2025-01-22 08:03:07 +01:00

Some initial ideas worked out, with a generous amount of todos

This commit is contained in:
Mark van Renswoude 2016-11-16 23:11:05 +01:00
parent a3b8b705ab
commit 97b654737a
21 changed files with 612 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.vs/
bin/
obj/
packages/

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Method)]
public class MessageHandlerAttribute : Attribute
{
}
}

View File

@ -0,0 +1,11 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Class)]
public class QueueAttribute : Attribute
{
public string Name { get; set; } = null;
public bool Dynamic { get; set; } = false;
}
}

View File

@ -0,0 +1,13 @@
namespace Tapeti.Connection
{
public class TapetiPublisher : IPublisher
{
private readonly TapetiWorker worker;
public TapetiPublisher(TapetiWorker worker)
{
this.worker = worker;
}
}
}

View File

@ -0,0 +1,25 @@
using System.Collections.Generic;
namespace Tapeti.Connection
{
public class TapetiSubscriber : ISubscriber
{
private readonly TapetiWorker worker;
public TapetiSubscriber(TapetiWorker worker, IEnumerable<IMessageHandlerRegistration> registrations)
{
this.worker = worker;
ApplyTopology(registrations);
}
private void ApplyTopology(IEnumerable<IMessageHandlerRegistration> registrations)
{
foreach (var registration in registrations)
registration.ApplyTopology(worker.GetChannel());
}
}
}

View File

@ -0,0 +1,58 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Tapeti.Connection
{
public class TapetiWorker
{
public string HostName { get; set; }
public int Port { get; set; }
public string VirtualHost { get; set; }
public string Username { get; set; }
public string Password { get; set; }
private IConnection connection;
private IModel channel;
public Task Close()
{
if (channel != null)
{
channel.Dispose();
channel = null;
}
if (connection != null)
{
connection.Dispose();
connection = null;
}
return Task.CompletedTask;
}
public IModel GetChannel()
{
if (channel != null)
return channel;
var connectionFactory = new ConnectionFactory
{
HostName = HostName,
Port = Port,
VirtualHost = VirtualHost,
UserName = Username,
Password = Password,
AutomaticRecoveryEnabled = true
};
connection = connectionFactory.CreateConnection();
channel = connection.CreateModel();
return channel;
}
}
}

View File

@ -0,0 +1,12 @@
using System;
namespace Tapeti.Default
{
public class DefaultControllerFactory : IControllerFactory
{
public object CreateController(Type controllerType)
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,6 @@
namespace Tapeti.Default
{
public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy
{
}
}

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti
{
public interface IControllerFactory
{
object CreateController(Type controllerType);
}
}

View File

@ -0,0 +1,13 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Tapeti
{
public interface IMessageHandlerRegistration
{
void ApplyTopology(IModel channel);
bool Accept(object message);
Task Visit(object message);
}
}

6
Tapeti/IPublisher.cs Normal file
View File

@ -0,0 +1,6 @@
namespace Tapeti
{
public interface IPublisher
{
}
}

View File

@ -0,0 +1,6 @@
namespace Tapeti
{
public interface IRoutingKeyStrategy
{
}
}

6
Tapeti/ISubscriber.cs Normal file
View File

@ -0,0 +1,6 @@
namespace Tapeti
{
public interface ISubscriber
{
}
}

View File

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Tapeti")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")]
[assembly: AssemblyProduct("Tapeti")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("8ab4fd33-4aaa-465c-8579-9db3f3b23813")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

View File

@ -0,0 +1,105 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Tapeti.Annotations;
namespace Tapeti.Registration
{
using MessageHandlerAction = Func<object, Task>;
public abstract class AbstractControllerRegistration : IMessageHandlerRegistration
{
private readonly IControllerFactory controllerFactory;
private readonly Type controllerType;
private readonly Dictionary<Type, List<MessageHandlerAction>> messageHandlers = new Dictionary<Type, List<MessageHandlerAction>>();
protected AbstractControllerRegistration(IControllerFactory controllerFactory, Type controllerType)
{
this.controllerFactory = controllerFactory;
this.controllerType = controllerType;
// ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++?
GetMessageHandlers((type, handler) =>
{
if (!messageHandlers.ContainsKey(type))
messageHandlers.Add(type, new List<MessageHandlerAction> { handler });
else
messageHandlers[type].Add(handler);
});
}
protected virtual void GetMessageHandlers(Action<Type, MessageHandlerAction> add)
{
foreach (var method in GetType().GetMembers()
.Where(m => m.MemberType == MemberTypes.Method && m.IsDefined(typeof(MessageHandlerAttribute), true))
.Select(m => (MethodInfo)m))
{
var parameters = method.GetParameters();
if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass)
throw new ArgumentException($"Method {0} does not have a single object parameter", method.Name);
var messageType = parameters[0].ParameterType;
if (method.ReturnType == typeof(void))
add(messageType, CreateSyncMessageHandler(method));
else if (method.ReturnType == typeof(Task))
add(messageType, CreateAsyncMessageHandler(method));
else
throw new ArgumentException($"Method {0} needs to return void or a Task", method.Name);
}
}
protected IEnumerable<Type> GetMessageTypes()
{
return messageHandlers.Keys;
}
public abstract void ApplyTopology(IModel channel);
public bool Accept(object message)
{
return messageHandlers.ContainsKey(message.GetType());
}
public Task Visit(object message)
{
var registeredHandlers = messageHandlers[message.GetType()];
if (registeredHandlers != null)
return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler(message)));
return Task.CompletedTask;
}
protected virtual MessageHandlerAction CreateSyncMessageHandler(MethodInfo method)
{
return message =>
{
var controller = controllerFactory.CreateController(controllerType);
method.Invoke(controller, new[] { message });
return Task.CompletedTask;
};
}
protected virtual MessageHandlerAction CreateAsyncMessageHandler(MethodInfo method)
{
return message =>
{
var controller = controllerFactory.CreateController(controllerType);
return (Task)method.Invoke(controller, new[] { message });
};
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using RabbitMQ.Client;
namespace Tapeti.Registration
{
public class ControllerDynamicQueueRegistration : AbstractControllerRegistration
{
private readonly IRoutingKeyStrategy routingKeyStrategy;
public ControllerDynamicQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType)
: base(controllerFactory, controllerType)
{
this.routingKeyStrategy = routingKeyStrategy;
}
public override void ApplyTopology(IModel channel)
{
var queue = channel.QueueDeclare();
foreach (var messageType in GetMessageTypes())
{
//TODO use routing key attribute(s) for method or use strategy
//TODO use exchange attribute or default setting
//channel.QueueBind(queue.QueueName, );
}
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using RabbitMQ.Client;
namespace Tapeti.Registration
{
public class ControllerQueueRegistration : AbstractControllerRegistration
{
private readonly string queueName;
public ControllerQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType, string queueName) : base(controllerFactory, controllerType)
{
this.queueName = queueName;
}
public override void ApplyTopology(IModel channel)
{
channel.QueueDeclarePassive(queueName);
}
}
}

77
Tapeti/Tapeti.csproj Normal file
View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Tapeti</RootNamespace>
<AssemblyName>Tapeti</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<HintPath>packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Annotations\MessageHandlerAttribute.cs" />
<Compile Include="Annotations\QueueAttribute.cs" />
<Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" />
<Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Default\DefaultControllerFactory.cs" />
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
<Compile Include="IControllerFactory.cs" />
<Compile Include="IPublisher.cs" />
<Compile Include="IRoutingKeyStrategy.cs" />
<Compile Include="IMessageHandlerRegistration.cs" />
<Compile Include="ISubscriber.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Registration\AbstractControllerRegistration.cs" />
<Compile Include="Registration\ControllerDynamicQueueRegistration.cs" />
<Compile Include="Registration\ControllerQueueRegistration.cs" />
<Compile Include="TapetiConnection.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

22
Tapeti/Tapeti.sln Normal file
View File

@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti", "Tapeti.csproj", "{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

138
Tapeti/TapetiConnection.cs Normal file
View File

@ -0,0 +1,138 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Annotations;
using Tapeti.Connection;
using Tapeti.Default;
using Tapeti.Registration;
namespace Tapeti
{
public class TapetiConnection : IDisposable
{
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string VirtualHost { get; set; } = "/";
public string Username { get; set; } = "guest";
public string Password { get; set; } = "guest";
public IControllerFactory ControllerFactory
{
get { return controllerFactory ?? (controllerFactory = new DefaultControllerFactory()); }
set { controllerFactory = value; }
}
public IRoutingKeyStrategy RoutingKeyStrategy
{
get { return routingKeyStrategy ?? (routingKeyStrategy = new DefaultRoutingKeyStrategy()); }
set { routingKeyStrategy = value; }
}
private IControllerFactory controllerFactory;
private IRoutingKeyStrategy routingKeyStrategy;
private List<IMessageHandlerRegistration> registrations;
private TapetiWorker worker;
public TapetiConnection WithControllerFactory(IControllerFactory factory)
{
controllerFactory = factory;
return this;
}
public TapetiConnection RegisterController(Type type)
{
var queueAttribute = type.GetCustomAttribute<QueueAttribute>();
if (queueAttribute == null)
throw new ArgumentException("Queue attribute required on class", nameof(type));
if (queueAttribute.Dynamic)
{
if (!string.IsNullOrEmpty(queueAttribute.Name))
throw new ArgumentException("Dynamic queue attributes must not have a Name");
GetRegistrations().Add(new ControllerDynamicQueueRegistration(controllerFactory, routingKeyStrategy, type));
}
else
{
if (string.IsNullOrEmpty(queueAttribute.Name))
throw new ArgumentException("Non-dynamic queue attribute must have a Name");
GetRegistrations().Add(new ControllerQueueRegistration(controllerFactory, routingKeyStrategy, type, queueAttribute.Name));
}
return this;
}
public TapetiConnection RegisterAllControllers(Assembly assembly)
{
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
RegisterController(type);
return this;
}
public TapetiConnection RegisterAllControllers()
{
return RegisterAllControllers(Assembly.GetCallingAssembly());
}
public ISubscriber Subscribe()
{
if (registrations == null || registrations.Count == 0)
throw new ArgumentException("No controllers registered");
return new TapetiSubscriber(GetWorker(), registrations);
}
public IPublisher GetPublisher()
{
return new TapetiPublisher(GetWorker());
}
public async Task Close()
{
if (worker != null)
{
await worker.Close();
worker = null;
}
}
public void Dispose()
{
Close().Wait();
}
protected List<IMessageHandlerRegistration> GetRegistrations()
{
return registrations ?? (registrations = new List<IMessageHandlerRegistration>());
}
protected TapetiWorker GetWorker()
{
return worker ?? (worker = new TapetiWorker
{
HostName = HostName,
Port = Port,
VirtualHost = VirtualHost,
Username = Username,
Password = Password
});
}
}
}

4
Tapeti/packages.config Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="RabbitMQ.Client" version="4.1.1" targetFramework="net461" />
</packages>