1
0
mirror of synced 2025-01-23 08:23:08 +01:00

Merge branch 'feature/flow' into develop

Conflicts:
	Config/IBindingContext.cs
	Config/IConfig.cs
	Config/IMessageContext.cs
	Connection/TapetiConsumer.cs
	Connection/TapetiPublisher.cs
	Connection/TapetiWorker.cs
	Helpers/MiddlewareHelper.cs
	IPublisher.cs
	Tapeti.Flow/Tapeti.Flow.csproj
	Tapeti.Saga/ISagaProvider.cs
	Tapeti.Saga/SagaMiddleware.cs
	Tapeti.Saga/SagaProvider.cs
	TapetiConfig.cs
	Test/MarcoController.cs
	Test/Program.cs
This commit is contained in:
Mark van Renswoude 2017-02-07 16:17:35 +01:00
commit 019684934e
64 changed files with 1665 additions and 462 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
bin/ bin/
obj/ obj/
packages/ packages/
*.user

View File

@ -1,17 +1,23 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks;
namespace Tapeti.Config namespace Tapeti.Config
{ {
public delegate object ValueFactory(IMessageContext context); public delegate object ValueFactory(IMessageContext context);
public delegate Task ResultHandler(IMessageContext context, object value);
public interface IBindingContext public interface IBindingContext
{ {
Type MessageClass { get; set; } Type MessageClass { get; set; }
IReadOnlyList<IBindingParameter> Parameters { get; }
MethodInfo Method { get; }
IReadOnlyList<IBindingParameter> Parameters { get; }
IBindingResult Result { get; }
void Use(IBindingFilter filter);
void Use(IMessageMiddleware middleware); void Use(IMessageMiddleware middleware);
} }
@ -23,4 +29,13 @@ namespace Tapeti.Config
void SetBinding(ValueFactory valueFactory); void SetBinding(ValueFactory valueFactory);
} }
public interface IBindingResult
{
ParameterInfo Info { get; }
bool HasHandler { get; }
void SetHandler(ResultHandler resultHandler);
}
} }

9
Config/IBindingFilter.cs Normal file
View File

@ -0,0 +1,9 @@
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IBindingFilter
{
Task<bool> Accept(IMessageContext context, IBinding binding);
}
}

View File

@ -7,10 +7,11 @@ namespace Tapeti.Config
{ {
public interface IConfig public interface IConfig
{ {
string Exchange { get; }
IDependencyResolver DependencyResolver { get; } IDependencyResolver DependencyResolver { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IEnumerable<IQueue> Queues { get; } IEnumerable<IQueue> Queues { get; }
IBinding GetBinding(Delegate method);
} }
@ -28,10 +29,18 @@ namespace Tapeti.Config
Type Controller { get; } Type Controller { get; }
MethodInfo Method { get; } MethodInfo Method { get; }
Type MessageClass { get; } Type MessageClass { get; }
string QueueName { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<IBindingFilter> BindingFilters { get; }
bool Accept(object message); Task<bool> Accept(IMessageContext context, object message);
Task<object> Invoke(IMessageContext context, object message); Task Invoke(IMessageContext context, object message);
}
public interface IDynamicQueueBinding : IBinding
{
void SetQueueName(string queueName);
} }
} }

View File

@ -1,16 +1,29 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Reflection;
using RabbitMQ.Client; using RabbitMQ.Client;
namespace Tapeti.Config namespace Tapeti.Config
{ {
public interface IMessageContext public interface IMessageContext : IDisposable
{ {
IDependencyResolver DependencyResolver { get; } IDependencyResolver DependencyResolver { get; }
object Controller { get; } string Queue { get; }
string RoutingKey { get; }
object Message { get; } object Message { get; }
IBasicProperties Properties { get; } IBasicProperties Properties { get; }
IDictionary<string, object> Items { get; } IDictionary<string, object> Items { get; }
/// <summary>
/// Controller will be null when passed to an IBindingFilter
/// </summary>
object Controller { get; }
/// <summary>
/// Binding will be null when passed to an IBindingFilter
/// </summary>
IBinding Binding { get; }
} }
} }

View File

@ -1,7 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Helpers; using Tapeti.Helpers;
@ -11,17 +10,22 @@ namespace Tapeti.Connection
public class TapetiConsumer : DefaultBasicConsumer public class TapetiConsumer : DefaultBasicConsumer
{ {
private readonly TapetiWorker worker; private readonly TapetiWorker worker;
private readonly string queueName;
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware; private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly List<IBinding> bindings; private readonly List<IBinding> bindings;
private readonly IExceptionStrategy exceptionStrategy;
public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware) public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware)
{ {
this.worker = worker; this.worker = worker;
this.queueName = queueName;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
this.messageMiddleware = messageMiddleware; this.messageMiddleware = messageMiddleware;
this.bindings = bindings.ToList(); this.bindings = bindings.ToList();
exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>();
} }
@ -35,51 +39,91 @@ namespace Tapeti.Connection
throw new ArgumentException("Empty message"); throw new ArgumentException("Empty message");
var validMessageType = false; var validMessageType = false;
foreach (var binding in bindings.Where(b => b.Accept(message)))
using (var context = new MessageContext
{ {
var context = new MessageContext DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
{
try
{ {
DependencyResolver = dependencyResolver, foreach (var binding in bindings)
Controller = dependencyResolver.Resolve(binding.Controller),
Message = message,
Properties = properties
};
MiddlewareHelper.GoAsync(binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
async () =>
{ {
var result = binding.Invoke(context, message).Result; if (!binding.Accept(context, message).Result)
if (result != null) continue;
await worker.Publish(result, null);
context.Controller = dependencyResolver.Resolve(binding.Controller);
context.Binding = binding;
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => binding.Invoke(context, message)
).Wait();
// ReSharper restore AccessToDisposedClosure
validMessageType = true;
} }
).Wait();
validMessageType = true; if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
}
catch (Exception e)
{
worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, UnwrapException(e)));
}
} }
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
worker.Respond(deliveryTag, ConsumeResponse.Ack); worker.Respond(deliveryTag, ConsumeResponse.Ack);
} }
catch (Exception) catch (Exception e)
{ {
worker.Respond(deliveryTag, ConsumeResponse.Requeue); worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, UnwrapException(e)));
throw;
} }
} }
private static Exception UnwrapException(Exception exception)
{
// In async/await style code this is handled similarly. For synchronous
// code using Tasks we have to unwrap these ourselves to get the proper
// exception directly instead of "Errors occured". We might lose
// some stack traces in the process though.
var aggregateException = exception as AggregateException;
if (aggregateException != null && aggregateException.InnerExceptions.Count == 1)
throw aggregateException.InnerExceptions[0];
return UnwrapException(exception);
}
protected class MessageContext : IMessageContext protected class MessageContext : IMessageContext
{ {
public IDependencyResolver DependencyResolver { get; set; } public IDependencyResolver DependencyResolver { get; set; }
public object Controller { get; set; } public object Controller { get; set; }
public IBinding Binding { get; set; }
public string Queue { get; set; }
public string RoutingKey { get; set; }
public object Message { get; set; } public object Message { get; set; }
public IBasicProperties Properties { get; set; } public IBasicProperties Properties { get; set; }
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>(); public IDictionary<string, object> Items { get; } = new Dictionary<string, object>();
public void Dispose()
{
foreach (var value in Items.Values)
(value as IDisposable)?.Dispose();
}
} }
} }
} }

View File

@ -4,7 +4,7 @@ using RabbitMQ.Client;
namespace Tapeti.Connection namespace Tapeti.Connection
{ {
public class TapetiPublisher : IAdvancedPublisher public class TapetiPublisher : IInternalPublisher
{ {
private readonly Func<TapetiWorker> workerFactory; private readonly Func<TapetiWorker> workerFactory;
@ -25,5 +25,11 @@ namespace Tapeti.Connection
{ {
return workerFactory().Publish(message, properties); return workerFactory().Publish(message, properties);
} }
public Task PublishDirect(object message, string queueName, IBasicProperties properties)
{
return workerFactory().PublishDirect(message, queueName, properties);
}
} }
} }

View File

@ -12,12 +12,12 @@ namespace Tapeti.Connection
public class TapetiWorker public class TapetiWorker
{ {
public TapetiConnectionParams ConnectionParams { get; set; } public TapetiConnectionParams ConnectionParams { get; set; }
public string Exchange { get; set; }
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware; private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly IMessageSerializer messageSerializer; private readonly IMessageSerializer messageSerializer;
private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly IExchangeStrategy exchangeStrategy;
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>(); private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
private RabbitMQ.Client.IConnection connection; private RabbitMQ.Client.IConnection connection;
private IModel channelInstance; private IModel channelInstance;
@ -27,25 +27,22 @@ namespace Tapeti.Connection
{ {
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
this.messageMiddleware = messageMiddleware; this.messageMiddleware = messageMiddleware;
messageSerializer = dependencyResolver.Resolve<IMessageSerializer>(); messageSerializer = dependencyResolver.Resolve<IMessageSerializer>();
routingKeyStrategy = dependencyResolver.Resolve<IRoutingKeyStrategy>(); routingKeyStrategy = dependencyResolver.Resolve<IRoutingKeyStrategy>();
exchangeStrategy = dependencyResolver.Resolve<IExchangeStrategy>();
} }
public Task Publish(object message, IBasicProperties properties) public Task Publish(object message, IBasicProperties properties)
{ {
return taskQueue.Value.Add(async () => return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()));
{ }
var messageProperties = properties ?? new BasicProperties();
if (messageProperties.Timestamp.UnixTime == 0)
messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
var body = messageSerializer.Serialize(message, messageProperties);
(await GetChannel()) public Task PublishDirect(object message, string queueName, IBasicProperties properties)
.BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, {
messageProperties, body); return Publish(message, properties, "", queueName);
}).Unwrap();
} }
@ -53,7 +50,7 @@ namespace Tapeti.Connection
{ {
return taskQueue.Value.Add(async () => return taskQueue.Value.Add(async () =>
{ {
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware));
}).Unwrap(); }).Unwrap();
} }
@ -71,7 +68,9 @@ namespace Tapeti.Connection
foreach (var binding in queue.Bindings) foreach (var binding in queue.Bindings)
{ {
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); channel.QueueBind(dynamicQueue.QueueName, exchangeStrategy.GetExchange(binding.MessageClass), routingKey);
(binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName);
} }
return dynamicQueue.QueueName; return dynamicQueue.QueueName;
@ -133,6 +132,22 @@ namespace Tapeti.Connection
} }
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey)
{
return taskQueue.Value.Add(async () =>
{
var messageProperties = properties ?? new BasicProperties();
if (messageProperties.Timestamp.UnixTime == 0)
messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
var body = messageSerializer.Serialize(message, messageProperties);
(await GetChannel())
.BasicPublish(exchange, routingKey, false, messageProperties, body);
}).Unwrap();
}
/// <remarks> /// <remarks>
/// Only call this from a task in the taskQueue to ensure IModel is only used /// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
@ -160,6 +175,9 @@ namespace Tapeti.Connection
connection = connectionFactory.CreateConnection(); connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel(); channelInstance = connection.CreateModel();
if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
break; break;
} }
catch (BrokerUnreachableException) catch (BrokerUnreachableException)

View File

@ -6,21 +6,12 @@ namespace Tapeti.Default
{ {
public class DependencyResolverBinding : IBindingMiddleware public class DependencyResolverBinding : IBindingMiddleware
{ {
private readonly IDependencyResolver resolver;
public DependencyResolverBinding(IDependencyResolver resolver)
{
this.resolver = resolver;
}
public void Handle(IBindingContext context, Action next) public void Handle(IBindingContext context, Action next)
{ {
next(); next();
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass))
parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); parameter.SetBinding(messageContext => messageContext.DependencyResolver.Resolve(parameter.Info.ParameterType));
} }
} }
} }

View File

@ -8,7 +8,7 @@ using RabbitMQ.Client;
namespace Tapeti.Default namespace Tapeti.Default
{ {
public class DefaultMessageSerializer : IMessageSerializer public class JsonMessageSerializer : IMessageSerializer
{ {
protected const string ContentType = "application/json"; protected const string ContentType = "application/json";
protected const string ClassTypeHeader = "classType"; protected const string ClassTypeHeader = "classType";
@ -18,7 +18,7 @@ namespace Tapeti.Default
private readonly ConcurrentDictionary<Type, string> serializedTypeNames = new ConcurrentDictionary<Type, string>(); private readonly ConcurrentDictionary<Type, string> serializedTypeNames = new ConcurrentDictionary<Type, string>();
private readonly JsonSerializerSettings serializerSettings; private readonly JsonSerializerSettings serializerSettings;
public DefaultMessageSerializer() public JsonMessageSerializer()
{ {
serializerSettings = new JsonSerializerSettings serializerSettings = new JsonSerializerSettings
{ {

View File

@ -0,0 +1,25 @@
using System;
using System.Text.RegularExpressions;
namespace Tapeti.Default
{
public class NamespaceMatchExchangeStrategy : IExchangeStrategy
{
// If the namespace starts with "Messaging.Service[.Optional.Further.Parts]", the exchange will be "Service".
// If no Messaging prefix is present, the first part of the namespace will be used instead.
private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?<exchange>[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline);
public string GetExchange(Type messageType)
{
if (messageType.Namespace == null)
throw new ArgumentException($"{messageType.FullName} does not have a namespace");
var match = NamespaceRegex.Match(messageType.Namespace);
if (!match.Success)
throw new ArgumentException($"Namespace for {messageType.FullName} does not match the specified format");
return match.Groups["exchange"].Value.ToLower();
}
}
}

View File

@ -0,0 +1,53 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
using Tapeti.Helpers;
namespace Tapeti.Default
{
public class PublishResultBinding : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
next();
if (context.Result.HasHandler)
return;
bool isTask;
if (context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTask))
{
if (isTask)
{
context.Result.SetHandler(async (messageContext, value) =>
{
var message = await (Task<object>)value;
if (message != null)
await Reply(message, messageContext);
});
}
else
context.Result.SetHandler((messageContext, value) =>
value == null ? null : Reply(value, messageContext));
}
}
private Task Reply(object message, IMessageContext messageContext)
{
var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve<IPublisher>();
var properties = new BasicProperties();
// Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if (messageContext.Properties.IsCorrelationIdPresent())
properties.CorrelationId = messageContext.Properties.CorrelationId;
if (messageContext.Properties.IsReplyToPresent())
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties);
return publisher.Publish(message, properties);
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using Tapeti.Config;
namespace Tapeti.Default
{
public class RequeueExceptionStrategy : IExceptionStrategy
{
public ConsumeResponse HandleException(IMessageContext context, Exception exception)
{
// TODO log exception
return ConsumeResponse.Requeue;
}
}
}

View File

@ -5,7 +5,7 @@ using System.Linq;
namespace Tapeti.Default namespace Tapeti.Default
{ {
public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy public class TypeNameRoutingKeyStrategy : IRoutingKeyStrategy
{ {
private readonly ConcurrentDictionary<Type, string> routingKeyCache = new ConcurrentDictionary<Type, string>(); private readonly ConcurrentDictionary<Type, string> routingKeyCache = new ConcurrentDictionary<Type, string>();

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Helpers namespace Tapeti.Helpers

28
Helpers/TaskTypeHelper.cs Normal file
View File

@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Helpers
{
public static class TaskTypeHelper
{
public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTask)
{
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>))
{
isTask = true;
var genericArguments = type.GetGenericArguments();
return genericArguments.Length == 1 && predicate(genericArguments[0]);
}
isTask = false;
return predicate(type);
}
public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTask)
{
return IsTypeOrTaskOf(type, t => t == compareTo, out isTask);
}
}
}

View File

@ -9,10 +9,15 @@ namespace Tapeti
} }
public interface IDependencyInjector : IDependencyResolver public interface IDependencyContainer : IDependencyResolver
{ {
void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService; void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService;
void RegisterPublisher(Func<IPublisher> publisher); void RegisterDefault<TService>(Func<TService> factory) where TService : class;
void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService;
void RegisterDefaultSingleton<TService>(TService instance) where TService : class;
void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class;
void RegisterController(Type type); void RegisterController(Type type);
} }
} }

16
IExceptionStrategy.cs Normal file
View File

@ -0,0 +1,16 @@
using System;
using Tapeti.Config;
namespace Tapeti
{
public interface IExceptionStrategy
{
/// <summary>
/// Called when an exception occurs while handling a message.
/// </summary>
/// <param name="context">The message context if available. May be null!</param>
/// <param name="exception">The exception instance</param>
/// <returns>The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message.</returns>
ConsumeResponse HandleException(IMessageContext context, Exception exception);
}
}

9
IExchangeStrategy.cs Normal file
View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti
{
public interface IExchangeStrategy
{
string GetExchange(Type messageType);
}
}

View File

@ -3,14 +3,17 @@ using RabbitMQ.Client;
namespace Tapeti namespace Tapeti
{ {
// Note: Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher.
// The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting.
public interface IPublisher public interface IPublisher
{ {
Task Publish(object message); Task Publish(object message);
} }
public interface IAdvancedPublisher : IPublisher public interface IInternalPublisher : IPublisher
{ {
Task Publish(object message, IBasicProperties properties); Task Publish(object message, IBasicProperties properties);
Task PublishDirect(object message, string queueName, IBasicProperties properties);
} }
} }

View File

@ -7,9 +7,9 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTitle("Tapeti")] [assembly: AssemblyTitle("Tapeti")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Tapeti")] [assembly: AssemblyProduct("Tapeti")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] [assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@ -0,0 +1,8 @@
using System;
namespace Tapeti.Flow.Annotations
{
public class ContinuationAttribute : Attribute
{
}
}

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Flow.Annotations
{
public class RequestAttribute : Attribute
{
public Type Response { get; set; }
}
}

View File

@ -0,0 +1,11 @@
namespace Tapeti.Flow
{
public static class ConfigExtensions
{
public static TapetiConfig WithFlow(this TapetiConfig config)
{
config.Use(new FlowMiddleware());
return config;
}
}
}

View File

@ -0,0 +1,7 @@
namespace Tapeti.Flow
{
public static class ContextItems
{
public const string FlowContext = "Tapeti.Flow.FlowContext";
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal interface IExecutableYieldPoint : IYieldPoint
{
bool StoreState { get; }
Task Execute(FlowContext context);
}
internal class DelegateYieldPoint : IYieldPoint
{
public bool StoreState { get; }
private readonly Func<FlowContext, Task> onExecute;
public DelegateYieldPoint(bool storeState, Func<FlowContext, Task> onExecute)
{
StoreState = storeState;
this.onExecute = onExecute;
}
public Task Execute(FlowContext context)
{
return onExecute(context);
}
}
}

View File

@ -0,0 +1,64 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
public class FlowBindingFilter : IBindingFilter
{
public async Task<bool> Accept(IMessageContext context, IBinding binding)
{
var flowContext = await GetFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return false;
return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method);
}
private static async Task<FlowContext> GetFlowContext(IMessageContext context)
{
if (context.Items.ContainsKey(ContextItems.FlowContext))
return (FlowContext)context.Items[ContextItems.FlowContext];
if (context.Properties.CorrelationId == null)
return null;
Guid continuationID;
if (!Guid.TryParse(context.Properties.CorrelationId, out continuationID))
return null;
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowID = await flowStore.FindFlowID(continuationID);
if (!flowID.HasValue)
return null;
var flowStateLock = await flowStore.LockFlowState(flowID.Value);
if (flowStateLock == null)
return null;
var flowState = await flowStateLock.GetFlowState();
if (flowState == null)
return null;
ContinuationMetadata continuation;
var flowContext = new FlowContext
{
MessageContext = context,
FlowStateLock = flowStateLock,
FlowState = flowState,
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null
};
// IDisposable items in the IMessageContext are automatically disposed
context.Items.Add(ContextItems.FlowContext, flowContext);
return flowContext;
}
}
}

View File

@ -0,0 +1,73 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.Annotations;
using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
// TODO figure out a way to prevent binding on Continuation methods (which are always the target of a direct response)
internal class FlowBindingMiddleware : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
RegisterContinuationFilter(context);
RegisterYieldPointResult(context);
next();
ValidateRequestResponse(context);
}
private static void RegisterContinuationFilter(IBindingContext context)
{
var continuationAttribute = context.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute == null)
return;
context.Use(new FlowBindingFilter());
context.Use(new FlowMessageMiddleware());
}
private static void RegisterYieldPointResult(IBindingContext context)
{
bool isTask;
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask))
return;
if (isTask)
{
context.Result.SetHandler(async (messageContext, value) =>
{
var yieldPoint = await (Task<IYieldPoint>)value;
if (yieldPoint != null)
await HandleYieldPoint(messageContext, yieldPoint);
});
}
else
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
}
private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, yieldPoint);
}
private static void ValidateRequestResponse(IBindingContext context)
{
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();
if (request?.Response == null)
return;
bool isTask;
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out isTask))
throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
internal class FlowContext : IDisposable
{
public IMessageContext MessageContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; }
public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
public void Dispose()
{
FlowStateLock?.Dispose();
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
public class FlowMessageMiddleware : IMessageMiddleware
{
public async Task Handle(IMessageContext context, Func<Task> next)
{
var flowContext = (FlowContext)context.Items[ContextItems.FlowContext];
if (flowContext != null)
{
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
await next();
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
}
else
await next();
}
}
}

View File

@ -0,0 +1,273 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
using Tapeti.Flow.Annotations;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
public class FlowProvider : IFlowProvider, IFlowHandler
{
private readonly IConfig config;
private readonly IInternalPublisher publisher;
public FlowProvider(IConfig config, IPublisher publisher)
{
this.config = config;
this.publisher = (IInternalPublisher)publisher;
}
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo));
}
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo));
}
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
throw new NotImplementedException();
//return new ParallelRequestBuilder();
}
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
{
return new DelegateYieldPoint(false, context => SendResponse(context, message));
}
public IYieldPoint End()
{
return new DelegateYieldPoint(false, EndFlow);
}
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo)
{
var continuationID = Guid.NewGuid();
context.FlowState.Continuations.Add(continuationID,
new ContinuationMetadata
{
MethodName = responseHandlerInfo.MethodName,
ConvergeMethodName = null
});
var properties = new BasicProperties
{
CorrelationId = continuationID.ToString(),
ReplyTo = responseHandlerInfo.ReplyToQueue
};
await publisher.Publish(message, properties);
}
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.FlowState.Metadata.Reply;
if (reply == null)
throw new YieldPointException("No response is required");
if (message.GetType().FullName != reply.ResponseTypeName)
throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead");
var properties = new BasicProperties();
// Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if (reply.CorrelationId != null)
properties.CorrelationId = reply.CorrelationId;
// TODO disallow if replyto is not specified?
if (context.FlowState.Metadata.Reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties);
else
await publisher.Publish(message, properties);
}
private static Task EndFlow(FlowContext context)
{
if (context.FlowState.Metadata.Reply != null)
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
return Task.CompletedTask;
}
private static ResponseHandlerInfo GetResponseHandlerInfo(IConfig config, object request, Delegate responseHandler)
{
var binding = config.GetBinding(responseHandler);
if (binding == null)
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass)
throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler));
var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute == null)
throw new ArgumentException($"responseHandler must be marked with the Continuation attribute", nameof(responseHandler));
return new ResponseHandlerInfo
{
MethodName = MethodSerializer.Serialize(responseHandler.Method),
ReplyToQueue = binding.QueueName
};
}
private static ReplyMetadata GetReply(IMessageContext context)
{
var requestAttribute = context.Message.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName
};
}
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{
var delegateYieldPoint = (DelegateYieldPoint)yieldPoint;
var storeState = delegateYieldPoint.StoreState;
FlowContext flowContext;
object flowContextItem;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem))
{
flowContext = new FlowContext
{
MessageContext = context
};
if (storeState)
{
// Initiate the flow
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState();
if (flowContext.FlowState == null)
throw new InvalidOperationException("Unable to get state for new flow");
flowContext.FlowState.Metadata.Reply = GetReply(context);
}
}
else
flowContext = (FlowContext) flowContextItem;
try
{
await delegateYieldPoint.Execute(flowContext);
}
catch (YieldPointException e)
{
var controllerName = flowContext.MessageContext.Controller.GetType().FullName;
var methodName = flowContext.MessageContext.Binding.Method.Name;
throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e);
}
if (storeState)
{
flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller);
await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState);
}
else
{
await flowContext.FlowStateLock.DeleteFlowState();
}
}
/*
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{
internal class RequestInfo
{
public object Message { get; set; }
public ResponseHandlerInfo ResponseHandlerInfo { get; set; }
}
private readonly IConfig config;
private readonly IFlowStore flowStore;
private readonly Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest;
private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(IConfig config, IFlowStore flowStore, Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest)
{
this.config = config;
this.flowStore = flowStore;
this.sendRequest = sendRequest;
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
return this;
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
return this;
}
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation)
{
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo))));
}
public IYieldPoint Yield(Func<IYieldPoint> continuation)
{
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo))));
}
}*/
internal class ResponseHandlerInfo
{
public string MethodName { get; set; }
public string ReplyToQueue { get; set; }
}
}
}

View File

@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace Tapeti.Flow.Default
{
public class FlowState
{
private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations;
public FlowMetadata Metadata
{
get { return metadata ?? (metadata = new FlowMetadata()); }
set { metadata = value; }
}
public string Data { get; set; }
public Dictionary<Guid, ContinuationMetadata> Continuations
{
get { return continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>()); }
set { continuations = value; }
}
public void Assign(FlowState value)
{
Metadata = value.Metadata.Clone();
Data = value.Data;
Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone());
}
public FlowState Clone()
{
var result = new FlowState();
result.Assign(this);
return result;
}
}
public class FlowMetadata
{
public ReplyMetadata Reply { get; set; }
public FlowMetadata Clone()
{
return new FlowMetadata
{
Reply = Reply?.Clone()
};
}
}
public class ReplyMetadata
{
public string ReplyTo { get; set; }
public string CorrelationId { get; set; }
public string ResponseTypeName { get; set; }
public ReplyMetadata Clone()
{
return new ReplyMetadata
{
ReplyTo = ReplyTo,
CorrelationId = CorrelationId,
ResponseTypeName = ResponseTypeName
};
}
}
public class ContinuationMetadata
{
public string MethodName { get; set; }
public string ConvergeMethodName { get; set; }
public ContinuationMetadata Clone()
{
return new ContinuationMetadata
{
MethodName = MethodName,
ConvergeMethodName = ConvergeMethodName
};
}
}
}

View File

@ -0,0 +1,198 @@
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
public class FlowStore : IFlowStore
{
private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly IFlowRepository repository;
public FlowStore(IFlowRepository repository)
{
this.repository = repository;
}
public async Task Load()
{
FlowStates.Clear();
ContinuationLookup.Clear();
foreach (var flowStateRecord in await repository.GetStates())
{
var flowState = ToFlowState(flowStateRecord);
FlowStates.GetOrAdd(flowStateRecord.FlowID, flowState);
foreach (var continuation in flowStateRecord.ContinuationMetadata)
ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID);
}
}
public Task<Guid?> FindFlowID(Guid continuationID)
{
Guid result;
return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null);
}
public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{
var isNew = false;
var flowState = FlowStates.GetOrAdd(flowID, id =>
{
isNew = true;
return new FlowState();
});
var result = new FlowStateLock(this, flowState, flowID, isNew);
await result.Lock();
return result;
}
private class FlowStateLock : IFlowStateLock
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
private readonly FlowStore owner;
private readonly FlowState flowState;
private readonly Guid flowID;
private bool isNew;
private bool isDisposed;
public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew)
{
this.owner = owner;
this.flowState = flowState;
this.flowID = flowID;
this.isNew = isNew;
}
public Task Lock()
{
return semaphore.WaitAsync();
}
public void Dispose()
{
lock (flowState)
{
if (!isDisposed)
{
semaphore.Release();
semaphore.Dispose();
}
isDisposed = true;
}
}
public Guid FlowID => flowID;
public Task<FlowState> GetFlowState()
{
lock (flowState)
{
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(flowState.Clone());
}
}
public async Task StoreFlowState(FlowState newFlowState)
{
lock (flowState)
{
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
{
Guid removedValue;
ContinuationLookup.TryRemove(removedContinuation, out removedValue);
}
foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key)))
{
ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
}
flowState.Assign(newFlowState);
}
if (isNew)
{
isNew = false;
var now = DateTime.UtcNow;
await owner.repository.CreateState(ToFlowStateRecord(flowID, flowState), now);
}
else
{
await owner.repository.UpdateState(ToFlowStateRecord(flowID, flowState));
}
}
public async Task DeleteFlowState()
{
lock (flowState)
{
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
foreach (var removedContinuation in flowState.Continuations.Keys)
{
Guid removedValue;
ContinuationLookup.TryRemove(removedContinuation, out removedValue);
}
FlowState removedFlow;
FlowStates.TryRemove(flowID, out removedFlow);
}
if (!isNew)
await owner.repository.DeleteState(flowID);
}
}
private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState)
{
return new FlowStateRecord
{
FlowID = flowID,
Metadata = JsonConvert.SerializeObject(flowState.Metadata),
Data = flowState.Data,
ContinuationMetadata = flowState.Continuations.ToDictionary(
kv => kv.Key,
kv => JsonConvert.SerializeObject(kv.Value))
};
}
private static FlowState ToFlowState(FlowStateRecord flowStateRecord)
{
return new FlowState
{
Metadata = JsonConvert.DeserializeObject<FlowMetadata>(flowStateRecord.Metadata),
Data = flowStateRecord.Data,
Continuations = flowStateRecord.ContinuationMetadata.ToDictionary(
kv => kv.Key,
kv => JsonConvert.DeserializeObject<ContinuationMetadata>(kv.Value))
};
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
public class NonPersistentFlowRepository : IFlowRepository
{
public Task<IQueryable<FlowStateRecord>> GetStates()
{
return Task.FromResult(new List<FlowStateRecord>().AsQueryable());
}
public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp)
{
return Task.CompletedTask;
}
public Task UpdateState(FlowStateRecord stateRecord)
{
return Task.CompletedTask;
}
public Task DeleteState(Guid flowID)
{
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,12 @@
using System.Reflection;
namespace Tapeti.Flow.FlowHelpers
{
public static class MethodSerializer
{
public static string Serialize(MethodInfo method)
{
return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName;
}
}
}

View File

@ -0,0 +1,25 @@
using System.Collections.Generic;
using Tapeti.Config;
using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
public class FlowMiddleware : IMiddlewareBundle
{
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
{
var container = dependencyResolver as IDependencyContainer;
// ReSharper disable once InvertIf
if (container != null)
{
container.RegisterDefault<IFlowProvider, FlowProvider>();
container.RegisterDefault<IFlowHandler, FlowProvider>();
container.RegisterDefault<IFlowRepository, NonPersistentFlowRepository>();
container.RegisterDefault<IFlowStore, FlowStore>();
}
return new[] { new FlowBindingMiddleware() };
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow
{
public interface IFlowProvider
{
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
// One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call".
// Apparantly this is because a return type of a method is not part of its signature,
// according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler);
IFlowParallelRequestBuilder YieldWithParallelRequest();
IYieldPoint EndWithResponse<TResponse>(TResponse message);
IYieldPoint End();
}
public interface IFlowHandler
{
Task Execute(IMessageContext context, IYieldPoint yieldPoint);
}
public interface IFlowParallelRequestBuilder
{
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation);
IYieldPoint Yield(Func<IYieldPoint> continuation);
}
public interface IYieldPoint
{
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Flow
{
public interface IFlowRepository
{
Task<IQueryable<FlowStateRecord>> GetStates();
Task CreateState(FlowStateRecord stateRecord, DateTime timestamp);
Task UpdateState(FlowStateRecord stateRecord);
Task DeleteState(Guid flowID);
}
public class FlowStateRecord
{
public Guid FlowID;
public string Metadata;
public string Data;
public Dictionary<Guid, string> ContinuationMetadata;
}
}

23
Tapeti.Flow/IFlowStore.cs Normal file
View File

@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
public interface IFlowStore
{
Task Load();
Task<Guid?> FindFlowID(Guid continuationID);
Task<IFlowStateLock> LockFlowState(Guid flowID);
}
public interface IFlowStateLock : IDisposable
{
Guid FlowID { get; }
Task<FlowState> GetFlowState();
Task StoreFlowState(FlowState flowState);
Task DeleteFlowState();
}
}

View File

@ -1,16 +1,15 @@
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following // General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information // set of attributes. Change these attribute values to modify the information
// associated with an assembly. // associated with an assembly.
[assembly: AssemblyTitle("Tapeti.Saga")] [assembly: AssemblyTitle("Tapeti.Flow")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Tapeti.Saga")] [assembly: AssemblyProduct("Tapeti.Flow")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] [assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@ -0,0 +1,9 @@
using System;
namespace Tapeti.Flow
{
public class ResponseExpectedException : Exception
{
public ResponseExpectedException(string message) : base(message) { }
}
}

View File

@ -7,8 +7,8 @@
<ProjectGuid>{F84AD920-D5A1-455D-AED5-2542B3A47B85}</ProjectGuid> <ProjectGuid>{F84AD920-D5A1-455D-AED5-2542B3A47B85}</ProjectGuid>
<OutputType>Library</OutputType> <OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder> <AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Tapeti.Saga</RootNamespace> <RootNamespace>Tapeti.Flow</RootNamespace>
<AssemblyName>Tapeti.Saga</AssemblyName> <AssemblyName>Tapeti.Flow</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion> <TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment> <FileAlignment>512</FileAlignment>
<TargetFrameworkProfile /> <TargetFrameworkProfile />
@ -31,7 +31,14 @@
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce" /> <Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<HintPath>..\packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" /> <Reference Include="System" />
<Reference Include="System.Core" /> <Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" /> <Reference Include="System.Xml.Linq" />
@ -42,14 +49,27 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="ISaga.cs" /> <Compile Include="Annotations\ContinuationAttribute.cs" />
<Compile Include="ISagaProvider.cs" /> <Compile Include="Annotations\RequestAttribute.cs" />
<Compile Include="ISagaStore.cs" /> <Compile Include="ContextItems.cs" />
<Compile Include="Default\FlowBindingFilter.cs" />
<Compile Include="Default\FlowBindingMiddleware.cs" />
<Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowState.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" />
<Compile Include="FlowHelpers\MethodSerializer.cs" />
<Compile Include="FlowMiddleware.cs" />
<Compile Include="Default\FlowStore.cs" />
<Compile Include="Default\FlowProvider.cs" />
<Compile Include="IFlowRepository.cs" />
<Compile Include="IFlowStore.cs" />
<Compile Include="IFlowProvider.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SagaMemoryStore.cs" /> <Compile Include="ResponseExpectedException.cs" />
<Compile Include="SagaMiddleware.cs" /> <Compile Include="YieldPointException.cs" />
<Compile Include="SagaProvider.cs" />
<Compile Include="SagaExtensions.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Tapeti.csproj"> <ProjectReference Include="..\Tapeti.csproj">
@ -57,6 +77,9 @@
<Name>Tapeti</Name> <Name>Tapeti</Name>
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.

View File

@ -0,0 +1,10 @@
using System;
namespace Tapeti.Flow
{
public class YieldPointException : Exception
{
public YieldPointException(string message) : base(message) { }
public YieldPointException(string message, Exception innerException) : base(message, innerException) { }
}
}

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net461" />
<package id="RabbitMQ.Client" version="4.1.1" targetFramework="net461" />
</packages>

View File

@ -1,13 +0,0 @@
using System;
namespace Tapeti.Saga
{
public interface ISaga<out T> : IDisposable where T : class
{
string Id { get; }
T State { get; }
void ExpectResponse(string callId);
void ResolveResponse(string callId);
}
}

View File

@ -1,10 +0,0 @@
using System.Threading.Tasks;
namespace Tapeti.Saga
{
public interface ISagaProvider
{
Task<ISaga<T>> Begin<T>(T initialState) where T : class;
Task<ISaga<T>> Continue<T>(string sagaId) where T : class;
}
}

View File

@ -1,10 +0,0 @@
using System.Threading.Tasks;
namespace Tapeti.Saga
{
public interface ISagaStore
{
Task<object> Read(string sagaId);
Task Update(string sagaId, object state);
}
}

View File

@ -1,16 +0,0 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
namespace Tapeti.Saga
{
public static class SagaExtensions
{
public static Task Publish<T>(this IPublisher publisher, object message, ISaga<T> saga) where T : class
{
return ((IAdvancedPublisher)publisher).Publish(message, new BasicProperties
{
CorrelationId = saga.Id
});
}
}
}

View File

@ -1,43 +0,0 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Tapeti.Saga
{
public class SagaMemoryStore : ISagaStore
{
private ISagaStore decoratedStore;
private readonly Dictionary<string, object> values = new Dictionary<string, object>();
// Not a constructor to allow standard injection to work when using only the MemoryStore
public static SagaMemoryStore AsCacheFor(ISagaStore store)
{
return new SagaMemoryStore
{
decoratedStore = store
};
}
public async Task<object> Read(string sagaId)
{
object value;
// ReSharper disable once InvertIf
if (!values.TryGetValue(sagaId, out value) && decoratedStore != null)
{
value = await decoratedStore.Read(sagaId);
values.Add(sagaId, value);
}
return value;
}
public async Task Update(string sagaId, object state)
{
values[sagaId] = state;
if (decoratedStore != null)
await decoratedStore.Update(sagaId, state);
}
}
}

View File

@ -1,70 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Saga
{
public class SagaMiddleware : IMiddlewareBundle
{
private const string SagaContextKey = "Saga";
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
{
(dependencyResolver as IDependencyInjector)?.RegisterDefault<ISagaProvider, SagaProvider>();
yield return new SagaBindingMiddleware();
}
protected class SagaBindingMiddleware : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
var registered = false;
foreach (var parameter in context.Parameters.Where(p =>
p.Info.ParameterType.IsGenericType &&
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
{
if (!registered)
{
var sagaType = parameter.Info.ParameterType.GetGenericArguments()[0];
var middlewareType = typeof(SagaMessageMiddleware<>).MakeGenericType(sagaType);
context.Use(Activator.CreateInstance(middlewareType) as IMessageMiddleware);
registered = true;
}
parameter.SetBinding(messageContext =>
{
object saga;
return messageContext.Items.TryGetValue(SagaContextKey, out saga) ? saga : null;
});
}
next();
}
}
protected class SagaMessageMiddleware<T> : IMessageMiddleware where T : class
{
public async Task Handle(IMessageContext context, Func<Task> next)
{
if (string.IsNullOrEmpty(context.Properties.CorrelationId))
return;
var saga = await context.DependencyResolver.Resolve<ISagaProvider>().Continue<T>(context.Properties.CorrelationId);
if (saga == null)
return;
context.Items[SagaContextKey] = saga;
await next();
}
}
}
}

View File

@ -1,81 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Saga
{
public class SagaProvider : ISagaProvider
{
protected static readonly ConcurrentDictionary<string, SemaphoreSlim> SagaLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ISagaStore store;
public SagaProvider(ISagaStore store)
{
this.store = store;
}
public async Task<ISaga<T>> Begin<T>(T initialState) where T : class
{
var saga = await Saga<T>.Create(() => Task.FromResult(initialState));
await store.Update(saga.Id, saga.State);
return saga;
}
public async Task<ISaga<T>> Continue<T>(string sagaId) where T : class
{
return await Saga<T>.Create(async () => await store.Read(sagaId) as T, sagaId);
}
protected class Saga<T> : ISaga<T> where T : class
{
private bool disposed;
public string Id { get; set; }
public T State { get; set; }
public static async Task<Saga<T>> Create(Func<Task<T>> getState, string id = null)
{
var sagaId = id ?? Guid.NewGuid().ToString();
await SagaLocks.GetOrAdd(sagaId, new SemaphoreSlim(1)).WaitAsync();
var saga = new Saga<T>
{
Id = sagaId,
State = await getState()
};
return saga;
}
public void Dispose()
{
if (disposed)
return;
SemaphoreSlim semaphore;
if (SagaLocks.TryGetValue(Id, out semaphore))
semaphore.Release();
disposed = true;
}
public void ExpectResponse(string callId)
{
throw new NotImplementedException();
}
public void ResolveResponse(string callId)
{
throw new NotImplementedException();
}
}
}
}

View File

@ -4,7 +4,7 @@ using SimpleInjector;
namespace Tapeti.SimpleInjector namespace Tapeti.SimpleInjector
{ {
public class SimpleInjectorDependencyResolver : IDependencyInjector public class SimpleInjectorDependencyResolver : IDependencyContainer
{ {
private readonly Container container; private readonly Container container;
@ -26,23 +26,44 @@ namespace Tapeti.SimpleInjector
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{ {
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates if (CanRegisterDefault<TService>())
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService)))
container.Register<TService, TImplementation>(); container.Register<TService, TImplementation>();
} }
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
public void RegisterPublisher(Func<IPublisher> publisher)
{ {
// ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates if (CanRegisterDefault<TService>())
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) container.Register(factory);
container.Register(publisher);
} }
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton<TService, TImplementation>();
}
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton(instance);
}
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton(factory);
}
public void RegisterController(Type type) public void RegisterController(Type type)
{ {
container.Register(type); container.Register(type);
} }
private bool CanRegisterDefault<TService>() where TService : class
{
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
return !container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService));
}
} }
} }

View File

@ -53,15 +53,24 @@
<Compile Include="Annotations\MessageControllerAttribute.cs" /> <Compile Include="Annotations\MessageControllerAttribute.cs" />
<Compile Include="Annotations\StaticQueueAttribute.cs" /> <Compile Include="Annotations\StaticQueueAttribute.cs" />
<Compile Include="Annotations\DynamicQueueAttribute.cs" /> <Compile Include="Annotations\DynamicQueueAttribute.cs" />
<Compile Include="Config\IBindingFilter.cs" />
<Compile Include="Connection\TapetiConsumer.cs" /> <Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" /> <Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" /> <Compile Include="Connection\TapetiSubscriber.cs" />
<Compile Include="Connection\TapetiWorker.cs" /> <Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Default\ConsoleLogger.cs" /> <Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" /> <Compile Include="Default\DevNullLogger.cs" />
<Compile Include="Default\JsonMessageSerializer.cs" />
<Compile Include="Default\PublishResultBinding.cs" />
<Compile Include="Default\NamespaceMatchExchangeStrategy.cs" />
<Compile Include="Default\RequeueExceptionStrategy.cs" />
<Compile Include="Default\TypeNameRoutingKeyStrategy.cs" />
<Compile Include="Helpers\ConsoleHelper.cs" /> <Compile Include="Helpers\ConsoleHelper.cs" />
<Compile Include="Helpers\MiddlewareHelper.cs" /> <Compile Include="Helpers\MiddlewareHelper.cs" />
<Compile Include="Helpers\TaskTypeHelper.cs" />
<Compile Include="IConnection.cs" /> <Compile Include="IConnection.cs" />
<Compile Include="IExceptionStrategy.cs" />
<Compile Include="IExchangeStrategy.cs" />
<Compile Include="ILogger.cs" /> <Compile Include="ILogger.cs" />
<Compile Include="Config\IMessageContext.cs" /> <Compile Include="Config\IMessageContext.cs" />
<Compile Include="Config\IMessageMiddleware.cs" /> <Compile Include="Config\IMessageMiddleware.cs" />
@ -71,10 +80,8 @@
<Compile Include="Config\IBindingMiddleware.cs" /> <Compile Include="Config\IBindingMiddleware.cs" />
<Compile Include="TapetiConnectionParams.cs" /> <Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiConfig.cs" /> <Compile Include="TapetiConfig.cs" />
<Compile Include="TapetiTypes.cs" /> <Compile Include="ConsumeResponse.cs" />
<Compile Include="Tasks\SingleThreadTaskQueue.cs" /> <Compile Include="Tasks\SingleThreadTaskQueue.cs" />
<Compile Include="Default\DefaultMessageSerializer.cs" />
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
<Compile Include="IDependencyResolver.cs" /> <Compile Include="IDependencyResolver.cs" />
<Compile Include="IMessageSerializer.cs" /> <Compile Include="IMessageSerializer.cs" />
<Compile Include="IPublisher.cs" /> <Compile Include="IPublisher.cs" />

View File

@ -9,7 +9,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.SimpleInjector", "Ta
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Test", "Test\Test.csproj", "{90559950-1B32-4119-A78E-517E2C71EE23}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Saga", "Tapeti.Saga\Tapeti.Saga.csproj", "{F84AD920-D5A1-455D-AED5-2542B3A47B85}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Flow", "Tapeti.Flow\Tapeti.Flow.csproj", "{F84AD920-D5A1-455D-AED5-2542B3A47B85}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution

View File

@ -16,7 +16,7 @@ namespace Tapeti
public TopologyConfigurationException(string message) : base(message) { } public TopologyConfigurationException(string message) : base(message) { }
} }
public delegate Task<object> MessageHandlerFunc(IMessageContext context, object message); public delegate Task MessageHandlerFunc(IMessageContext context, object message);
public class TapetiConfig public class TapetiConfig
@ -27,33 +27,22 @@ namespace Tapeti
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>(); private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>(); private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly string exchange;
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
public TapetiConfig(string exchange, IDependencyResolver dependencyResolver) public TapetiConfig(IDependencyResolver dependencyResolver)
{ {
this.exchange = exchange;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
Use(new DependencyResolverBinding(dependencyResolver)); Use(new DependencyResolverBinding());
Use(new MessageBinding()); Use(new MessageBinding());
Use(new PublishResultBinding());
} }
public IConfig Build() public IConfig Build()
{ {
var dependencyInjector = dependencyResolver as IDependencyInjector; RegisterDefaults();
if (dependencyInjector != null)
{
if (ConsoleHelper.IsAvailable())
dependencyInjector.RegisterDefault<ILogger, ConsoleLogger>();
else
dependencyInjector.RegisterDefault<ILogger, DevNullLogger>();
dependencyInjector.RegisterDefault<IMessageSerializer, DefaultMessageSerializer>();
dependencyInjector.RegisterDefault<IRoutingKeyStrategy, DefaultRoutingKeyStrategy>();
}
var queues = new List<IQueue>(); var queues = new List<IQueue>();
queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value))); queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value)));
@ -72,7 +61,10 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
return new Config(exchange, dependencyResolver, messageMiddleware, queues); var config = new Config(dependencyResolver, messageMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
return config;
} }
@ -107,6 +99,24 @@ namespace Tapeti
} }
public void RegisterDefaults()
{
var container = dependencyResolver as IDependencyContainer;
if (container == null)
return;
if (ConsoleHelper.IsAvailable())
container.RegisterDefault<ILogger, ConsoleLogger>();
else
container.RegisterDefault<ILogger, DevNullLogger>();
container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>();
container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>();
container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>();
container.RegisterDefault<IExceptionStrategy, RequeueExceptionStrategy>();
}
public TapetiConfig RegisterController(Type controller) public TapetiConfig RegisterController(Type controller)
{ {
var controllerQueueInfo = GetQueueInfo(controller); var controllerQueueInfo = GetQueueInfo(controller);
@ -119,7 +129,7 @@ namespace Tapeti
if (!methodQueueInfo.IsValid) if (!methodQueueInfo.IsValid)
throw new TopologyConfigurationException($"Method {method.Name} or controller {controller.Name} requires a queue attribute"); throw new TopologyConfigurationException($"Method {method.Name} or controller {controller.Name} requires a queue attribute");
var context = new BindingContext(method.GetParameters().Select(p => new BindingParameter(p)).ToList()); var context = new BindingContext(method);
var messageHandler = GetMessageHandler(context, method); var messageHandler = GetMessageHandler(context, method);
var handlerInfo = new Binding var handlerInfo = new Binding
@ -129,7 +139,8 @@ namespace Tapeti
QueueInfo = methodQueueInfo, QueueInfo = methodQueueInfo,
MessageClass = context.MessageClass, MessageClass = context.MessageClass,
MessageHandler = messageHandler, MessageHandler = messageHandler,
MessageMiddleware = context.MessageMiddleware MessageMiddleware = context.MessageMiddleware,
BindingFilters = context.BindingFilters
}; };
if (methodQueueInfo.Dynamic.GetValueOrDefault()) if (methodQueueInfo.Dynamic.GetValueOrDefault())
@ -167,61 +178,59 @@ namespace Tapeti
var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList(); var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
// ReSharper disable once InvertIf - doesn't make the flow clearer imo // ReSharper disable once InvertIf
if (invalidBindings.Count > 0) if (invalidBindings.Count > 0)
{ {
var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name)); var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
} }
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding())); var resultHandler = ((IBindingResultAccess) context.Result).GetHandler();
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler);
} }
protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{ {
if (resultHandler != null)
return WrapResultHandlerMethod(method, parameters, resultHandler);
if (method.ReturnType == typeof(void)) if (method.ReturnType == typeof(void))
return WrapNullMethod(method, parameters); return WrapNullMethod(method, parameters);
if (method.ReturnType == typeof(Task)) if (method.ReturnType == typeof(Task))
return WrapTaskMethod(method, parameters); return WrapTaskMethod(method, parameters);
if (method.ReturnType == typeof(Task<>)) if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
{
var genericArguments = method.GetGenericArguments();
if (genericArguments.Length != 1)
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have exactly one generic argument to Task<>");
if (!genericArguments[0].IsClass)
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have an object generic argument to Task<>");
return WrapGenericTaskMethod(method, parameters); return WrapGenericTaskMethod(method, parameters);
}
if (method.ReturnType.IsClass) return WrapObjectMethod(method, parameters);
return WrapObjectMethod(method, parameters);
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} has an invalid return type");
} }
protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{
return (context, message) =>
{
var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return resultHandler(context, result);
};
}
protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{ {
return (context, message) => return (context, message) =>
{ {
method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return Task.FromResult<object>(null); return Task.CompletedTask;
}; };
} }
protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{ {
return async (context, message) => return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
{
await (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return Task.FromResult<object>(null);
};
} }
@ -250,6 +259,8 @@ namespace Tapeti
var existing = staticRegistrations[binding.QueueInfo.Name]; var existing = staticRegistrations[binding.QueueInfo.Name];
// Technically we could easily do multicasting, but it complicates exception handling and requeueing // Technically we could easily do multicasting, but it complicates exception handling and requeueing
// TODO allow multiple, if there is a filter which guarantees uniqueness
// TODO move to independant validation middleware
if (existing.Any(h => h.MessageClass == binding.MessageClass)) if (existing.Any(h => h.MessageClass == binding.MessageClass))
throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}"); throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
@ -298,18 +309,27 @@ namespace Tapeti
protected class Config : IConfig protected class Config : IConfig
{ {
public string Exchange { get; }
public IDependencyResolver DependencyResolver { get; } public IDependencyResolver DependencyResolver { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public IEnumerable<IQueue> Queues { get; } public IEnumerable<IQueue> Queues { get; }
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
public Config(string exchange, IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues)
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues)
{ {
Exchange = exchange;
DependencyResolver = dependencyResolver; DependencyResolver = dependencyResolver;
MessageMiddleware = messageMiddleware; MessageMiddleware = messageMiddleware;
Queues = queues; Queues = queues.ToList();
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);
}
public IBinding GetBinding(Delegate method)
{
IBinding binding;
return bindingMethodLookup.TryGetValue(method.Method, out binding) ? binding : null;
} }
} }
@ -330,25 +350,55 @@ namespace Tapeti
} }
protected class Binding : IBinding protected class Binding : IDynamicQueueBinding
{ {
public Type Controller { get; set; } public Type Controller { get; set; }
public MethodInfo Method { get; set; } public MethodInfo Method { get; set; }
public Type MessageClass { get; set; } public Type MessageClass { get; set; }
public string QueueName { get; set; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; } public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
public IReadOnlyList<IBindingFilter> BindingFilters { get; set; }
private QueueInfo queueInfo;
public QueueInfo QueueInfo
{
get { return queueInfo; }
set
{
QueueName = (value?.Dynamic).GetValueOrDefault() ? value?.Name : null;
queueInfo = value;
}
}
public QueueInfo QueueInfo { get; set; }
public MessageHandlerFunc MessageHandler { get; set; } public MessageHandlerFunc MessageHandler { get; set; }
public bool Accept(object message) public void SetQueueName(string queueName)
{ {
return message.GetType() == MessageClass; QueueName = queueName;
} }
public Task<object> Invoke(IMessageContext context, object message) public async Task<bool> Accept(IMessageContext context, object message)
{
if (message.GetType() != MessageClass)
return false;
if (BindingFilters == null)
return true;
foreach (var filter in BindingFilters)
{
if (!await filter.Accept(context, this))
return false;
}
return true;
}
public Task Invoke(IMessageContext context, object message)
{ {
return MessageHandler(context, message); return MessageHandler(context, message);
} }
@ -361,18 +411,34 @@ namespace Tapeti
} }
internal interface IBindingResultAccess
{
ResultHandler GetHandler();
}
internal class BindingContext : IBindingContext internal class BindingContext : IBindingContext
{ {
private List<IMessageMiddleware> messageMiddleware; private List<IMessageMiddleware> messageMiddleware;
private List<IBindingFilter> bindingFilters;
public Type MessageClass { get; set; } public Type MessageClass { get; set; }
public MethodInfo Method { get; }
public IReadOnlyList<IBindingParameter> Parameters { get; } public IReadOnlyList<IBindingParameter> Parameters { get; }
public IBindingResult Result { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware; public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
public IReadOnlyList<IBindingFilter> BindingFilters => bindingFilters;
public BindingContext(IReadOnlyList<IBindingParameter> parameters) public BindingContext(MethodInfo method)
{ {
Parameters = parameters; Method = method;
Parameters = method.GetParameters().Select(p => new BindingParameter(p)).ToList();
Result = new BindingResult(method.ReturnParameter);
} }
@ -383,6 +449,15 @@ namespace Tapeti
messageMiddleware.Add(middleware); messageMiddleware.Add(middleware);
} }
public void Use(IBindingFilter filter)
{
if (bindingFilters == null)
bindingFilters = new List<IBindingFilter>();
bindingFilters.Add(filter);
}
} }
@ -410,5 +485,31 @@ namespace Tapeti
binding = valueFactory; binding = valueFactory;
} }
} }
internal class BindingResult : IBindingResult, IBindingResultAccess
{
private ResultHandler handler;
public ParameterInfo Info { get; }
public bool HasHandler => handler != null;
public BindingResult(ParameterInfo parameter)
{
Info = parameter;
}
public ResultHandler GetHandler()
{
return handler;
}
public void SetHandler(ResultHandler resultHandler)
{
handler = resultHandler;
}
}
} }
} }

View File

@ -16,12 +16,11 @@ namespace Tapeti
public TapetiConnection(IConfig config) public TapetiConnection(IConfig config)
{ {
this.config = config; this.config = config;
(config.DependencyResolver as IDependencyInjector)?.RegisterPublisher(GetPublisher); (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher);
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware) worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware)
{ {
ConnectionParams = Params ?? new TapetiConnectionParams(), ConnectionParams = Params ?? new TapetiConnectionParams()
Exchange = config.Exchange
}); });
} }

View File

@ -10,6 +10,13 @@ namespace Tapeti
public string Username { get; set; } = "guest"; public string Username { get; set; } = "guest";
public string Password { get; set; } = "guest"; public string Password { get; set; } = "guest";
/// <summary>
/// The amount of message to prefetch. See http://www.rabbitmq.com/consumer-prefetch.html for more information.
///
/// If set to 0, no limit will be applied.
/// </summary>
public ushort PrefetchCount { get; set; } = 50;
public TapetiConnectionParams() public TapetiConnectionParams()
{ {

View File

@ -1,7 +1,9 @@
using System.Threading.Tasks; using System;
using System.Threading.Tasks;
using Tapeti; using Tapeti;
using Tapeti.Annotations; using Tapeti.Annotations;
using Tapeti.Saga; using Tapeti.Flow;
using Tapeti.Flow.Annotations;
namespace Test namespace Test
{ {
@ -9,74 +11,78 @@ namespace Test
public class MarcoController : MessageController public class MarcoController : MessageController
{ {
private readonly IPublisher publisher; private readonly IPublisher publisher;
private readonly ISagaProvider sagaProvider; private readonly IFlowProvider flowProvider;
private readonly Visualizer visualizer;
// Public properties are automatically stored and retrieved while in a flow
public Guid StateTestGuid;
public MarcoController(IPublisher publisher, ISagaProvider sagaProvider) public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer)
{ {
this.publisher = publisher; this.publisher = publisher;
this.sagaProvider = sagaProvider; this.flowProvider = flowProvider;
this.visualizer = visualizer;
} }
/* /**
* For simple request response patterns, the return type can also be used: * The Visualizer could've been injected through the constructor, which is
* the recommended way. Just testing the injection middleware here.
public async Task<PoloMessage> Marco(MarcoMessage message, Visualizer visualizer) */
public async Task<IYieldPoint> Marco(MarcoMessage message, Visualizer myVisualizer)
{ {
visualizer.VisualizeMarco(); Console.WriteLine(">> Marco (yielding with request)");
return new PoloMessage(); ;
await myVisualizer.VisualizeMarco();
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage()
{
StoredInState = StateTestGuid
},
HandlePoloConfirmationResponse);
} }
*/
// Visualizer can also be constructor injected, just proving a point here...
public async Task Marco(MarcoMessage message, Visualizer visualizer) [Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message)
{ {
visualizer.VisualizeMarco(); Console.WriteLine(">> HandlePoloConfirmationResponse (ending flow)");
using (var saga = await sagaProvider.Begin(new MarcoPoloSaga())) Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
// This should error, as MarcoMessage expects a PoloMessage as a response
return flowProvider.EndWithResponse(new PoloMessage());
}
/**
* For simple request response patterns, the return type can be used.
* This will automatically include the correlationId in the response and
* use the replyTo header of the request if provided.
*/
public PoloConfirmationResponseMessage PoloConfirmation(PoloConfirmationRequestMessage message)
{
Console.WriteLine(">> PoloConfirmation (returning confirmation)");
return new PoloConfirmationResponseMessage
{ {
// TODO provide publish extension with Saga support ShouldMatchState = message.StoredInState
await publisher.Publish(new PoloMessage(), saga); };
}
} }
public void Polo(PoloMessage message, Visualizer visualizer, ISaga<MarcoPoloSaga> saga)
{
if (saga.State.ReceivedPolo)
return;
saga.State.ReceivedPolo = true; public void Polo(PoloMessage message)
visualizer.VisualizePolo(); {
Console.WriteLine(">> Polo");
StateTestGuid = Guid.NewGuid();
} }
/*
[CallID("eerste")]
Implicit:
using (sagaProvider.Continue(correlatieID))
{
saga refcount--;
public void PoloColorResponse1(PoloColorResponse message, ISaga<MarcoState> saga)
{
saga.State == MarcoState
state.Color = message.Color;
if (state.Complete)
{
publisher.Publish(new PoloMessage());
}
}
*/
} }
[Request(Response = typeof(PoloMessage))]
public class MarcoMessage public class MarcoMessage
{ {
} }
@ -87,8 +93,15 @@ namespace Test
} }
public class MarcoPoloSaga [Request(Response = typeof(PoloConfirmationResponseMessage))]
public class PoloConfirmationRequestMessage
{ {
public bool ReceivedPolo; public Guid StoredInState { get; set; }
}
public class PoloConfirmationResponseMessage
{
public Guid ShouldMatchState { get; set; }
} }
} }

View File

@ -17,20 +17,33 @@ namespace Test
public async Task Run() public async Task Run()
{ {
await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20); var concurrent = new SemaphoreSlim(20);
//for (var x = 0; x < 5000; x++)
while (true) while (true)
{ {
await concurrent.WaitAsync(); for (var x = 0; x < 200; x++)
try
{ {
await publisher.Publish(new MarcoMessage()); await concurrent.WaitAsync();
} try
finally {
{ await publisher.Publish(new MarcoMessage());
concurrent.Release(); }
finally
{
concurrent.Release();
}
} }
await Task.Delay(1000);
}
*/
while (true)
{
await Task.Delay(1000);
} }
} }
} }

View File

@ -1,7 +1,7 @@
using System; using System;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.Saga; using Tapeti.Flow;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
namespace Test namespace Test
@ -10,17 +10,26 @@ namespace Test
{ {
private static void Main() private static void Main()
{ {
// TODO SQL based flow store
// TODO logging
var container = new Container(); var container = new Container();
container.Register<MarcoEmitter>(); container.Register<MarcoEmitter>();
container.Register<Visualizer>(); container.Register<Visualizer>();
container.RegisterSingleton<ISagaStore, SagaMemoryStore>();
var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container)) var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.Use(new SagaMiddleware()) .WithFlow()
.RegisterAllControllers() .RegisterAllControllers()
.Build(); .Build();
using (var connection = new TapetiConnection(config)) using (var connection = new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
HostName = "localhost",
PrefetchCount = 200
}
})
{ {
Console.WriteLine("Subscribing..."); Console.WriteLine("Subscribing...");
connection.Subscribe().Wait(); connection.Subscribe().Wait();

View File

@ -7,9 +7,9 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTitle("Test")] [assembly: AssemblyTitle("Test")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Test")] [assembly: AssemblyProduct("Test")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] [assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@ -63,9 +63,9 @@
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project> <Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
<Name>Tapeti</Name> <Name>Tapeti</Name>
</ProjectReference> </ProjectReference>
<ProjectReference Include="..\Tapeti.Saga\Tapeti.Saga.csproj"> <ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj">
<Project>{f84ad920-d5a1-455d-aed5-2542b3a47b85}</Project> <Project>{f84ad920-d5a1-455d-aed5-2542b3a47b85}</Project>
<Name>Tapeti.Saga</Name> <Name>Tapeti.Flow</Name>
</ProjectReference> </ProjectReference>
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj"> <ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj">
<Project>{d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6}</Project> <Project>{d7ec6f86-eb3b-49c3-8fe7-6e8c1bb413a6}</Project>

View File

@ -1,17 +1,20 @@
using System; using System;
using System.Threading.Tasks;
namespace Test namespace Test
{ {
public class Visualizer public class Visualizer
{ {
public void VisualizeMarco() public Task VisualizeMarco()
{ {
Console.WriteLine("Marco!"); Console.WriteLine("Marco!");
return Task.CompletedTask;
} }
public void VisualizePolo() public Task VisualizePolo()
{ {
Console.WriteLine("Polo!"); Console.WriteLine("Polo!");
return Task.CompletedTask;
} }
} }
} }