diff --git a/.gitignore b/.gitignore index e758713..193e5b0 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ bin/ obj/ packages/ +*.user diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs index 48818e7..d45d42e 100644 --- a/Config/IBindingContext.cs +++ b/Config/IBindingContext.cs @@ -1,17 +1,23 @@ using System; using System.Collections.Generic; using System.Reflection; +using System.Threading.Tasks; namespace Tapeti.Config { public delegate object ValueFactory(IMessageContext context); + public delegate Task ResultHandler(IMessageContext context, object value); public interface IBindingContext { Type MessageClass { get; set; } - IReadOnlyList Parameters { get; } + MethodInfo Method { get; } + IReadOnlyList Parameters { get; } + IBindingResult Result { get; } + + void Use(IBindingFilter filter); void Use(IMessageMiddleware middleware); } @@ -23,4 +29,13 @@ namespace Tapeti.Config void SetBinding(ValueFactory valueFactory); } + + + public interface IBindingResult + { + ParameterInfo Info { get; } + bool HasHandler { get; } + + void SetHandler(ResultHandler resultHandler); + } } diff --git a/Config/IBindingFilter.cs b/Config/IBindingFilter.cs new file mode 100644 index 0000000..1e2259a --- /dev/null +++ b/Config/IBindingFilter.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IBindingFilter + { + Task Accept(IMessageContext context, IBinding binding); + } +} diff --git a/Config/IConfig.cs b/Config/IConfig.cs index 30fc3ba..ef68921 100644 --- a/Config/IConfig.cs +++ b/Config/IConfig.cs @@ -7,10 +7,11 @@ namespace Tapeti.Config { public interface IConfig { - string Exchange { get; } IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IEnumerable Queues { get; } + + IBinding GetBinding(Delegate method); } @@ -28,10 +29,18 @@ namespace Tapeti.Config Type Controller { get; } MethodInfo Method { get; } Type MessageClass { get; } + string QueueName { get; } IReadOnlyList MessageMiddleware { get; } + IReadOnlyList BindingFilters { get; } - bool Accept(object message); - Task Invoke(IMessageContext context, object message); + Task Accept(IMessageContext context, object message); + Task Invoke(IMessageContext context, object message); + } + + + public interface IDynamicQueueBinding : IBinding + { + void SetQueueName(string queueName); } } diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs index 0a57507..bdda93b 100644 --- a/Config/IMessageContext.cs +++ b/Config/IMessageContext.cs @@ -1,16 +1,29 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using System.Reflection; using RabbitMQ.Client; namespace Tapeti.Config { - public interface IMessageContext + public interface IMessageContext : IDisposable { IDependencyResolver DependencyResolver { get; } - object Controller { get; } + string Queue { get; } + string RoutingKey { get; } object Message { get; } IBasicProperties Properties { get; } IDictionary Items { get; } + + /// + /// Controller will be null when passed to an IBindingFilter + /// + object Controller { get; } + + /// + /// Binding will be null when passed to an IBindingFilter + /// + IBinding Binding { get; } } } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index f5e2bc2..5375034 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using RabbitMQ.Client; using Tapeti.Config; using Tapeti.Helpers; @@ -11,17 +10,22 @@ namespace Tapeti.Connection public class TapetiConsumer : DefaultBasicConsumer { private readonly TapetiWorker worker; + private readonly string queueName; private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly List bindings; + private readonly IExceptionStrategy exceptionStrategy; - public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) + public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) { this.worker = worker; + this.queueName = queueName; this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; this.bindings = bindings.ToList(); + + exceptionStrategy = dependencyResolver.Resolve(); } @@ -35,51 +39,91 @@ namespace Tapeti.Connection throw new ArgumentException("Empty message"); var validMessageType = false; - foreach (var binding in bindings.Where(b => b.Accept(message))) + + using (var context = new MessageContext { - var context = new MessageContext + DependencyResolver = dependencyResolver, + Queue = queueName, + RoutingKey = routingKey, + Message = message, + Properties = properties + }) + { + try { - DependencyResolver = dependencyResolver, - Controller = dependencyResolver.Resolve(binding.Controller), - Message = message, - Properties = properties - }; - - MiddlewareHelper.GoAsync(binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware, - async (handler, next) => await handler.Handle(context, next), - async () => + foreach (var binding in bindings) { - var result = binding.Invoke(context, message).Result; - if (result != null) - await worker.Publish(result, null); + if (!binding.Accept(context, message).Result) + continue; + + context.Controller = dependencyResolver.Resolve(binding.Controller); + context.Binding = binding; + + // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas + MiddlewareHelper.GoAsync( + binding.MessageMiddleware != null + ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() + : messageMiddleware, + async (handler, next) => await handler.Handle(context, next), + () => binding.Invoke(context, message) + ).Wait(); + // ReSharper restore AccessToDisposedClosure + + validMessageType = true; } - ).Wait(); - validMessageType = true; + if (!validMessageType) + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + } + catch (Exception e) + { + worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, UnwrapException(e))); + } } - if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); - worker.Respond(deliveryTag, ConsumeResponse.Ack); } - catch (Exception) + catch (Exception e) { - worker.Respond(deliveryTag, ConsumeResponse.Requeue); - throw; + worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, UnwrapException(e))); } } + private static Exception UnwrapException(Exception exception) + { + // In async/await style code this is handled similarly. For synchronous + // code using Tasks we have to unwrap these ourselves to get the proper + // exception directly instead of "Errors occured". We might lose + // some stack traces in the process though. + var aggregateException = exception as AggregateException; + if (aggregateException != null && aggregateException.InnerExceptions.Count == 1) + throw aggregateException.InnerExceptions[0]; + + return UnwrapException(exception); + } + + protected class MessageContext : IMessageContext { public IDependencyResolver DependencyResolver { get; set; } public object Controller { get; set; } + public IBinding Binding { get; set; } + + public string Queue { get; set; } + public string RoutingKey { get; set; } public object Message { get; set; } public IBasicProperties Properties { get; set; } public IDictionary Items { get; } = new Dictionary(); + + + public void Dispose() + { + foreach (var value in Items.Values) + (value as IDisposable)?.Dispose(); + } } } } diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index 3b88c03..8f8c742 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -4,7 +4,7 @@ using RabbitMQ.Client; namespace Tapeti.Connection { - public class TapetiPublisher : IAdvancedPublisher + public class TapetiPublisher : IInternalPublisher { private readonly Func workerFactory; @@ -25,5 +25,11 @@ namespace Tapeti.Connection { return workerFactory().Publish(message, properties); } + + + public Task PublishDirect(object message, string queueName, IBasicProperties properties) + { + return workerFactory().PublishDirect(message, queueName, properties); + } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index ba000ae..9c6bf04 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -12,12 +12,12 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string Exchange { get; set; } private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; + private readonly IExchangeStrategy exchangeStrategy; private readonly Lazy taskQueue = new Lazy(); private RabbitMQ.Client.IConnection connection; private IModel channelInstance; @@ -27,25 +27,22 @@ namespace Tapeti.Connection { this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; + messageSerializer = dependencyResolver.Resolve(); routingKeyStrategy = dependencyResolver.Resolve(); + exchangeStrategy = dependencyResolver.Resolve(); } public Task Publish(object message, IBasicProperties properties) { - return taskQueue.Value.Add(async () => - { - var messageProperties = properties ?? new BasicProperties(); - if (messageProperties.Timestamp.UnixTime == 0) - messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); + return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType())); + } - var body = messageSerializer.Serialize(message, messageProperties); - (await GetChannel()) - .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, - messageProperties, body); - }).Unwrap(); + public Task PublishDirect(object message, string queueName, IBasicProperties properties) + { + return Publish(message, properties, "", queueName); } @@ -53,7 +50,7 @@ namespace Tapeti.Connection { return taskQueue.Value.Add(async () => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware)); }).Unwrap(); } @@ -71,9 +68,11 @@ namespace Tapeti.Connection foreach (var binding in queue.Bindings) { var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); - } + channel.QueueBind(dynamicQueue.QueueName, exchangeStrategy.GetExchange(binding.MessageClass), routingKey); + (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); + } + return dynamicQueue.QueueName; } @@ -133,6 +132,22 @@ namespace Tapeti.Connection } + private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey) + { + return taskQueue.Value.Add(async () => + { + var messageProperties = properties ?? new BasicProperties(); + if (messageProperties.Timestamp.UnixTime == 0) + messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); + + var body = messageSerializer.Serialize(message, messageProperties); + + (await GetChannel()) + .BasicPublish(exchange, routingKey, false, messageProperties, body); + }).Unwrap(); + + } + /// /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. @@ -160,6 +175,9 @@ namespace Tapeti.Connection connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); + if (ConnectionParams.PrefetchCount > 0) + channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); + break; } catch (BrokerUnreachableException) diff --git a/TapetiTypes.cs b/ConsumeResponse.cs similarity index 100% rename from TapetiTypes.cs rename to ConsumeResponse.cs diff --git a/Default/DependencyResolverBinding.cs b/Default/DependencyResolverBinding.cs index d9a8a0d..f1d61bb 100644 --- a/Default/DependencyResolverBinding.cs +++ b/Default/DependencyResolverBinding.cs @@ -6,21 +6,12 @@ namespace Tapeti.Default { public class DependencyResolverBinding : IBindingMiddleware { - private readonly IDependencyResolver resolver; - - - public DependencyResolverBinding(IDependencyResolver resolver) - { - this.resolver = resolver; - } - - public void Handle(IBindingContext context, Action next) { next(); foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) - parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); + parameter.SetBinding(messageContext => messageContext.DependencyResolver.Resolve(parameter.Info.ParameterType)); } } } diff --git a/Default/DefaultMessageSerializer.cs b/Default/JsonMessageSerializer.cs similarity index 96% rename from Default/DefaultMessageSerializer.cs rename to Default/JsonMessageSerializer.cs index e11b60e..7c62ce4 100644 --- a/Default/DefaultMessageSerializer.cs +++ b/Default/JsonMessageSerializer.cs @@ -8,7 +8,7 @@ using RabbitMQ.Client; namespace Tapeti.Default { - public class DefaultMessageSerializer : IMessageSerializer + public class JsonMessageSerializer : IMessageSerializer { protected const string ContentType = "application/json"; protected const string ClassTypeHeader = "classType"; @@ -18,7 +18,7 @@ namespace Tapeti.Default private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); private readonly JsonSerializerSettings serializerSettings; - public DefaultMessageSerializer() + public JsonMessageSerializer() { serializerSettings = new JsonSerializerSettings { diff --git a/Default/NamespaceMatchExchangeStrategy.cs b/Default/NamespaceMatchExchangeStrategy.cs new file mode 100644 index 0000000..01ee0a6 --- /dev/null +++ b/Default/NamespaceMatchExchangeStrategy.cs @@ -0,0 +1,25 @@ +using System; +using System.Text.RegularExpressions; + +namespace Tapeti.Default +{ + public class NamespaceMatchExchangeStrategy : IExchangeStrategy + { + // If the namespace starts with "Messaging.Service[.Optional.Further.Parts]", the exchange will be "Service". + // If no Messaging prefix is present, the first part of the namespace will be used instead. + private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); + + + public string GetExchange(Type messageType) + { + if (messageType.Namespace == null) + throw new ArgumentException($"{messageType.FullName} does not have a namespace"); + + var match = NamespaceRegex.Match(messageType.Namespace); + if (!match.Success) + throw new ArgumentException($"Namespace for {messageType.FullName} does not match the specified format"); + + return match.Groups["exchange"].Value.ToLower(); + } + } +} diff --git a/Default/PublishResultBinding.cs b/Default/PublishResultBinding.cs new file mode 100644 index 0000000..9a76dc3 --- /dev/null +++ b/Default/PublishResultBinding.cs @@ -0,0 +1,53 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; +using Tapeti.Config; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + public class PublishResultBinding : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + next(); + + if (context.Result.HasHandler) + return; + + bool isTask; + if (context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTask)) + { + if (isTask) + { + context.Result.SetHandler(async (messageContext, value) => + { + var message = await (Task)value; + if (message != null) + await Reply(message, messageContext); + }); + } + else + context.Result.SetHandler((messageContext, value) => + value == null ? null : Reply(value, messageContext)); + } + } + + + private Task Reply(object message, IMessageContext messageContext) + { + var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve(); + var properties = new BasicProperties(); + + // Only set the property if it's not null, otherwise a string reference exception can occur: + // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html + if (messageContext.Properties.IsCorrelationIdPresent()) + properties.CorrelationId = messageContext.Properties.CorrelationId; + + if (messageContext.Properties.IsReplyToPresent()) + return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties); + + return publisher.Publish(message, properties); + } + } +} diff --git a/Default/RequeueExceptionStrategy.cs b/Default/RequeueExceptionStrategy.cs new file mode 100644 index 0000000..6a20ca7 --- /dev/null +++ b/Default/RequeueExceptionStrategy.cs @@ -0,0 +1,14 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class RequeueExceptionStrategy : IExceptionStrategy + { + public ConsumeResponse HandleException(IMessageContext context, Exception exception) + { + // TODO log exception + return ConsumeResponse.Requeue; + } + } +} diff --git a/Default/DefaultRoutingKeyStrategy.cs b/Default/TypeNameRoutingKeyStrategy.cs similarity index 96% rename from Default/DefaultRoutingKeyStrategy.cs rename to Default/TypeNameRoutingKeyStrategy.cs index 48e9260..b5e146d 100644 --- a/Default/DefaultRoutingKeyStrategy.cs +++ b/Default/TypeNameRoutingKeyStrategy.cs @@ -5,7 +5,7 @@ using System.Linq; namespace Tapeti.Default { - public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy + public class TypeNameRoutingKeyStrategy : IRoutingKeyStrategy { private readonly ConcurrentDictionary routingKeyCache = new ConcurrentDictionary(); diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs index a3b791c..edac856 100644 --- a/Helpers/MiddlewareHelper.cs +++ b/Helpers/MiddlewareHelper.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Diagnostics.Eventing.Reader; using System.Threading.Tasks; namespace Tapeti.Helpers diff --git a/Helpers/TaskTypeHelper.cs b/Helpers/TaskTypeHelper.cs new file mode 100644 index 0000000..73012d7 --- /dev/null +++ b/Helpers/TaskTypeHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Helpers +{ + public static class TaskTypeHelper + { + public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTask) + { + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>)) + { + isTask = true; + + var genericArguments = type.GetGenericArguments(); + return genericArguments.Length == 1 && predicate(genericArguments[0]); + } + + isTask = false; + return predicate(type); + } + + + public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTask) + { + return IsTypeOrTaskOf(type, t => t == compareTo, out isTask); + } + } +} diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index 6f2641b..f7a67eb 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -9,10 +9,15 @@ namespace Tapeti } - public interface IDependencyInjector : IDependencyResolver + public interface IDependencyContainer : IDependencyResolver { void RegisterDefault() where TService : class where TImplementation : class, TService; - void RegisterPublisher(Func publisher); + void RegisterDefault(Func factory) where TService : class; + + void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService; + void RegisterDefaultSingleton(TService instance) where TService : class; + void RegisterDefaultSingleton(Func factory) where TService : class; + void RegisterController(Type type); } } diff --git a/IExceptionStrategy.cs b/IExceptionStrategy.cs new file mode 100644 index 0000000..7b46af6 --- /dev/null +++ b/IExceptionStrategy.cs @@ -0,0 +1,16 @@ +using System; +using Tapeti.Config; + +namespace Tapeti +{ + public interface IExceptionStrategy + { + /// + /// Called when an exception occurs while handling a message. + /// + /// The message context if available. May be null! + /// The exception instance + /// The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message. + ConsumeResponse HandleException(IMessageContext context, Exception exception); + } +} diff --git a/IExchangeStrategy.cs b/IExchangeStrategy.cs new file mode 100644 index 0000000..e7aaa7e --- /dev/null +++ b/IExchangeStrategy.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti +{ + public interface IExchangeStrategy + { + string GetExchange(Type messageType); + } +} diff --git a/IPublisher.cs b/IPublisher.cs index 0c6721b..f1bf689 100644 --- a/IPublisher.cs +++ b/IPublisher.cs @@ -3,14 +3,17 @@ using RabbitMQ.Client; namespace Tapeti { + // Note: Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher. + // The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting. public interface IPublisher { Task Publish(object message); } - public interface IAdvancedPublisher : IPublisher + public interface IInternalPublisher : IPublisher { Task Publish(object message, IBasicProperties properties); + Task PublishDirect(object message, string queueName, IBasicProperties properties); } } diff --git a/Properties/AssemblyInfo.cs b/Properties/AssemblyInfo.cs index 3428fd6..1336875 100644 --- a/Properties/AssemblyInfo.cs +++ b/Properties/AssemblyInfo.cs @@ -7,9 +7,9 @@ using System.Runtime.InteropServices; [assembly: AssemblyTitle("Tapeti")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyCompany("")] [assembly: AssemblyProduct("Tapeti")] -[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/Tapeti.Flow/Annotations/ContinuationAttribute.cs b/Tapeti.Flow/Annotations/ContinuationAttribute.cs new file mode 100644 index 0000000..39d0914 --- /dev/null +++ b/Tapeti.Flow/Annotations/ContinuationAttribute.cs @@ -0,0 +1,8 @@ +using System; + +namespace Tapeti.Flow.Annotations +{ + public class ContinuationAttribute : Attribute + { + } +} diff --git a/Tapeti.Flow/Annotations/RequestAttribute.cs b/Tapeti.Flow/Annotations/RequestAttribute.cs new file mode 100644 index 0000000..00579ba --- /dev/null +++ b/Tapeti.Flow/Annotations/RequestAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Flow.Annotations +{ + public class RequestAttribute : Attribute + { + public Type Response { get; set; } + } +} diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs new file mode 100644 index 0000000..ffaad41 --- /dev/null +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -0,0 +1,11 @@ +namespace Tapeti.Flow +{ + public static class ConfigExtensions + { + public static TapetiConfig WithFlow(this TapetiConfig config) + { + config.Use(new FlowMiddleware()); + return config; + } + } +} diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs new file mode 100644 index 0000000..452de54 --- /dev/null +++ b/Tapeti.Flow/ContextItems.cs @@ -0,0 +1,7 @@ +namespace Tapeti.Flow +{ + public static class ContextItems + { + public const string FlowContext = "Tapeti.Flow.FlowContext"; + } +} diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs new file mode 100644 index 0000000..105492e --- /dev/null +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Flow.Default +{ + internal interface IExecutableYieldPoint : IYieldPoint + { + bool StoreState { get; } + + Task Execute(FlowContext context); + } + + + internal class DelegateYieldPoint : IYieldPoint + { + public bool StoreState { get; } + + private readonly Func onExecute; + + + public DelegateYieldPoint(bool storeState, Func onExecute) + { + StoreState = storeState; + this.onExecute = onExecute; + } + + + public Task Execute(FlowContext context) + { + return onExecute(context); + } + } +} diff --git a/Tapeti.Flow/Default/FlowBindingFilter.cs b/Tapeti.Flow/Default/FlowBindingFilter.cs new file mode 100644 index 0000000..727e773 --- /dev/null +++ b/Tapeti.Flow/Default/FlowBindingFilter.cs @@ -0,0 +1,64 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowBindingFilter : IBindingFilter + { + public async Task Accept(IMessageContext context, IBinding binding) + { + var flowContext = await GetFlowContext(context); + if (flowContext?.ContinuationMetadata == null) + return false; + + return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method); + } + + + private static async Task GetFlowContext(IMessageContext context) + { + if (context.Items.ContainsKey(ContextItems.FlowContext)) + return (FlowContext)context.Items[ContextItems.FlowContext]; + + if (context.Properties.CorrelationId == null) + return null; + + Guid continuationID; + if (!Guid.TryParse(context.Properties.CorrelationId, out continuationID)) + return null; + + var flowStore = context.DependencyResolver.Resolve(); + + var flowID = await flowStore.FindFlowID(continuationID); + if (!flowID.HasValue) + return null; + + var flowStateLock = await flowStore.LockFlowState(flowID.Value); + if (flowStateLock == null) + return null; + + var flowState = await flowStateLock.GetFlowState(); + if (flowState == null) + return null; + + ContinuationMetadata continuation; + + var flowContext = new FlowContext + { + MessageContext = context, + + FlowStateLock = flowStateLock, + FlowState = flowState, + + ContinuationID = continuationID, + ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null + }; + + // IDisposable items in the IMessageContext are automatically disposed + context.Items.Add(ContextItems.FlowContext, flowContext); + return flowContext; + } + } +} diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs new file mode 100644 index 0000000..6b91e95 --- /dev/null +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -0,0 +1,73 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.Annotations; +using Tapeti.Helpers; + +namespace Tapeti.Flow.Default +{ + // TODO figure out a way to prevent binding on Continuation methods (which are always the target of a direct response) + internal class FlowBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + RegisterContinuationFilter(context); + RegisterYieldPointResult(context); + + next(); + + ValidateRequestResponse(context); + } + + + private static void RegisterContinuationFilter(IBindingContext context) + { + var continuationAttribute = context.Method.GetCustomAttribute(); + if (continuationAttribute == null) + return; + + context.Use(new FlowBindingFilter()); + context.Use(new FlowMessageMiddleware()); + } + + + private static void RegisterYieldPointResult(IBindingContext context) + { + bool isTask; + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask)) + return; + + if (isTask) + { + context.Result.SetHandler(async (messageContext, value) => + { + var yieldPoint = await (Task)value; + if (yieldPoint != null) + await HandleYieldPoint(messageContext, yieldPoint); + }); + } + else + context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); + } + + + private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) + { + var flowHandler = context.DependencyResolver.Resolve(); + return flowHandler.Execute(context, yieldPoint); + } + + + private static void ValidateRequestResponse(IBindingContext context) + { + var request = context.MessageClass?.GetCustomAttribute(); + if (request?.Response == null) + return; + + bool isTask; + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out isTask)) + throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); + } + } +} diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs new file mode 100644 index 0000000..dac86f6 --- /dev/null +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + internal class FlowContext : IDisposable + { + public IMessageContext MessageContext { get; set; } + public IFlowStateLock FlowStateLock { get; set; } + public FlowState FlowState { get; set; } + + public Guid ContinuationID { get; set; } + public ContinuationMetadata ContinuationMetadata { get; set; } + + public void Dispose() + { + FlowStateLock?.Dispose(); + } + } +} diff --git a/Tapeti.Flow/Default/FlowMessageMiddleware.cs b/Tapeti.Flow/Default/FlowMessageMiddleware.cs new file mode 100644 index 0000000..85f0925 --- /dev/null +++ b/Tapeti.Flow/Default/FlowMessageMiddleware.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + public class FlowMessageMiddleware : IMessageMiddleware + { + public async Task Handle(IMessageContext context, Func next) + { + var flowContext = (FlowContext)context.Items[ContextItems.FlowContext]; + if (flowContext != null) + { + Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); + + await next(); + + flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); + } + else + await next(); + } + } +} diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs new file mode 100644 index 0000000..232d11a --- /dev/null +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -0,0 +1,273 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; +using Tapeti.Config; +using Tapeti.Flow.Annotations; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowProvider : IFlowProvider, IFlowHandler + { + private readonly IConfig config; + private readonly IInternalPublisher publisher; + + + public FlowProvider(IConfig config, IPublisher publisher) + { + this.config = config; + this.publisher = (IInternalPublisher)publisher; + } + + + public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + } + + public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + } + + public IFlowParallelRequestBuilder YieldWithParallelRequest() + { + throw new NotImplementedException(); + //return new ParallelRequestBuilder(); + } + + public IYieldPoint EndWithResponse(TResponse message) + { + return new DelegateYieldPoint(false, context => SendResponse(context, message)); + } + + public IYieldPoint End() + { + return new DelegateYieldPoint(false, EndFlow); + } + + + private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo) + { + var continuationID = Guid.NewGuid(); + + context.FlowState.Continuations.Add(continuationID, + new ContinuationMetadata + { + MethodName = responseHandlerInfo.MethodName, + ConvergeMethodName = null + }); + + var properties = new BasicProperties + { + CorrelationId = continuationID.ToString(), + ReplyTo = responseHandlerInfo.ReplyToQueue + }; + + await publisher.Publish(message, properties); + } + + + private async Task SendResponse(FlowContext context, object message) + { + var reply = context.FlowState.Metadata.Reply; + if (reply == null) + throw new YieldPointException("No response is required"); + + if (message.GetType().FullName != reply.ResponseTypeName) + throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); + + var properties = new BasicProperties(); + + // Only set the property if it's not null, otherwise a string reference exception can occur: + // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html + if (reply.CorrelationId != null) + properties.CorrelationId = reply.CorrelationId; + + // TODO disallow if replyto is not specified? + if (context.FlowState.Metadata.Reply.ReplyTo != null) + await publisher.PublishDirect(message, reply.ReplyTo, properties); + else + await publisher.Publish(message, properties); + } + + + private static Task EndFlow(FlowContext context) + { + if (context.FlowState.Metadata.Reply != null) + throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); + + return Task.CompletedTask; + } + + + private static ResponseHandlerInfo GetResponseHandlerInfo(IConfig config, object request, Delegate responseHandler) + { + var binding = config.GetBinding(responseHandler); + if (binding == null) + throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); + + var requestAttribute = request.GetType().GetCustomAttribute(); + if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) + throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); + + var continuationAttribute = binding.Method.GetCustomAttribute(); + if (continuationAttribute == null) + throw new ArgumentException($"responseHandler must be marked with the Continuation attribute", nameof(responseHandler)); + + return new ResponseHandlerInfo + { + MethodName = MethodSerializer.Serialize(responseHandler.Method), + ReplyToQueue = binding.QueueName + }; + } + + + private static ReplyMetadata GetReply(IMessageContext context) + { + var requestAttribute = context.Message.GetType().GetCustomAttribute(); + if (requestAttribute?.Response == null) + return null; + + return new ReplyMetadata + { + CorrelationId = context.Properties.CorrelationId, + ReplyTo = context.Properties.ReplyTo, + ResponseTypeName = requestAttribute.Response.FullName + }; + } + + + public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) + { + var delegateYieldPoint = (DelegateYieldPoint)yieldPoint; + var storeState = delegateYieldPoint.StoreState; + + FlowContext flowContext; + object flowContextItem; + + if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem)) + { + flowContext = new FlowContext + { + MessageContext = context + }; + + if (storeState) + { + // Initiate the flow + var flowStore = context.DependencyResolver.Resolve(); + + var flowID = Guid.NewGuid(); + flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); + + if (flowContext.FlowStateLock == null) + throw new InvalidOperationException("Unable to lock a new flow"); + + flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState(); + if (flowContext.FlowState == null) + throw new InvalidOperationException("Unable to get state for new flow"); + + flowContext.FlowState.Metadata.Reply = GetReply(context); + } + } + else + flowContext = (FlowContext) flowContextItem; + + + try + { + await delegateYieldPoint.Execute(flowContext); + } + catch (YieldPointException e) + { + var controllerName = flowContext.MessageContext.Controller.GetType().FullName; + var methodName = flowContext.MessageContext.Binding.Method.Name; + + throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e); + } + + if (storeState) + { + flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller); + await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState); + } + else + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + } + + + /* + private class ParallelRequestBuilder : IFlowParallelRequestBuilder + { + internal class RequestInfo + { + public object Message { get; set; } + public ResponseHandlerInfo ResponseHandlerInfo { get; set; } + } + + + private readonly IConfig config; + private readonly IFlowStore flowStore; + private readonly Func sendRequest; + private readonly List requests = new List(); + + + public ParallelRequestBuilder(IConfig config, IFlowStore flowStore, Func sendRequest) + { + this.config = config; + this.flowStore = flowStore; + this.sendRequest = sendRequest; + } + + + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + + public IYieldPoint Yield(Func> continuation) + { + return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); + } + + + public IYieldPoint Yield(Func continuation) + { + return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); + } + }*/ + + + internal class ResponseHandlerInfo + { + public string MethodName { get; set; } + public string ReplyToQueue { get; set; } + } + } +} diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs new file mode 100644 index 0000000..d120370 --- /dev/null +++ b/Tapeti.Flow/Default/FlowState.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Tapeti.Flow.Default +{ + public class FlowState + { + private FlowMetadata metadata; + private Dictionary continuations; + + + public FlowMetadata Metadata + { + get { return metadata ?? (metadata = new FlowMetadata()); } + set { metadata = value; } + } + + public string Data { get; set; } + + public Dictionary Continuations + { + get { return continuations ?? (continuations = new Dictionary()); } + set { continuations = value; } + } + + + public void Assign(FlowState value) + { + Metadata = value.Metadata.Clone(); + Data = value.Data; + Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()); + } + + + public FlowState Clone() + { + var result = new FlowState(); + result.Assign(this); + + return result; + } + } + + + public class FlowMetadata + { + public ReplyMetadata Reply { get; set; } + + + public FlowMetadata Clone() + { + return new FlowMetadata + { + Reply = Reply?.Clone() + }; + } + } + + + public class ReplyMetadata + { + public string ReplyTo { get; set; } + public string CorrelationId { get; set; } + public string ResponseTypeName { get; set; } + + + public ReplyMetadata Clone() + { + return new ReplyMetadata + { + ReplyTo = ReplyTo, + CorrelationId = CorrelationId, + ResponseTypeName = ResponseTypeName + }; + } + } + + + public class ContinuationMetadata + { + public string MethodName { get; set; } + public string ConvergeMethodName { get; set; } + + + public ContinuationMetadata Clone() + { + return new ContinuationMetadata + { + MethodName = MethodName, + ConvergeMethodName = ConvergeMethodName + }; + } + } +} diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs new file mode 100644 index 0000000..eee3a18 --- /dev/null +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -0,0 +1,198 @@ +using Newtonsoft.Json; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Flow.Default +{ + public class FlowStore : IFlowStore + { + private static readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + + private readonly IFlowRepository repository; + + + public FlowStore(IFlowRepository repository) + { + this.repository = repository; + } + + + public async Task Load() + { + FlowStates.Clear(); + ContinuationLookup.Clear(); + + foreach (var flowStateRecord in await repository.GetStates()) + { + var flowState = ToFlowState(flowStateRecord); + FlowStates.GetOrAdd(flowStateRecord.FlowID, flowState); + + foreach (var continuation in flowStateRecord.ContinuationMetadata) + ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID); + } + } + + + public Task FindFlowID(Guid continuationID) + { + Guid result; + return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); + } + + + public async Task LockFlowState(Guid flowID) + { + var isNew = false; + var flowState = FlowStates.GetOrAdd(flowID, id => + { + isNew = true; + return new FlowState(); + }); + + var result = new FlowStateLock(this, flowState, flowID, isNew); + await result.Lock(); + + return result; + } + + + private class FlowStateLock : IFlowStateLock + { + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); + + private readonly FlowStore owner; + private readonly FlowState flowState; + private readonly Guid flowID; + private bool isNew; + private bool isDisposed; + + + public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew) + { + this.owner = owner; + this.flowState = flowState; + this.flowID = flowID; + this.isNew = isNew; + } + + + public Task Lock() + { + return semaphore.WaitAsync(); + } + + + public void Dispose() + { + lock (flowState) + { + if (!isDisposed) + { + semaphore.Release(); + semaphore.Dispose(); + } + + isDisposed = true; + } + } + + public Guid FlowID => flowID; + + public Task GetFlowState() + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + return Task.FromResult(flowState.Clone()); + } + } + + public async Task StoreFlowState(FlowState newFlowState) + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) + { + Guid removedValue; + ContinuationLookup.TryRemove(removedContinuation, out removedValue); + } + + foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key))) + { + ContinuationLookup.TryAdd(addedContinuation.Key, flowID); + } + + flowState.Assign(newFlowState); + } + + if (isNew) + { + isNew = false; + var now = DateTime.UtcNow; + await owner.repository.CreateState(ToFlowStateRecord(flowID, flowState), now); + } + else + { + await owner.repository.UpdateState(ToFlowStateRecord(flowID, flowState)); + } + } + + public async Task DeleteFlowState() + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + foreach (var removedContinuation in flowState.Continuations.Keys) + { + Guid removedValue; + ContinuationLookup.TryRemove(removedContinuation, out removedValue); + } + + FlowState removedFlow; + FlowStates.TryRemove(flowID, out removedFlow); + } + + if (!isNew) + await owner.repository.DeleteState(flowID); + } + } + + + private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState) + { + return new FlowStateRecord + { + FlowID = flowID, + Metadata = JsonConvert.SerializeObject(flowState.Metadata), + Data = flowState.Data, + ContinuationMetadata = flowState.Continuations.ToDictionary( + kv => kv.Key, + kv => JsonConvert.SerializeObject(kv.Value)) + }; + } + + private static FlowState ToFlowState(FlowStateRecord flowStateRecord) + { + return new FlowState + { + Metadata = JsonConvert.DeserializeObject(flowStateRecord.Metadata), + Data = flowStateRecord.Data, + Continuations = flowStateRecord.ContinuationMetadata.ToDictionary( + kv => kv.Key, + kv => JsonConvert.DeserializeObject(kv.Value)) + }; + } + } +} diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs new file mode 100644 index 0000000..304899a --- /dev/null +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Tapeti.Flow.Default +{ + public class NonPersistentFlowRepository : IFlowRepository + { + public Task> GetStates() + { + return Task.FromResult(new List().AsQueryable()); + } + + public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp) + { + return Task.CompletedTask; + } + + public Task UpdateState(FlowStateRecord stateRecord) + { + return Task.CompletedTask; + } + + public Task DeleteState(Guid flowID) + { + return Task.CompletedTask; + } + } +} diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs new file mode 100644 index 0000000..5635249 --- /dev/null +++ b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs @@ -0,0 +1,12 @@ +using System.Reflection; + +namespace Tapeti.Flow.FlowHelpers +{ + public static class MethodSerializer + { + public static string Serialize(MethodInfo method) + { + return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName; + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs new file mode 100644 index 0000000..6e696d8 --- /dev/null +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using Tapeti.Config; +using Tapeti.Flow.Default; + +namespace Tapeti.Flow +{ + public class FlowMiddleware : IMiddlewareBundle + { + public IEnumerable GetContents(IDependencyResolver dependencyResolver) + { + var container = dependencyResolver as IDependencyContainer; + + // ReSharper disable once InvertIf + if (container != null) + { + container.RegisterDefault(); + container.RegisterDefault(); + container.RegisterDefault(); + container.RegisterDefault(); + } + + return new[] { new FlowBindingMiddleware() }; + } + } +} diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs new file mode 100644 index 0000000..0b619e3 --- /dev/null +++ b/Tapeti.Flow/IFlowProvider.cs @@ -0,0 +1,39 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow +{ + public interface IFlowProvider + { + IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + + // One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call". + // Apparantly this is because a return type of a method is not part of its signature, + // according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler); + + IFlowParallelRequestBuilder YieldWithParallelRequest(); + + IYieldPoint EndWithResponse(TResponse message); + IYieldPoint End(); + } + + public interface IFlowHandler + { + Task Execute(IMessageContext context, IYieldPoint yieldPoint); + } + + public interface IFlowParallelRequestBuilder + { + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + + IYieldPoint Yield(Func> continuation); + IYieldPoint Yield(Func continuation); + } + + public interface IYieldPoint + { + } +} diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs new file mode 100644 index 0000000..476623a --- /dev/null +++ b/Tapeti.Flow/IFlowRepository.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Tapeti.Flow +{ + public interface IFlowRepository + { + Task> GetStates(); + Task CreateState(FlowStateRecord stateRecord, DateTime timestamp); + Task UpdateState(FlowStateRecord stateRecord); + Task DeleteState(Guid flowID); + } + + + public class FlowStateRecord + { + public Guid FlowID; + public string Metadata; + public string Data; + public Dictionary ContinuationMetadata; + } +} diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs new file mode 100644 index 0000000..7af9513 --- /dev/null +++ b/Tapeti.Flow/IFlowStore.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Tapeti.Flow.Default; + +namespace Tapeti.Flow +{ + public interface IFlowStore + { + Task Load(); + Task FindFlowID(Guid continuationID); + Task LockFlowState(Guid flowID); + } + + public interface IFlowStateLock : IDisposable + { + Guid FlowID { get; } + + Task GetFlowState(); + Task StoreFlowState(FlowState flowState); + Task DeleteFlowState(); + } +} diff --git a/Tapeti.Saga/Properties/AssemblyInfo.cs b/Tapeti.Flow/Properties/AssemblyInfo.cs similarity index 82% rename from Tapeti.Saga/Properties/AssemblyInfo.cs rename to Tapeti.Flow/Properties/AssemblyInfo.cs index 785eee1..91c0d83 100644 --- a/Tapeti.Saga/Properties/AssemblyInfo.cs +++ b/Tapeti.Flow/Properties/AssemblyInfo.cs @@ -1,16 +1,15 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following // set of attributes. Change these attribute values to modify the information // associated with an assembly. -[assembly: AssemblyTitle("Tapeti.Saga")] +[assembly: AssemblyTitle("Tapeti.Flow")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Hewlett-Packard Company")] -[assembly: AssemblyProduct("Tapeti.Saga")] -[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Tapeti.Flow")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/Tapeti.Flow/ResponseExpectedException.cs b/Tapeti.Flow/ResponseExpectedException.cs new file mode 100644 index 0000000..c636be3 --- /dev/null +++ b/Tapeti.Flow/ResponseExpectedException.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Flow +{ + public class ResponseExpectedException : Exception + { + public ResponseExpectedException(string message) : base(message) { } + } +} diff --git a/Tapeti.Saga/Tapeti.Saga.csproj b/Tapeti.Flow/Tapeti.Flow.csproj similarity index 60% rename from Tapeti.Saga/Tapeti.Saga.csproj rename to Tapeti.Flow/Tapeti.Flow.csproj index 8f8f98d..4586611 100644 --- a/Tapeti.Saga/Tapeti.Saga.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -7,8 +7,8 @@ {F84AD920-D5A1-455D-AED5-2542B3A47B85} Library Properties - Tapeti.Saga - Tapeti.Saga + Tapeti.Flow + Tapeti.Flow v4.6.1 512 @@ -31,7 +31,14 @@ 4 - + + ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + True + + + ..\packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll + True + @@ -42,14 +49,27 @@ - - - + + + + + + + + + + + + + + + + + + - - - - + + @@ -57,6 +77,9 @@ Tapeti + + +