diff --git a/Config/IBindingContext.cs b/Config/IBindingContext.cs index df3444e..d45d42e 100644 --- a/Config/IBindingContext.cs +++ b/Config/IBindingContext.cs @@ -1,16 +1,24 @@ using System; using System.Collections.Generic; using System.Reflection; +using System.Threading.Tasks; namespace Tapeti.Config { public delegate object ValueFactory(IMessageContext context); + public delegate Task ResultHandler(IMessageContext context, object value); public interface IBindingContext { Type MessageClass { get; set; } + + MethodInfo Method { get; } IReadOnlyList Parameters { get; } + IBindingResult Result { get; } + + void Use(IBindingFilter filter); + void Use(IMessageMiddleware middleware); } @@ -21,4 +29,13 @@ namespace Tapeti.Config void SetBinding(ValueFactory valueFactory); } + + + public interface IBindingResult + { + ParameterInfo Info { get; } + bool HasHandler { get; } + + void SetHandler(ResultHandler resultHandler); + } } diff --git a/Config/IBindingFilter.cs b/Config/IBindingFilter.cs new file mode 100644 index 0000000..1e2259a --- /dev/null +++ b/Config/IBindingFilter.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IBindingFilter + { + Task Accept(IMessageContext context, IBinding binding); + } +} diff --git a/Config/IConfig.cs b/Config/IConfig.cs index 8752c97..bdbbb67 100644 --- a/Config/IConfig.cs +++ b/Config/IConfig.cs @@ -11,6 +11,8 @@ namespace Tapeti.Config IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IEnumerable Queues { get; } + + IBinding GetBinding(Delegate method); } @@ -28,8 +30,17 @@ namespace Tapeti.Config Type Controller { get; } MethodInfo Method { get; } Type MessageClass { get; } + string QueueName { get; } + + IReadOnlyList MessageMiddleware { get; } bool Accept(object message); Task Invoke(IMessageContext context, object message); } + + + public interface IDynamicQueueBinding : IBinding + { + void SetQueueName(string queueName); + } } diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs index 6bf8101..a602a86 100644 --- a/Config/IMessageContext.cs +++ b/Config/IMessageContext.cs @@ -1,11 +1,23 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; +using RabbitMQ.Client; namespace Tapeti.Config { - public interface IMessageContext + public interface IMessageContext : IDisposable { - object Controller { get; } + IDependencyResolver DependencyResolver { get; } + + string Queue { get; } + string RoutingKey { get; } object Message { get; } + IBasicProperties Properties { get; } + IDictionary Items { get; } + + /// + /// Controller will be null when passed to an IBindingFilter + /// + object Controller { get; } } } diff --git a/Config/IMessageMiddleware.cs b/Config/IMessageMiddleware.cs index aaec7eb..38ee22b 100644 --- a/Config/IMessageMiddleware.cs +++ b/Config/IMessageMiddleware.cs @@ -1,9 +1,10 @@ using System; +using System.Threading.Tasks; namespace Tapeti.Config { public interface IMessageMiddleware { - void Handle(IMessageContext context, Action next); + Task Handle(IMessageContext context, Func next); } } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index 1687d24..ea4f6e9 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -10,14 +10,16 @@ namespace Tapeti.Connection public class TapetiConsumer : DefaultBasicConsumer { private readonly TapetiWorker worker; + private readonly string queueName; private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; private readonly List bindings; - public TapetiConsumer(TapetiWorker worker, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) + public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) { this.worker = worker; + this.queueName = queueName; this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; this.bindings = bindings.ToList(); @@ -33,25 +35,41 @@ namespace Tapeti.Connection if (message == null) throw new ArgumentException("Empty message"); - var handled = false; + var validMessageType = false; foreach (var binding in bindings.Where(b => b.Accept(message))) { - var context = new MessageContext + using (var context = new MessageContext { + DependencyResolver = dependencyResolver, Controller = dependencyResolver.Resolve(binding.Controller), - Message = message - }; + Queue = queueName, + RoutingKey = routingKey, + Message = message, + Properties = properties + }) + { + // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas + MiddlewareHelper.GoAsync( + binding.MessageMiddleware != null + ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() + : messageMiddleware, + async (handler, next) => await handler.Handle(context, next), + async () => + { + var result = binding.Invoke(context, message).Result; - MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next)); + // TODO change to result handler + if (result != null) + await worker.Publish(result, null); + } + ).Wait(); + // ReSharper restore AccessToDisposedClosure + } - var result = binding.Invoke(context, message).Result; - if (result != null) - worker.Publish(result); - - handled = true; + validMessageType = true; } - if (!handled) + if (!validMessageType) throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); worker.Respond(deliveryTag, ConsumeResponse.Ack); @@ -66,9 +84,23 @@ namespace Tapeti.Connection protected class MessageContext : IMessageContext { + public IDependencyResolver DependencyResolver { get; set; } + public object Controller { get; set; } + + public string Queue { get; set; } + public string RoutingKey { get; set; } public object Message { get; set; } + public IBasicProperties Properties { get; set; } + public IDictionary Items { get; } = new Dictionary(); + + + public void Dispose() + { + foreach (var value in Items.Values) + (value as IDisposable)?.Dispose(); + } } } } diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index 4143cf5..db7b3cd 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,9 +1,10 @@ using System; using System.Threading.Tasks; +using RabbitMQ.Client; namespace Tapeti.Connection { - public class TapetiPublisher : IPublisher + public class TapetiPublisher : IAdvancedPublisher { private readonly Func workerFactory; @@ -16,7 +17,19 @@ namespace Tapeti.Connection public Task Publish(object message) { - return workerFactory().Publish(message); + return workerFactory().Publish(message, null); + } + + + public Task Publish(object message, IBasicProperties properties) + { + return workerFactory().Publish(message, properties); + } + + + public Task PublishDirect(object message, string queueName, IBasicProperties properties) + { + return workerFactory().PublishDirect(message, queueName, properties); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index f5ee254..5f0adae 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -32,17 +32,15 @@ namespace Tapeti.Connection } - public Task Publish(object message) + public Task Publish(object message, IBasicProperties properties) { - return taskQueue.Value.Add(async () => - { - var properties = new BasicProperties(); - var body = messageSerializer.Serialize(message, properties); + return Publish(message, properties, Exchange, routingKeyStrategy.GetRoutingKey(message.GetType())); + } - (await GetChannel()) - .BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false, - properties, body); - }).Unwrap(); + + public Task PublishDirect(object message, string queueName, IBasicProperties properties) + { + return Publish(message, properties, "", queueName); } @@ -50,7 +48,7 @@ namespace Tapeti.Connection { return taskQueue.Value.Add(async () => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, dependencyResolver, bindings, messageMiddleware)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware)); }).Unwrap(); } @@ -69,8 +67,10 @@ namespace Tapeti.Connection { var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); - } + (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); + } + return dynamicQueue.QueueName; } @@ -130,6 +130,22 @@ namespace Tapeti.Connection } + private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey) + { + return taskQueue.Value.Add(async () => + { + var messageProperties = properties ?? new BasicProperties(); + if (messageProperties.Timestamp.UnixTime == 0) + messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); + + var body = messageSerializer.Serialize(message, messageProperties); + + (await GetChannel()) + .BasicPublish(exchange, routingKey, false, messageProperties, body); + }).Unwrap(); + + } + /// /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. diff --git a/Default/BindingBufferStop.cs b/Default/BindingBufferStop.cs deleted file mode 100644 index 8d55a7c..0000000 --- a/Default/BindingBufferStop.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using Tapeti.Config; - -namespace Tapeti.Default -{ - // End of the line... - public class BindingBufferStop : IBindingMiddleware - { - public void Handle(IBindingContext context, Action next) - { - } - } -} diff --git a/Default/DependencyResolverBinding.cs b/Default/DependencyResolverBinding.cs index d9a8a0d..f1d61bb 100644 --- a/Default/DependencyResolverBinding.cs +++ b/Default/DependencyResolverBinding.cs @@ -6,21 +6,12 @@ namespace Tapeti.Default { public class DependencyResolverBinding : IBindingMiddleware { - private readonly IDependencyResolver resolver; - - - public DependencyResolverBinding(IDependencyResolver resolver) - { - this.resolver = resolver; - } - - public void Handle(IBindingContext context, Action next) { next(); foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) - parameter.SetBinding(messageContext => resolver.Resolve(parameter.Info.ParameterType)); + parameter.SetBinding(messageContext => messageContext.DependencyResolver.Resolve(parameter.Info.ParameterType)); } } } diff --git a/Default/DefaultMessageSerializer.cs b/Default/JsonMessageSerializer.cs similarity index 90% rename from Default/DefaultMessageSerializer.cs rename to Default/JsonMessageSerializer.cs index 37fde89..7c62ce4 100644 --- a/Default/DefaultMessageSerializer.cs +++ b/Default/JsonMessageSerializer.cs @@ -8,7 +8,7 @@ using RabbitMQ.Client; namespace Tapeti.Default { - public class DefaultMessageSerializer : IMessageSerializer + public class JsonMessageSerializer : IMessageSerializer { protected const string ContentType = "application/json"; protected const string ClassTypeHeader = "classType"; @@ -18,7 +18,7 @@ namespace Tapeti.Default private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); private readonly JsonSerializerSettings serializerSettings; - public DefaultMessageSerializer() + public JsonMessageSerializer() { serializerSettings = new JsonSerializerSettings { @@ -47,8 +47,8 @@ namespace Tapeti.Default { object typeName; - if (!properties.ContentType.Equals(ContentType)) - throw new ArgumentException("content_type must be {ContentType}"); + if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) + throw new ArgumentException($"content_type must be {ContentType}"); if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName)) throw new ArgumentException($"{ClassTypeHeader} header not present"); diff --git a/Default/NamespaceMatchExchangeStrategy.cs b/Default/NamespaceMatchExchangeStrategy.cs new file mode 100644 index 0000000..bd8bec5 --- /dev/null +++ b/Default/NamespaceMatchExchangeStrategy.cs @@ -0,0 +1,31 @@ +using System; +using System.Text.RegularExpressions; + +namespace Tapeti.Default +{ + public class NamespaceMatchExchangeStrategy : IExchangeStrategy + { + public const string DefaultFormat = "^Messaging\\.(.[^\\.]+)"; + + private readonly Regex namespaceRegEx; + + + public NamespaceMatchExchangeStrategy(string namespaceFormat = DefaultFormat) + { + namespaceRegEx = new Regex(namespaceFormat, RegexOptions.Compiled | RegexOptions.Singleline); + } + + + public string GetExchange(Type messageType) + { + if (messageType.Namespace == null) + throw new ArgumentException($"{messageType.FullName} does not have a namespace"); + + var match = namespaceRegEx.Match(messageType.Namespace); + if (!match.Success) + throw new ArgumentException($"Namespace for {messageType.FullName} does not match the specified format"); + + return match.Groups[1].Value.ToLower(); + } + } +} diff --git a/Default/DefaultRoutingKeyStrategy.cs b/Default/TypeNameRoutingKeyStrategy.cs similarity index 96% rename from Default/DefaultRoutingKeyStrategy.cs rename to Default/TypeNameRoutingKeyStrategy.cs index 48e9260..b5e146d 100644 --- a/Default/DefaultRoutingKeyStrategy.cs +++ b/Default/TypeNameRoutingKeyStrategy.cs @@ -5,7 +5,7 @@ using System.Linq; namespace Tapeti.Default { - public class DefaultRoutingKeyStrategy : IRoutingKeyStrategy + public class TypeNameRoutingKeyStrategy : IRoutingKeyStrategy { private readonly ConcurrentDictionary routingKeyCache = new ConcurrentDictionary(); diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs index 4e813b1..a3b791c 100644 --- a/Helpers/MiddlewareHelper.cs +++ b/Helpers/MiddlewareHelper.cs @@ -1,15 +1,20 @@ using System; using System.Collections.Generic; +using System.Diagnostics.Eventing.Reader; +using System.Threading.Tasks; namespace Tapeti.Helpers { public static class MiddlewareHelper { - public static void Go(IReadOnlyList middleware, Action handle) + public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler) { var handlerIndex = middleware.Count - 1; if (handlerIndex == -1) + { + lastHandler(); return; + } Action handleNext = null; @@ -18,9 +23,35 @@ namespace Tapeti.Helpers handlerIndex--; if (handlerIndex >= 0) handle(middleware[handlerIndex], handleNext); + else + lastHandler(); }; handle(middleware[handlerIndex], handleNext); } + + + public static async Task GoAsync(IReadOnlyList middleware, Func, Task> handle, Func lastHandler) + { + var handlerIndex = middleware.Count - 1; + if (handlerIndex == -1) + { + await lastHandler(); + return; + } + + Func handleNext = null; + + handleNext = async () => + { + handlerIndex--; + if (handlerIndex >= 0) + await handle(middleware[handlerIndex], handleNext); + else + await lastHandler(); + }; + + await handle(middleware[handlerIndex], handleNext); + } } } diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index 6f2641b..e777f56 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -1,4 +1,5 @@ using System; +using Tapeti.Config; namespace Tapeti { @@ -9,10 +10,11 @@ namespace Tapeti } - public interface IDependencyInjector : IDependencyResolver + public interface IDependencyContainer : IDependencyResolver { void RegisterDefault() where TService : class where TImplementation : class, TService; void RegisterPublisher(Func publisher); + void RegisterConfig(IConfig config); void RegisterController(Type type); } } diff --git a/IExchangeStrategy.cs b/IExchangeStrategy.cs new file mode 100644 index 0000000..e7aaa7e --- /dev/null +++ b/IExchangeStrategy.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti +{ + public interface IExchangeStrategy + { + string GetExchange(Type messageType); + } +} diff --git a/IPublisher.cs b/IPublisher.cs index f54128f..b8b0e91 100644 --- a/IPublisher.cs +++ b/IPublisher.cs @@ -1,4 +1,5 @@ using System.Threading.Tasks; +using RabbitMQ.Client; namespace Tapeti { @@ -6,4 +7,11 @@ namespace Tapeti { Task Publish(object message); } + + + public interface IAdvancedPublisher : IPublisher + { + Task Publish(object message, IBasicProperties properties); + Task PublishDirect(object message, string queueName, IBasicProperties properties); + } } diff --git a/Tapeti.Flow/Annotations/ContinuationAttribute.cs b/Tapeti.Flow/Annotations/ContinuationAttribute.cs new file mode 100644 index 0000000..39d0914 --- /dev/null +++ b/Tapeti.Flow/Annotations/ContinuationAttribute.cs @@ -0,0 +1,8 @@ +using System; + +namespace Tapeti.Flow.Annotations +{ + public class ContinuationAttribute : Attribute + { + } +} diff --git a/Tapeti.Flow/Annotations/RequestAttribute.cs b/Tapeti.Flow/Annotations/RequestAttribute.cs new file mode 100644 index 0000000..00579ba --- /dev/null +++ b/Tapeti.Flow/Annotations/RequestAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tapeti.Flow.Annotations +{ + public class RequestAttribute : Attribute + { + public Type Response { get; set; } + } +} diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs new file mode 100644 index 0000000..452de54 --- /dev/null +++ b/Tapeti.Flow/ContextItems.cs @@ -0,0 +1,7 @@ +namespace Tapeti.Flow +{ + public static class ContextItems + { + public const string FlowContext = "Tapeti.Flow.FlowContext"; + } +} diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs new file mode 100644 index 0000000..105492e --- /dev/null +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Flow.Default +{ + internal interface IExecutableYieldPoint : IYieldPoint + { + bool StoreState { get; } + + Task Execute(FlowContext context); + } + + + internal class DelegateYieldPoint : IYieldPoint + { + public bool StoreState { get; } + + private readonly Func onExecute; + + + public DelegateYieldPoint(bool storeState, Func onExecute) + { + StoreState = storeState; + this.onExecute = onExecute; + } + + + public Task Execute(FlowContext context) + { + return onExecute(context); + } + } +} diff --git a/Tapeti.Flow/Default/FlowBindingFilter.cs b/Tapeti.Flow/Default/FlowBindingFilter.cs new file mode 100644 index 0000000..a78b03f --- /dev/null +++ b/Tapeti.Flow/Default/FlowBindingFilter.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowBindingFilter : IBindingFilter + { + public async Task Accept(IMessageContext context, IBinding binding) + { + var flowContext = await GetFlowContext(context); + if (flowContext?.FlowState == null) + return false; + + string continuation; + if (!flowContext.FlowState.Continuations.TryGetValue(flowContext.ContinuationID, out continuation)) + return false; + + return continuation == MethodSerializer.Serialize(binding.Method); + } + + + private static async Task GetFlowContext(IMessageContext context) + { + if (context.Items.ContainsKey(ContextItems.FlowContext)) + return (FlowContext)context.Items[ContextItems.FlowContext]; + + if (context.Properties.CorrelationId == null) + return null; + + Guid continuationID; + if (!Guid.TryParse(context.Properties.CorrelationId, out continuationID)) + return null; + + var flowStore = context.DependencyResolver.Resolve(); + + var flowStateID = await flowStore.FindFlowStateID(continuationID); + if (!flowStateID.HasValue) + return null; + + var flowStateLock = await flowStore.LockFlowState(flowStateID.Value); + if (flowStateLock == null) + return null; + + var flowState = await flowStateLock.GetFlowState(); + + + var flowMetadata = flowState != null ? Newtonsoft.Json.JsonConvert.DeserializeObject(flowState.Metadata) : null; + //var continuationMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject(continuation.MetaData); + + var flowContext = new FlowContext + { + MessageContext = context, + ContinuationID = continuationID, + FlowStateLock = flowStateLock, + FlowState = flowState, + Reply = flowMetadata?.Reply + }; + + // IDisposable items in the IMessageContext are automatically disposed + context.Items.Add(ContextItems.FlowContext, flowContext); + return flowContext; + } + } +} diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs new file mode 100644 index 0000000..29b166d --- /dev/null +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -0,0 +1,44 @@ +using System; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + internal class FlowContext : IDisposable + { + public IMessageContext MessageContext { get; set; } + public IFlowStateLock FlowStateLock { get; set; } + public FlowState FlowState { get; set; } + public Guid ContinuationID { get; set; } + + public FlowReplyMetadata Reply { get; set; } + + + public void Dispose() + { + MessageContext?.Dispose(); + FlowStateLock?.Dispose(); + } + } + + + internal class FlowReplyMetadata + { + public string ReplyTo { get; set; } + public string CorrelationId { get; set; } + public string ResponseTypeName { get; set; } + } + + + internal class FlowMetadata + { + public string ControllerTypeName { get; set; } + public FlowReplyMetadata Reply { get; set; } + } + + + internal class ContinuationMetadata + { + public string MethodName { get; set; } + public string ConvergeMethodName { get; set; } + } +} diff --git a/Tapeti.Flow/Default/FlowMessageMiddleware.cs b/Tapeti.Flow/Default/FlowMessageMiddleware.cs new file mode 100644 index 0000000..bed6cb1 --- /dev/null +++ b/Tapeti.Flow/Default/FlowMessageMiddleware.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowMessageMiddleware : IMessageMiddleware + { + public async Task Handle(IMessageContext context, Func next) + { + var flowContext = (FlowContext)context.Items[ContextItems.FlowContext]; + if (flowContext != null) + { + Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); + + await next(); + + flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); + } + else + await next(); + } + } +} diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs new file mode 100644 index 0000000..a02e7a8 --- /dev/null +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -0,0 +1,262 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; +using Tapeti.Config; +using Tapeti.Flow.Annotations; +using Tapeti.Flow.FlowHelpers; + +namespace Tapeti.Flow.Default +{ + public class FlowProvider : IFlowProvider, IFlowHandler + { + private readonly IConfig config; + private readonly IAdvancedPublisher publisher; + + + public FlowProvider(IConfig config, IPublisher publisher) + { + this.config = config; + this.publisher = (IAdvancedPublisher)publisher; + } + + + public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + } + + public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) + { + var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + } + + public IFlowParallelRequestBuilder YieldWithParallelRequest() + { + throw new NotImplementedException(); + //return new ParallelRequestBuilder(); + } + + public IYieldPoint EndWithResponse(TResponse message) + { + return new DelegateYieldPoint(false, context => SendResponse(context, message)); + } + + public IYieldPoint End() + { + return new DelegateYieldPoint(false, EndFlow); + } + + + private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo) + { + var continuationID = Guid.NewGuid(); + + context.FlowState.Continuations.Add(continuationID, + Newtonsoft.Json.JsonConvert.SerializeObject(new ContinuationMetadata + { + MethodName = responseHandlerInfo.MethodName, + ConvergeMethodName = null + })); + + var properties = new BasicProperties + { + CorrelationId = continuationID.ToString(), + ReplyTo = responseHandlerInfo.ReplyToQueue + }; + + await publisher.Publish(message, properties); + } + + + private async Task SendResponse(FlowContext context, object message) + { + if (context.Reply == null) + throw new InvalidOperationException("No response is required"); + + if (message.GetType().FullName != context.Reply.ResponseTypeName) + throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); + + var properties = new BasicProperties + { + CorrelationId = context.Reply.CorrelationId + }; + + await publisher.PublishDirect(message, context.Reply.ReplyTo, properties); + } + + + private static Task EndFlow(FlowContext context) + { + if (context.Reply != null) + throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}"); + + return Task.CompletedTask; + } + + + private static ResponseHandlerInfo GetResponseHandlerInfo(IConfig config, object request, Delegate responseHandler) + { + var binding = config.GetBinding(responseHandler); + if (binding == null) + throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); + + var requestAttribute = request.GetType().GetCustomAttribute(); + if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) + throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); + + return new ResponseHandlerInfo + { + MethodName = MethodSerializer.Serialize(responseHandler.Method), + ReplyToQueue = binding.QueueName + }; + } + + + public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) + { + var flowContext = (FlowContext)context.Items[ContextItems.FlowContext]; + if (flowContext == null) + return; + + var delegateYieldPoint = (DelegateYieldPoint)yieldPoint; + await delegateYieldPoint.Execute(flowContext); + + if (delegateYieldPoint.StoreState) + { + flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller); + await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState); + } + else + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + } + + + /* + private class ParallelRequestBuilder : IFlowParallelRequestBuilder + { + internal class RequestInfo + { + public object Message { get; set; } + public ResponseHandlerInfo ResponseHandlerInfo { get; set; } + } + + + private readonly IConfig config; + private readonly IFlowStore flowStore; + private readonly Func sendRequest; + private readonly List requests = new List(); + + + public ParallelRequestBuilder(IConfig config, IFlowStore flowStore, Func sendRequest) + { + this.config = config; + this.flowStore = flowStore; + this.sendRequest = sendRequest; + } + + + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + { + requests.Add(new RequestInfo + { + Message = message, + ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) + }); + + return this; + } + + + public IYieldPoint Yield(Func> continuation) + { + return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); + } + + + public IYieldPoint Yield(Func continuation) + { + return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); + } + }*/ + + + internal class ResponseHandlerInfo + { + public string MethodName { get; set; } + public string ReplyToQueue { get; set; } + } + + + + /* + * Handle response (correlationId known) + internal async Task HandleMessage(object message, string correlationID) + { + var continuationID = Guid.Parse(correlationID); + var flowStateID = await owner.flowStore.FindFlowStateID(continuationID); + + if (!flowStateID.HasValue) + return; + + using (flowStateLock = await owner.flowStore.LockFlowState(flowStateID.Value)) + { + flowState = await flowStateLock.GetFlowState(); + + continuation = flowState.Continuations[continuationID]; + if (continuation != null) + await HandleContinuation(message); + } + } + + private async Task HandleContinuation(object message) + { + var flowMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject(flowState.MetaData); + var continuationMetaData = + Newtonsoft.Json.JsonConvert.DeserializeObject(continuation.MetaData); + + reply = flowMetaData.Reply; + controllerType = owner.GetControllerType(flowMetaData.ControllerTypeName); + method = controllerType.GetMethod(continuationMetaData.MethodName); + + controller = owner.container.GetInstance(controllerType); + + Newtonsoft.Json.JsonConvert.PopulateObject(flowState.Data, controller); + + var yieldPoint = (AbstractYieldPoint) await owner.CallFlowController(controller, method, message); + + await yieldPoint.Execute(this); + + if (yieldPoint.Store) + { + flowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(controller); + flowState.Continuations.Remove(continuation); + + await flowStateLock.StoreFlowState(flowState); + } + else + { + await flowStateLock.DeleteFlowState(); + } + } + + } + */ + } +} diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs new file mode 100644 index 0000000..d312933 --- /dev/null +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -0,0 +1,186 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Flow +{ + public class FlowStore : IFlowStore + { + private readonly IFlowRepository repository; + private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); + private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); + + + public FlowStore(IFlowRepository repository) + { + this.repository = repository; + } + + + public async Task Load() + { + flowStates.Clear(); + continuationLookup.Clear(); + + foreach (var state in await repository.GetAllStates()) + { + flowStates.GetOrAdd(state.FlowID, new FlowState + { + Metadata = state.Metadata, + Data = state.Data, + Continuations = state.Continuations + }); + + foreach (var continuation in state.Continuations) + continuationLookup.GetOrAdd(continuation.Key, state.FlowID); + } + } + + + public Task FindFlowStateID(Guid continuationID) + { + Guid result; + return Task.FromResult(continuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); + } + + + public async Task LockFlowState(Guid flowStateID) + { + var isNew = false; + var flowState = flowStates.GetOrAdd(flowStateID, id => + { + isNew = true; + return new FlowState(); + }); + + var result = new FlowStateLock(this, flowState, flowStateID, isNew); + await result.Lock(); + + return result; + } + + + private class FlowStateLock : IFlowStateLock + { + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); + + private readonly FlowStore owner; + private readonly FlowState flowState; + private readonly Guid flowID; + private bool isNew; + private bool isDisposed; + + + public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew) + { + this.owner = owner; + this.flowState = flowState; + this.flowID = flowID; + this.isNew = isNew; + } + + + public Task Lock() + { + return semaphore.WaitAsync(); + } + + + public void Dispose() + { + lock (flowState) + { + if (!isDisposed) + { + semaphore.Release(); + semaphore.Dispose(); + } + + isDisposed = true; + } + } + + public Guid FlowStateID => flowID; + + public Task GetFlowState() + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + return Task.FromResult(new FlowState + { + Data = flowState.Data, + Metadata = flowState.Metadata, + Continuations = flowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value) + }); + } + } + + public async Task StoreFlowState(FlowState newFlowState) + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + foreach ( + var removedContinuation in + flowState.Continuations.Keys.Where( + k => !newFlowState.Continuations.ContainsKey(k))) + { + Guid removedValue; + owner.continuationLookup.TryRemove(removedContinuation, out removedValue); + } + + foreach ( + var addedContinuation in + newFlowState.Continuations.Where( + c => !flowState.Continuations.ContainsKey(c.Key))) + { + owner.continuationLookup.TryAdd(addedContinuation.Key, flowID); + } + + flowState.Metadata = newFlowState.Metadata; + flowState.Data = newFlowState.Data; + flowState.Continuations = newFlowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value); + } + if (isNew) + { + isNew = false; + var now = DateTime.UtcNow; + await + owner.repository.CreateState(flowID, now, flowState.Metadata, flowState.Data, flowState.Continuations); + } + else + { + await owner.repository.UpdateState(flowID, flowState.Metadata, flowState.Data, flowState.Continuations); + } + } + + public async Task DeleteFlowState() + { + lock (flowState) + { + if (isDisposed) + throw new ObjectDisposedException("FlowStateLock"); + + foreach (var removedContinuation in flowState.Continuations.Keys) + { + Guid removedValue; + owner.continuationLookup.TryRemove(removedContinuation, out removedValue); + } + FlowState removedFlow; + owner.flowStates.TryRemove(flowID, out removedFlow); + } + if (!isNew) + await owner.repository.DeleteState(flowID); + } + + } + } +} diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs new file mode 100644 index 0000000..9b6aacc --- /dev/null +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Tapeti.Flow.Default +{ + public class NonPersistentFlowRepository : IFlowRepository + { + public Task> GetAllStates() + { + return Task.FromResult(new List().AsQueryable()); + } + + public Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary continuations) + { + return Task.CompletedTask; + } + + public Task UpdateState(Guid flowID, string metadata, string data, IDictionary continuations) + { + return Task.CompletedTask; + } + + public Task DeleteState(Guid flowID) + { + return Task.CompletedTask; + } + } +} diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs new file mode 100644 index 0000000..5635249 --- /dev/null +++ b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs @@ -0,0 +1,12 @@ +using System.Reflection; + +namespace Tapeti.Flow.FlowHelpers +{ + public static class MethodSerializer + { + public static string Serialize(MethodInfo method) + { + return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName; + } + } +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs new file mode 100644 index 0000000..b78574e --- /dev/null +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Flow.Annotations; +using Tapeti.Flow.Default; + +namespace Tapeti.Flow +{ + public class FlowMiddleware : IMiddlewareBundle + { + public IEnumerable GetContents(IDependencyResolver dependencyResolver) + { + var container = dependencyResolver as IDependencyContainer; + if (container != null) + { + container.RegisterDefault(); + container.RegisterDefault(); + // TODO singleton + container.RegisterDefault(); + container.RegisterDefault(); + } + + return new[] { new FlowBindingMiddleware() }; + } + + + internal class FlowBindingMiddleware : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + HandleContinuationFilter(context); + HandleYieldPointResult(context); + + next(); + } + + + private static void HandleContinuationFilter(IBindingContext context) + { + var continuationAttribute = context.Method.GetCustomAttribute(); + if (continuationAttribute != null) + { + context.Use(new FlowBindingFilter()); + context.Use(new FlowMessageMiddleware()); + } + } + + + private static void HandleYieldPointResult(IBindingContext context) + { + if (context.Result.Info.ParameterType == typeof(IYieldPoint)) + context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); + + else if (context.Result.Info.ParameterType == typeof(Task<>)) + { + var genericArguments = context.Result.Info.ParameterType.GetGenericArguments(); + if (genericArguments.Length == 1 && genericArguments[0] == typeof(IYieldPoint)) + context.Result.SetHandler(async (messageContext, value) => + { + var yieldPoint = await (Task)value; + if (yieldPoint != null) + await HandleYieldPoint(messageContext, yieldPoint); + }); + } + } + + + private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) + { + var flowHandler = context.DependencyResolver.Resolve(); + return flowHandler.Execute(context, yieldPoint); + } + } + } +} diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs new file mode 100644 index 0000000..0b619e3 --- /dev/null +++ b/Tapeti.Flow/IFlowProvider.cs @@ -0,0 +1,39 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow +{ + public interface IFlowProvider + { + IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + + // One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call". + // Apparantly this is because a return type of a method is not part of its signature, + // according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter + IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler); + + IFlowParallelRequestBuilder YieldWithParallelRequest(); + + IYieldPoint EndWithResponse(TResponse message); + IYieldPoint End(); + } + + public interface IFlowHandler + { + Task Execute(IMessageContext context, IYieldPoint yieldPoint); + } + + public interface IFlowParallelRequestBuilder + { + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + + IYieldPoint Yield(Func> continuation); + IYieldPoint Yield(Func continuation); + } + + public interface IYieldPoint + { + } +} diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs new file mode 100644 index 0000000..3408a22 --- /dev/null +++ b/Tapeti.Flow/IFlowRepository.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Tapeti.Flow +{ + public interface IFlowRepository + { + Task> GetAllStates(); + Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary continuations); + Task UpdateState(Guid flowID, string metadata, string data, IDictionary continuations); + Task DeleteState(Guid flowID); + } + + + public class FlowStateRecord + { + public Guid FlowID; + public string Metadata; + public string Data; + public Dictionary Continuations; + } +} diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs new file mode 100644 index 0000000..f1f9530 --- /dev/null +++ b/Tapeti.Flow/IFlowStore.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Tapeti.Flow +{ + public interface IFlowStore + { + Task Load(); + Task FindFlowStateID(Guid continuationID); + Task LockFlowState(Guid flowStateID); + } + + public interface IFlowStateLock : IDisposable + { + Guid FlowStateID { get; } + Task GetFlowState(); + Task StoreFlowState(FlowState flowState); + Task DeleteFlowState(); + } + + public class FlowState + { + public string Metadata { get; set; } + public string Data { get; set; } + public Dictionary Continuations { get; set; } + } +} diff --git a/Tapeti.Saga/Properties/AssemblyInfo.cs b/Tapeti.Flow/Properties/AssemblyInfo.cs similarity index 85% rename from Tapeti.Saga/Properties/AssemblyInfo.cs rename to Tapeti.Flow/Properties/AssemblyInfo.cs index 785eee1..7158452 100644 --- a/Tapeti.Saga/Properties/AssemblyInfo.cs +++ b/Tapeti.Flow/Properties/AssemblyInfo.cs @@ -5,12 +5,12 @@ using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following // set of attributes. Change these attribute values to modify the information // associated with an assembly. -[assembly: AssemblyTitle("Tapeti.Saga")] +[assembly: AssemblyTitle("Tapeti.Flow")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Hewlett-Packard Company")] -[assembly: AssemblyProduct("Tapeti.Saga")] -[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Tapeti.Flow")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/Tapeti.Saga/Tapeti.Saga.csproj b/Tapeti.Flow/Tapeti.Flow.csproj similarity index 62% rename from Tapeti.Saga/Tapeti.Saga.csproj rename to Tapeti.Flow/Tapeti.Flow.csproj index 3de0a04..5ee20bc 100644 --- a/Tapeti.Saga/Tapeti.Saga.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -7,8 +7,8 @@ {F84AD920-D5A1-455D-AED5-2542B3A47B85} Library Properties - Tapeti.Saga - Tapeti.Saga + Tapeti.Flow + Tapeti.Flow v4.6.1 512 @@ -31,6 +31,14 @@ 4 + + ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + True + + + ..\packages\RabbitMQ.Client.4.1.1\lib\net451\RabbitMQ.Client.dll + True + @@ -41,15 +49,22 @@ - - - + + + + + + + + + + + + + + + - - - - - @@ -57,6 +72,9 @@ Tapeti + + +