1
0
mirror of synced 2024-11-24 01:33:50 +00:00

Implemented consumer, serializer, routing key strategy and single-threaded task queue (first working version basically)

This commit is contained in:
Mark van Renswoude 2016-11-20 14:34:50 +01:00
parent 77de28d8b8
commit 646ba22e63
36 changed files with 1103 additions and 137 deletions

View File

@ -0,0 +1,15 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class ExchangeAttribute : Attribute
{
public string Name { get; set; }
public ExchangeAttribute(string name)
{
Name = name;
}
}
}

View File

@ -1,9 +0,0 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Method)]
public class MessageHandlerAttribute : Attribute
{
}
}

View File

@ -5,7 +5,14 @@ namespace Tapeti.Annotations
[AttributeUsage(AttributeTargets.Class)]
public class QueueAttribute : Attribute
{
public string Name { get; set; } = null;
public bool Dynamic { get; set; } = false;
public string Name { get; set; }
public bool Dynamic { get; set; }
public QueueAttribute(string name = null)
{
Name = name;
Dynamic = (name == null);
}
}
}

View File

@ -0,0 +1,45 @@
using System;
using System.Diagnostics.Eventing.Reader;
using RabbitMQ.Client;
namespace Tapeti.Connection
{
public class TapetiConsumer : DefaultBasicConsumer
{
private readonly TapetiWorker worker;
private readonly IMessageSerializer messageSerializer;
private readonly IQueueRegistration queueRegistration;
public TapetiConsumer(TapetiWorker worker, IMessageSerializer messageSerializer, IQueueRegistration queueRegistration)
{
this.worker = worker;
this.messageSerializer = messageSerializer;
this.queueRegistration = queueRegistration;
}
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, byte[] body)
{
try
{
var message = messageSerializer.Deserialize(body, properties);
if (message == null)
throw new ArgumentException("Empty message");
if (queueRegistration.Accept(message))
queueRegistration.Visit(message);
else
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
worker.Respond(deliveryTag, ConsumeResponse.Ack);
}
catch (Exception)
{
//TODO pluggable exception handling
worker.Respond(deliveryTag, ConsumeResponse.Nack);
}
}
}
}

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Connection
@ -8,18 +9,15 @@ namespace Tapeti.Connection
private readonly TapetiWorker worker;
public TapetiSubscriber(TapetiWorker worker, IEnumerable<IMessageHandlerRegistration> registrations)
public TapetiSubscriber(TapetiWorker worker)
{
this.worker = worker;
ApplyTopology(registrations);
}
private void ApplyTopology(IEnumerable<IMessageHandlerRegistration> registrations)
public async Task BindQueues(IEnumerable<IQueueRegistration> registrations)
{
foreach (var registration in registrations)
worker.ApplyTopology(registration);
await Task.WhenAll(registrations.Select(registration => worker.Subscribe(registration)).ToList());
}
}
}

View File

@ -1,6 +1,9 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using Tapeti.Tasks;
namespace Tapeti.Connection
{
@ -11,29 +14,86 @@ namespace Tapeti.Connection
public string VirtualHost { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public string PublishExchange { get; set; }
private readonly IMessageSerializer messageSerializer;
private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
private IConnection connection;
private IModel channel;
private readonly Lazy<TaskQueue> taskQueue = new Lazy<TaskQueue>();
public TapetiWorker(IMessageSerializer messageSerializer, IRoutingKeyStrategy routingKeyStrategy)
{
this.messageSerializer = messageSerializer;
this.routingKeyStrategy = routingKeyStrategy;
}
public Task Publish(object message)
{
return taskQueue.Value.Add(() =>
return taskQueue.Value.Add(async () =>
{
//GetChannel().BasicPublish();
});
var properties = new BasicProperties();
var body = messageSerializer.Serialize(message, properties);
(await GetChannel())
.BasicPublish(PublishExchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false,
properties, body);
}).Unwrap();
}
public void ApplyTopology(IMessageHandlerRegistration registration)
public Task Subscribe(string queueName, IQueueRegistration queueRegistration)
{
registration.ApplyTopology(GetChannel());
return taskQueue.Value.Add(async () =>
{
(await GetChannel())
.BasicConsume(queueName, false, new TapetiConsumer(this, messageSerializer, queueRegistration));
}).Unwrap();
}
public async Task Subscribe(IQueueRegistration registration)
{
var queueName = await taskQueue.Value.Add(async () =>
registration.BindQueue(await GetChannel()))
.Unwrap();
await Subscribe(queueName, registration);
}
public Task Respond(ulong deliveryTag, ConsumeResponse response)
{
return taskQueue.Value.Add(async () =>
{
switch (response)
{
case ConsumeResponse.Ack:
(await GetChannel()).BasicAck(deliveryTag, false);
break;
case ConsumeResponse.Nack:
(await GetChannel()).BasicNack(deliveryTag, false, false);
break;
case ConsumeResponse.Requeue:
(await GetChannel()).BasicNack(deliveryTag, false, true);
break;
}
}).Unwrap();
}
public Task Close()
{
if (!taskQueue.IsValueCreated)
return Task.CompletedTask;
return taskQueue.Value.Add(() =>
{
if (channel != null)
{
@ -48,11 +108,16 @@ namespace Tapeti.Connection
connection = null;
}
return Task.CompletedTask;
taskQueue.Value.Dispose();
});
}
private IModel GetChannel()
/// <remarks>
/// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
/// </remarks>
private async Task<IModel> GetChannel()
{
if (channel != null)
return channel;
@ -64,19 +129,26 @@ namespace Tapeti.Connection
VirtualHost = VirtualHost,
UserName = Username,
Password = Password,
AutomaticRecoveryEnabled = true
AutomaticRecoveryEnabled = true,
RequestedHeartbeat = 30
};
while (true)
{
try
{
connection = connectionFactory.CreateConnection();
channel = connection.CreateModel();
break;
}
catch (BrokerUnreachableException)
{
await Task.Delay(5000);
}
}
return channel;
}
private class ScheduledWorkItem
{
}
}
}

View File

@ -1,24 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Connection
{
public class TaskQueue
{
private readonly object previousTaskLock = new object();
private Task previousTask = Task.CompletedTask;
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None
, TaskContinuationOptions.None
, TaskScheduler.Default);
return previousTask;
}
}
}
}

View File

@ -1,12 +1,62 @@
using System;
using System.Collections.Generic;
using System.Reflection;
namespace Tapeti.Default
{
public class DefaultControllerFactory : IControllerFactory
{
private readonly Dictionary<Type, Func<object>> controllerConstructors = new Dictionary<Type, Func<object>>();
private readonly Func<IPublisher> publisherFactory;
public DefaultControllerFactory(Func<IPublisher> publisherFactory)
{
this.publisherFactory = publisherFactory;
}
public object CreateController(Type controllerType)
{
throw new NotImplementedException();
Func<object> constructor;
if (!controllerConstructors.TryGetValue(controllerType, out constructor))
throw new ArgumentException($"Can not create unregistered controller {controllerType.FullName}");
return constructor();
}
public void RegisterController(Type type)
{
controllerConstructors.Add(type, GetConstructor(type));
}
protected Func<object> GetConstructor(Type type)
{
var constructors = type.GetConstructors();
ConstructorInfo publisherConstructor = null;
ConstructorInfo emptyConstructor = null;
foreach (var constructor in constructors)
{
var parameters = constructor.GetParameters();
if (parameters.Length > 0)
{
if (parameters.Length == 1 && parameters[0].ParameterType == typeof(IPublisher))
publisherConstructor = constructor;
}
else
emptyConstructor = constructor;
}
if (publisherConstructor != null)
return () => publisherConstructor.Invoke(new object[] { publisherFactory() });
if (emptyConstructor != null)
return () => emptyConstructor.Invoke(null);
throw new ArgumentException($"Unable to construct type {type.Name}, a parameterless constructor or one with only an IPublisher parameter is required");
}
}
}

View File

@ -0,0 +1,46 @@
using System;
namespace Tapeti.Default
{
/**
* !! IoC Container 9000 !!
*
* ...you probably want to replace this one as soon as possible.
*
* A Simple Injector implementation is provided in the Tapeti.SimpleInjector package.
*/
public class DefaultDependencyResolver : IDependencyInjector
{
private readonly Lazy<DefaultControllerFactory> controllerFactory;
private readonly Lazy<DefaultRoutingKeyStrategy> routingKeyStrategy = new Lazy<DefaultRoutingKeyStrategy>();
private readonly Lazy<DefaultMessageSerializer> messageSerializer = new Lazy<DefaultMessageSerializer>();
public DefaultDependencyResolver(Func<IPublisher> publisherFactory)
{
controllerFactory = new Lazy<DefaultControllerFactory>(() => new DefaultControllerFactory(publisherFactory));
}
public T Resolve<T>() where T : class
{
if (typeof(T) == typeof(IControllerFactory))
return (T)(controllerFactory.Value as IControllerFactory);
if (typeof(T) == typeof(IRoutingKeyStrategy))
return (T)(routingKeyStrategy.Value as IRoutingKeyStrategy);
if (typeof(T) == typeof(IMessageSerializer))
return (T)(messageSerializer.Value as IMessageSerializer);
return default(T);
}
public void RegisterController(Type type)
{
controllerFactory.Value.RegisterController(type);
}
}
}

View File

@ -0,0 +1,84 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using RabbitMQ.Client;
namespace Tapeti.Default
{
public class DefaultMessageSerializer : IMessageSerializer
{
protected const string ContentType = "application/json";
protected const string ClassTypeHeader = "classType";
private readonly ConcurrentDictionary<string, Type> deserializedTypeNames = new ConcurrentDictionary<string, Type>();
private readonly ConcurrentDictionary<Type, string> serializedTypeNames = new ConcurrentDictionary<Type, string>();
private readonly JsonSerializerSettings serializerSettings;
public DefaultMessageSerializer()
{
serializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};
serializerSettings.Converters.Add(new StringEnumConverter());
}
public byte[] Serialize(object message, IBasicProperties properties)
{
if (properties.Headers == null)
properties.Headers = new Dictionary<string, object>();
var typeName = serializedTypeNames.GetOrAdd(message.GetType(), SerializeTypeName);
properties.Headers.Add(ClassTypeHeader, Encoding.UTF8.GetBytes(typeName));
properties.ContentType = ContentType;
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message, serializerSettings));
}
public object Deserialize(byte[] body, IBasicProperties properties)
{
object typeName;
if (!properties.ContentType.Equals(ContentType))
throw new ArgumentException("content_type must be {ContentType}");
if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName))
throw new ArgumentException($"{ClassTypeHeader} header not present");
var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName);
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType);
}
public virtual Type DeserializeTypeName(string typeName)
{
var parts = typeName.Split(':');
if (parts.Length != 2)
throw new ArgumentException($"Invalid type name {typeName}");
var type = Type.GetType(parts[0] + "," + parts[1]);
if (type == null)
throw new ArgumentException($"Unable to resolve type {typeName}");
return type;
}
public virtual string SerializeTypeName(Type type)
{
var typeName = type.FullName + ":" + type.Assembly.GetName().Name;
if (typeName.Length > 255)
throw new ArgumentException($"Type name {typeName} exceeds AMQP 255 character limit");
return typeName;
}
}
}

View File

@ -1,6 +1,59 @@
namespace Tapeti.Default
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
namespace Tapeti.Default
{
public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy
{
private readonly ConcurrentDictionary<Type, string> routingKeyCache = new ConcurrentDictionary<Type, string>();
public string GetRoutingKey(Type messageType)
{
return routingKeyCache.GetOrAdd(messageType, BuildRoutingKey);
}
protected virtual string BuildRoutingKey(Type messageType)
{
// Split PascalCase into dot-separated parts. If the class name ends in "Message" leave that out.
var words = SplitUpperCase(messageType.Name);
if (words.Count > 1 && words.Last().Equals("Message", StringComparison.InvariantCulture))
words.RemoveAt(words.Count - 1);
return string.Join(".", words.Select(s => s.ToLower()));
}
protected static List<string> SplitUpperCase(string source)
{
var words = new List<string>();
if (string.IsNullOrEmpty(source))
return words;
var wordStartIndex = 0;
var letters = source.ToCharArray();
var previousChar = char.MinValue;
// Intentionally skip the first character
for (var charIndex = 1; charIndex < letters.Length; charIndex++)
{
if (char.IsUpper(letters[charIndex]) && !char.IsWhiteSpace(previousChar))
{
words.Add(new string(letters, wordStartIndex, charIndex - wordStartIndex));
wordStartIndex = charIndex;
}
previousChar = letters[charIndex];
}
words.Add(new string(letters, wordStartIndex, letters.Length - wordStartIndex));
return words;
}
}
}

15
IDependencyResolver.cs Normal file
View File

@ -0,0 +1,15 @@
using System;
namespace Tapeti
{
public interface IDependencyResolver
{
T Resolve<T>() where T : class;
}
public interface IDependencyInjector : IDependencyResolver
{
void RegisterController(Type type);
}
}

10
IMessageSerializer.cs Normal file
View File

@ -0,0 +1,10 @@
using RabbitMQ.Client;
namespace Tapeti
{
public interface IMessageSerializer
{
byte[] Serialize(object message, IBasicProperties properties);
object Deserialize(byte[] body, IBasicProperties properties);
}
}

View File

@ -3,9 +3,9 @@ using RabbitMQ.Client;
namespace Tapeti
{
public interface IMessageHandlerRegistration
public interface IQueueRegistration
{
void ApplyTopology(IModel channel);
string BindQueue(IModel channel);
bool Accept(object message);
Task Visit(object message);

View File

@ -1,6 +1,9 @@
namespace Tapeti
using System;
namespace Tapeti
{
public interface IRoutingKeyStrategy
{
string GetRoutingKey(Type messageType);
}
}

View File

@ -1,4 +1,6 @@
namespace Tapeti
using System;
namespace Tapeti
{
public interface ISubscriber
{

View File

@ -10,49 +10,74 @@ namespace Tapeti.Registration
{
using MessageHandlerAction = Func<object, Task>;
public abstract class AbstractControllerRegistration : IMessageHandlerRegistration
public struct MessageHandler
{
private readonly IControllerFactory controllerFactory;
public MessageHandlerAction Action;
public string Exchange;
public string RoutingKey;
}
public abstract class AbstractControllerRegistration : IQueueRegistration
{
private readonly Func<IControllerFactory> controllerFactoryFactory;
private readonly Type controllerType;
private readonly Dictionary<Type, List<MessageHandlerAction>> messageHandlers = new Dictionary<Type, List<MessageHandlerAction>>();
private readonly string defaultExchange;
private readonly Dictionary<Type, List<MessageHandler>> messageHandlers = new Dictionary<Type, List<MessageHandler>>();
protected AbstractControllerRegistration(IControllerFactory controllerFactory, Type controllerType)
protected AbstractControllerRegistration(Func<IControllerFactory> controllerFactoryFactory, Type controllerType, string defaultExchange)
{
this.controllerFactory = controllerFactory;
this.controllerFactoryFactory = controllerFactoryFactory;
this.controllerType = controllerType;
this.defaultExchange = defaultExchange;
// ReSharper disable once VirtualMemberCallInConstructor - I know. What do you think this is, C++?
GetMessageHandlers((type, handler) =>
GetMessageHandlers(controllerType, (type, handler) =>
{
if (!messageHandlers.ContainsKey(type))
messageHandlers.Add(type, new List<MessageHandlerAction> { handler });
messageHandlers.Add(type, new List<MessageHandler> { handler });
else
messageHandlers[type].Add(handler);
});
}
protected virtual void GetMessageHandlers(Action<Type, MessageHandlerAction> add)
protected virtual void GetMessageHandlers(Type type, Action<Type, MessageHandler> add)
{
foreach (var method in GetType().GetMembers()
.Where(m => m.MemberType == MemberTypes.Method && m.IsDefined(typeof(MessageHandlerAttribute), true))
foreach (var method in type.GetMembers(BindingFlags.Public | BindingFlags.Instance)
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object))
.Select(m => (MethodInfo)m))
{
Type messageType;
var messageHandler = GetMessageHandler(method, out messageType);
add(messageType, messageHandler);
}
}
protected virtual MessageHandler GetMessageHandler(MethodInfo method, out Type messageType)
{
var parameters = method.GetParameters();
if (parameters.Length != 1 || !parameters[0].ParameterType.IsClass)
throw new ArgumentException($"Method {0} does not have a single object parameter", method.Name);
throw new ArgumentException($"Method {method.Name} does not have a single object parameter");
var messageType = parameters[0].ParameterType;
messageType = parameters[0].ParameterType;
var messageHandler = new MessageHandler();
if (method.ReturnType == typeof(void))
add(messageType, CreateSyncMessageHandler(method));
messageHandler.Action = CreateSyncMessageHandler(method);
else if (method.ReturnType == typeof(Task))
add(messageType, CreateAsyncMessageHandler(method));
messageHandler.Action = CreateAsyncMessageHandler(method);
else
throw new ArgumentException($"Method {0} needs to return void or a Task", method.Name);
}
throw new ArgumentException($"Method {method.Name} needs to return void or a Task");
var exchangeAttribute = method.GetCustomAttribute<ExchangeAttribute>() ?? method.DeclaringType.GetCustomAttribute<ExchangeAttribute>();
messageHandler.Exchange = exchangeAttribute?.Name;
return messageHandler;
}
@ -62,7 +87,19 @@ namespace Tapeti.Registration
}
public abstract void ApplyTopology(IModel channel);
protected IEnumerable<string> GetMessageExchanges(Type type)
{
var exchanges = messageHandlers[type]
.Where(h => h.Exchange != null)
.Select(h => h.Exchange)
.Distinct(StringComparer.InvariantCulture)
.ToArray();
return exchanges.Length > 0 ? exchanges : new[] { defaultExchange };
}
public abstract string BindQueue(IModel channel);
public bool Accept(object message)
@ -75,7 +112,7 @@ namespace Tapeti.Registration
{
var registeredHandlers = messageHandlers[message.GetType()];
if (registeredHandlers != null)
return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler(message)));
return Task.WhenAll(registeredHandlers.Select(messageHandler => messageHandler.Action(message)));
return Task.CompletedTask;
}
@ -85,7 +122,7 @@ namespace Tapeti.Registration
{
return message =>
{
var controller = controllerFactory.CreateController(controllerType);
var controller = controllerFactoryFactory().CreateController(controllerType);
method.Invoke(controller, new[] { message });
return Task.CompletedTask;
@ -97,7 +134,7 @@ namespace Tapeti.Registration
{
return message =>
{
var controller = controllerFactory.CreateController(controllerType);
var controller = controllerFactoryFactory().CreateController(controllerType);
return (Task)method.Invoke(controller, new[] { message });
};
}

View File

@ -5,27 +5,29 @@ namespace Tapeti.Registration
{
public class ControllerDynamicQueueRegistration : AbstractControllerRegistration
{
private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly Func<IRoutingKeyStrategy> routingKeyStrategyFactory;
public ControllerDynamicQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType)
: base(controllerFactory, controllerType)
public ControllerDynamicQueueRegistration(Func<IControllerFactory> controllerFactoryFactory, Func<IRoutingKeyStrategy> routingKeyStrategyFactory, Type controllerType, string defaultExchange)
: base(controllerFactoryFactory, controllerType, defaultExchange)
{
this.routingKeyStrategy = routingKeyStrategy;
this.routingKeyStrategyFactory = routingKeyStrategyFactory;
}
public override void ApplyTopology(IModel channel)
public override string BindQueue(IModel channel)
{
var queue = channel.QueueDeclare();
foreach (var messageType in GetMessageTypes())
{
//TODO use routing key attribute(s) for method or use strategy
//TODO use exchange attribute or default setting
var routingKey = routingKeyStrategyFactory().GetRoutingKey(messageType);
//channel.QueueBind(queue.QueueName, );
}
foreach (var exchange in GetMessageExchanges(messageType))
channel.QueueBind(queue.QueueName, exchange, routingKey);
}
return queue.QueueName;
}
}
}

View File

@ -7,15 +7,15 @@ namespace Tapeti.Registration
{
private readonly string queueName;
public ControllerQueueRegistration(IControllerFactory controllerFactory, IRoutingKeyStrategy routingKeyStrategy, Type controllerType, string queueName) : base(controllerFactory, controllerType)
public ControllerQueueRegistration(Func<IControllerFactory> controllerFactoryFactory, Type controllerType, string defaultExchange, string queueName) : base(controllerFactoryFactory, controllerType, defaultExchange)
{
this.queueName = queueName;
}
public override void ApplyTopology(IModel channel)
public override string BindQueue(IModel channel)
{
channel.QueueDeclarePassive(queueName);
return channel.QueueDeclarePassive(queueName).QueueName;
}
}
}

View File

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Tapeti.SimpleInjector")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")]
[assembly: AssemblyProduct("Tapeti.SimpleInjector")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

View File

@ -0,0 +1,22 @@
using System;
using SimpleInjector;
namespace Tapeti.SimpleInjector
{
public class SimpleInjectorControllerFactory : IControllerFactory
{
private readonly Container container;
public SimpleInjectorControllerFactory(Container container)
{
this.container = container;
}
public object CreateController(Type controllerType)
{
return container.GetInstance(controllerType);
}
}
}

View File

@ -0,0 +1,56 @@
using System.Linq;
using System.Reflection;
using SimpleInjector;
using Tapeti.Annotations;
using Tapeti.Default;
using System.Collections.Generic;
namespace Tapeti.SimpleInjector
{
public class SimpleInjectorDependencyResolver : IDependencyResolver
{
private readonly Container container;
public SimpleInjectorDependencyResolver(Container container, bool registerDefaults = true)
{
this.container = container;
if (registerDefaults)
RegisterDefaults();
}
public T Resolve<T>() where T : class
{
return container.GetInstance<T>();
}
public SimpleInjectorDependencyResolver RegisterDefaults()
{
var currentRegistrations = container.GetCurrentRegistrations();
IfUnregistered<IControllerFactory, SimpleInjectorControllerFactory>(currentRegistrations);
IfUnregistered<IMessageSerializer, DefaultMessageSerializer>(currentRegistrations);
IfUnregistered<IRoutingKeyStrategy, DefaultRoutingKeyStrategy>(currentRegistrations);
return this;
}
public SimpleInjectorDependencyResolver RegisterAllControllers(Assembly assembly)
{
foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute))))
container.Register(type);
return this;
}
private void IfUnregistered<TService, TImplementation>(IEnumerable<InstanceProducer> currentRegistrations) where TService : class where TImplementation: class, TService
{
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
if (!currentRegistrations.Any(ip => ip.ServiceType == typeof(TService)))
container.Register<TService, TImplementation>();
}
}
}

View File

@ -0,0 +1,69 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Tapeti.SimpleInjector</RootNamespace>
<AssemblyName>Tapeti.SimpleInjector</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="SimpleInjector, Version=3.2.7.0, Culture=neutral, PublicKeyToken=984cb50dea722e99, processorArchitecture=MSIL">
<HintPath>..\packages\SimpleInjector.3.2.7\lib\net45\SimpleInjector.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="SimpleInjectorControllerFactory.cs" />
<Compile Include="SimpleInjectorDependencyResolver.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.csproj">
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
<Name>Tapeti</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="SimpleInjector" version="3.2.7" targetFramework="net452" />
</packages>

View File

@ -31,6 +31,10 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<HintPath>packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll</HintPath>
<Private>True</Private>
@ -45,18 +49,24 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Annotations\MessageHandlerAttribute.cs" />
<Compile Include="Annotations\ExchangeAttribute.cs" />
<Compile Include="Annotations\QueueAttribute.cs" />
<Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" />
<Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Connection\TaskQueue.cs" />
<Compile Include="TapetiTypes.cs" />
<Compile Include="Tasks\SingleThreadTaskQueue.cs" />
<Compile Include="Default\DefaultControllerFactory.cs" />
<Compile Include="Default\DefaultDependencyResolver.cs" />
<Compile Include="Default\DefaultMessageSerializer.cs" />
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
<Compile Include="IControllerFactory.cs" />
<Compile Include="IDependencyResolver.cs" />
<Compile Include="IMessageSerializer.cs" />
<Compile Include="IPublisher.cs" />
<Compile Include="IRoutingKeyStrategy.cs" />
<Compile Include="IMessageHandlerRegistration.cs" />
<Compile Include="IQueueRegistration.cs" />
<Compile Include="ISubscriber.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Registration\AbstractControllerRegistration.cs" />

View File

@ -5,6 +5,10 @@ VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti", "Tapeti.csproj", "{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.SimpleInjector", "Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj", "{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -15,6 +19,14 @@ Global
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8AB4FD33-4AAA-465C-8579-9DB3F3B23813}.Release|Any CPU.Build.0 = Release|Any CPU
{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D7EC6F86-EB3B-49C3-8FE7-6E8C1BB413A6}.Release|Any CPU.Build.0 = Release|Any CPU
{90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90559950-1B32-4119-A78E-517E2C71EE23}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90559950-1B32-4119-A78E-517E2C71EE23}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -17,31 +17,26 @@ namespace Tapeti
public string VirtualHost { get; set; } = "/";
public string Username { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string PublishExchange { get; set; } = "";
public string SubscribeExchange { get; set; } = "";
public IControllerFactory ControllerFactory
public IDependencyResolver DependencyResolver
{
get { return controllerFactory ?? (controllerFactory = new DefaultControllerFactory()); }
set { controllerFactory = value; }
get { return dependencyResolver ?? (dependencyResolver = new DefaultDependencyResolver(GetPublisher)); }
set { dependencyResolver = value; }
}
public IRoutingKeyStrategy RoutingKeyStrategy
{
get { return routingKeyStrategy ?? (routingKeyStrategy = new DefaultRoutingKeyStrategy()); }
set { routingKeyStrategy = value; }
}
private IControllerFactory controllerFactory;
private IRoutingKeyStrategy routingKeyStrategy;
private List<IMessageHandlerRegistration> registrations;
private IDependencyResolver dependencyResolver;
private List<IQueueRegistration> registrations;
private TapetiWorker worker;
public TapetiConnection WithControllerFactory(IControllerFactory factory)
public TapetiConnection WithDependencyResolver(IDependencyResolver resolver)
{
controllerFactory = factory;
dependencyResolver = resolver;
return this;
}
@ -57,16 +52,22 @@ namespace Tapeti
if (!string.IsNullOrEmpty(queueAttribute.Name))
throw new ArgumentException("Dynamic queue attributes must not have a Name");
GetRegistrations().Add(new ControllerDynamicQueueRegistration(controllerFactory, routingKeyStrategy, type));
GetRegistrations().Add(new ControllerDynamicQueueRegistration(
DependencyResolver.Resolve<IControllerFactory>,
DependencyResolver.Resolve<IRoutingKeyStrategy>,
type, SubscribeExchange));
}
else
{
if (string.IsNullOrEmpty(queueAttribute.Name))
throw new ArgumentException("Non-dynamic queue attribute must have a Name");
GetRegistrations().Add(new ControllerQueueRegistration(controllerFactory, routingKeyStrategy, type, queueAttribute.Name));
GetRegistrations().Add(new ControllerQueueRegistration(
DependencyResolver.Resolve<IControllerFactory>,
type, SubscribeExchange, queueAttribute.Name));
}
(DependencyResolver as IDependencyInjector)?.RegisterController(type);
return this;
}
@ -86,12 +87,15 @@ namespace Tapeti
}
public ISubscriber Subscribe()
public async Task<ISubscriber> Subscribe()
{
if (registrations == null || registrations.Count == 0)
throw new ArgumentException("No controllers registered");
return new TapetiSubscriber(GetWorker(), registrations);
var subscriber = new TapetiSubscriber(GetWorker());
await subscriber.BindQueues(registrations);
return subscriber;
}
@ -117,21 +121,24 @@ namespace Tapeti
}
protected List<IMessageHandlerRegistration> GetRegistrations()
protected List<IQueueRegistration> GetRegistrations()
{
return registrations ?? (registrations = new List<IMessageHandlerRegistration>());
return registrations ?? (registrations = new List<IQueueRegistration>());
}
protected TapetiWorker GetWorker()
{
return worker ?? (worker = new TapetiWorker
return worker ?? (worker = new TapetiWorker(
DependencyResolver.Resolve<IMessageSerializer>(),
DependencyResolver.Resolve<IRoutingKeyStrategy>())
{
HostName = HostName,
Port = Port,
VirtualHost = VirtualHost,
Username = Username,
Password = Password
Password = Password,
PublishExchange = PublishExchange
});
}
}

9
TapetiTypes.cs Normal file
View File

@ -0,0 +1,9 @@
namespace Tapeti
{
public enum ConsumeResponse
{
Ack,
Nack,
Requeue
}
}

View File

@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Tasks
{
public class SingleThreadTaskQueue : IDisposable
{
private readonly object previousTaskLock = new object();
private Task previousTask = Task.CompletedTask;
private readonly Lazy<SingleThreadTaskScheduler> singleThreadScheduler = new Lazy<SingleThreadTaskScheduler>();
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
return previousTask;
}
}
public Task<T> Add<T>(Func<T> func)
{
lock (previousTaskLock)
{
var task = previousTask.ContinueWith(t => func(), CancellationToken.None
, TaskContinuationOptions.None
, singleThreadScheduler.Value);
previousTask = task;
return task;
}
}
public void Dispose()
{
if (singleThreadScheduler.IsValueCreated)
singleThreadScheduler.Value.Dispose();
}
}
public class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
public override int MaximumConcurrencyLevel => 1;
private readonly Queue<Task> scheduledTasks = new Queue<Task>();
private bool disposed;
public SingleThreadTaskScheduler()
{
// ReSharper disable once ObjectCreationAsStatement - fire and forget!
new Thread(WorkerThread).Start();
}
public void Dispose()
{
lock (scheduledTasks)
{
disposed = true;
Monitor.PulseAll(scheduledTasks);
}
}
protected override void QueueTask(Task task)
{
if (disposed) return;
lock (scheduledTasks)
{
scheduledTasks.Enqueue(task);
Monitor.Pulse(scheduledTasks);
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
lock (scheduledTasks)
{
return scheduledTasks.ToList();
}
}
private void WorkerThread()
{
while(true)
{
Task task;
lock (scheduledTasks)
{
task = WaitAndDequeueTask();
}
if (task == null)
break;
TryExecuteTask(task);
}
}
private Task WaitAndDequeueTask()
{
while (!scheduledTasks.Any() && !disposed)
Monitor.Wait(scheduledTasks);
return disposed ? null : scheduledTasks.Dequeue();
}
}
}

6
Test/App.config Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1"/>
</startup>
</configuration>

38
Test/Program.cs Normal file
View File

@ -0,0 +1,38 @@
using System;
using SimpleInjector;
using Tapeti;
using Tapeti.SimpleInjector;
namespace Test
{
internal class Program
{
private static void Main()
{
var container = new Container();
using (var connection = new TapetiConnection
{
PublishExchange = "test",
SubscribeExchange = "test"
}
.WithDependencyResolver(new SimpleInjectorDependencyResolver(container))
.RegisterAllControllers(typeof(Program).Assembly))
{
container.Register(() => connection.GetPublisher());
Console.WriteLine("Subscribing...");
connection.Subscribe().Wait();
Console.WriteLine("Done!");
var publisher = connection.GetPublisher();
//for (var x = 0; x < 5000; x++)
while(true)
publisher.Publish(new MarcoMessage()).Wait();
Console.ReadLine();
}
}
}
}

View File

@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Test")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")]
[assembly: AssemblyProduct("Test")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("90559950-1b32-4119-a78e-517e2c71ee23")]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

77
Test/Test.csproj Normal file
View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{90559950-1B32-4119-A78E-517E2C71EE23}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Test</RootNamespace>
<AssemblyName>Test</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<TargetFrameworkProfile />
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="SimpleInjector, Version=3.2.7.0, Culture=neutral, PublicKeyToken=984cb50dea722e99, processorArchitecture=MSIL">
<HintPath>..\packages\SimpleInjector.3.2.7\lib\net45\SimpleInjector.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestQueueController.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.csproj">
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
<Name>Tapeti</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj">
<Project>{d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6}</Project>
<Name>Tapeti.SimpleInjector</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View File

@ -0,0 +1,44 @@
using System;
using System.Threading.Tasks;
using Tapeti;
using Tapeti.Annotations;
namespace Test
{
//[Exchange("myexchange")]
//[Queue("staticqueue")]
[Queue]
public class TestQueueController
{
private readonly IPublisher publisher;
public TestQueueController(IPublisher publisher)
{
this.publisher = publisher;
}
public async Task Marco(MarcoMessage message)
{
Console.WriteLine("Marco!");
await publisher.Publish(new PoloMessage());
}
public void Polo(PoloMessage message)
{
Console.WriteLine("Polo!");
}
}
public class MarcoMessage
{
}
public class PoloMessage
{
}
}

4
Test/packages.config Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="SimpleInjector" version="3.2.7" targetFramework="net461" />
</packages>

View File

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" />
<package id="RabbitMQ.Client" version="4.1.1" targetFramework="net461" />
</packages>