From a7b1ea85e518f94d7152f92981e72495ef62b9a9 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sun, 5 Feb 2017 23:22:34 +0100 Subject: [PATCH] First working prototype of the flow port --- .gitignore | 1 + Config/IConfig.cs | 7 +- Config/IMessageContext.cs | 6 + Connection/TapetiConsumer.cs | 48 ++--- Connection/TapetiPublisher.cs | 2 +- Connection/TapetiWorker.cs | 10 +- Default/NamespaceMatchExchangeStrategy.cs | 4 +- Default/PublishResultBinding.cs | 53 ++++++ Helpers/MiddlewareHelper.cs | 1 - Helpers/TaskTypeHelper.cs | 28 +++ IDependencyResolver.cs | 9 +- IPublisher.cs | 4 +- Properties/AssemblyInfo.cs | 4 +- Tapeti.Flow/ConfigExtensions.cs | 11 ++ Tapeti.Flow/Default/FlowBindingFilter.cs | 26 ++- Tapeti.Flow/Default/FlowContext.cs | 29 +-- Tapeti.Flow/Default/FlowMessageMiddleware.cs | 1 - Tapeti.Flow/Default/FlowProvider.cs | 165 ++++++++++-------- Tapeti.Flow/Default/FlowState.cs | 95 ++++++++++ Tapeti.Flow/Default/FlowStore.cs | 102 ++++++----- .../Default/NonPersistentFlowRepository.cs | 6 +- Tapeti.Flow/FlowMiddleware.cs | 47 ++--- Tapeti.Flow/IFlowRepository.cs | 8 +- Tapeti.Flow/IFlowStore.cs | 15 +- Tapeti.Flow/Properties/AssemblyInfo.cs | 1 - Tapeti.Flow/Tapeti.Flow.csproj | 3 + Tapeti.Flow/YieldPointException.cs | 10 ++ .../SimpleInjectorDependencyResolver.cs | 36 ++-- Tapeti.csproj | 2 + Tapeti.sln.DotSettings.user | 5 - TapetiConfig.cs | 85 +++++---- TapetiConnection.cs | 4 +- TapetiConnectionParams.cs | 7 + Test/MarcoController.cs | 38 ++-- Test/MarcoEmitter.cs | 29 ++- Test/Program.cs | 14 +- Test/Properties/AssemblyInfo.cs | 4 +- Test/Visualizer.cs | 4 +- 38 files changed, 601 insertions(+), 323 deletions(-) create mode 100644 Default/PublishResultBinding.cs create mode 100644 Helpers/TaskTypeHelper.cs create mode 100644 Tapeti.Flow/ConfigExtensions.cs create mode 100644 Tapeti.Flow/Default/FlowState.cs create mode 100644 Tapeti.Flow/YieldPointException.cs delete mode 100644 Tapeti.sln.DotSettings.user diff --git a/.gitignore b/.gitignore index e758713..193e5b0 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ bin/ obj/ packages/ +*.user diff --git a/Config/IConfig.cs b/Config/IConfig.cs index bdbbb67..c04fedf 100644 --- a/Config/IConfig.cs +++ b/Config/IConfig.cs @@ -7,7 +7,7 @@ namespace Tapeti.Config { public interface IConfig { - string Exchange { get; } + string SubscribeExchange { get; } IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } IEnumerable Queues { get; } @@ -33,9 +33,10 @@ namespace Tapeti.Config string QueueName { get; } IReadOnlyList MessageMiddleware { get; } + IReadOnlyList BindingFilters { get; } - bool Accept(object message); - Task Invoke(IMessageContext context, object message); + Task Accept(IMessageContext context, object message); + Task Invoke(IMessageContext context, object message); } diff --git a/Config/IMessageContext.cs b/Config/IMessageContext.cs index a602a86..bdda93b 100644 --- a/Config/IMessageContext.cs +++ b/Config/IMessageContext.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Reflection; using RabbitMQ.Client; namespace Tapeti.Config @@ -19,5 +20,10 @@ namespace Tapeti.Config /// Controller will be null when passed to an IBindingFilter /// object Controller { get; } + + /// + /// Binding will be null when passed to an IBindingFilter + /// + IBinding Binding { get; } } } diff --git a/Connection/TapetiConsumer.cs b/Connection/TapetiConsumer.cs index ea4f6e9..b8b7065 100644 --- a/Connection/TapetiConsumer.cs +++ b/Connection/TapetiConsumer.cs @@ -36,37 +36,36 @@ namespace Tapeti.Connection throw new ArgumentException("Empty message"); var validMessageType = false; - foreach (var binding in bindings.Where(b => b.Accept(message))) + + using (var context = new MessageContext { - using (var context = new MessageContext - { - DependencyResolver = dependencyResolver, - Controller = dependencyResolver.Resolve(binding.Controller), - Queue = queueName, - RoutingKey = routingKey, - Message = message, - Properties = properties - }) + DependencyResolver = dependencyResolver, + Queue = queueName, + RoutingKey = routingKey, + Message = message, + Properties = properties + }) + { + foreach (var binding in bindings) { + if (!binding.Accept(context, message).Result) + continue; + + context.Controller = dependencyResolver.Resolve(binding.Controller); + context.Binding = binding; + // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas MiddlewareHelper.GoAsync( binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware, async (handler, next) => await handler.Handle(context, next), - async () => - { - var result = binding.Invoke(context, message).Result; - - // TODO change to result handler - if (result != null) - await worker.Publish(result, null); - } + () => binding.Invoke(context, message) ).Wait(); // ReSharper restore AccessToDisposedClosure - } - validMessageType = true; + validMessageType = true; + } } if (!validMessageType) @@ -74,9 +73,15 @@ namespace Tapeti.Connection worker.Respond(deliveryTag, ConsumeResponse.Ack); } - catch (Exception) + catch (Exception e) { + // TODO allow different exception handling depending on exception type worker.Respond(deliveryTag, ConsumeResponse.Requeue); + + var aggregateException = e as AggregateException; + if (aggregateException != null && aggregateException.InnerExceptions.Count == 1) + throw aggregateException.InnerExceptions[0]; + throw; } } @@ -87,6 +92,7 @@ namespace Tapeti.Connection public IDependencyResolver DependencyResolver { get; set; } public object Controller { get; set; } + public IBinding Binding { get; set; } public string Queue { get; set; } public string RoutingKey { get; set; } diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index db7b3cd..8f8c742 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -4,7 +4,7 @@ using RabbitMQ.Client; namespace Tapeti.Connection { - public class TapetiPublisher : IAdvancedPublisher + public class TapetiPublisher : IInternalPublisher { private readonly Func workerFactory; diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index 5f0adae..b3778b9 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -12,7 +12,7 @@ namespace Tapeti.Connection public class TapetiWorker { public TapetiConnectionParams ConnectionParams { get; set; } - public string Exchange { get; set; } + public string SubscribeExchange { get; set; } private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; @@ -34,7 +34,8 @@ namespace Tapeti.Connection public Task Publish(object message, IBasicProperties properties) { - return Publish(message, properties, Exchange, routingKeyStrategy.GetRoutingKey(message.GetType())); + // TODO use exchange strategy! + return Publish(message, properties, SubscribeExchange, routingKeyStrategy.GetRoutingKey(message.GetType())); } @@ -66,7 +67,7 @@ namespace Tapeti.Connection foreach (var binding in queue.Bindings) { var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); - channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); + channel.QueueBind(dynamicQueue.QueueName, SubscribeExchange, routingKey); (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); } @@ -173,6 +174,9 @@ namespace Tapeti.Connection connection = connectionFactory.CreateConnection(); channelInstance = connection.CreateModel(); + if (ConnectionParams.PrefetchCount > 0) + channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); + break; } catch (BrokerUnreachableException) diff --git a/Default/NamespaceMatchExchangeStrategy.cs b/Default/NamespaceMatchExchangeStrategy.cs index bd8bec5..27498dd 100644 --- a/Default/NamespaceMatchExchangeStrategy.cs +++ b/Default/NamespaceMatchExchangeStrategy.cs @@ -10,9 +10,9 @@ namespace Tapeti.Default private readonly Regex namespaceRegEx; - public NamespaceMatchExchangeStrategy(string namespaceFormat = DefaultFormat) + public NamespaceMatchExchangeStrategy() { - namespaceRegEx = new Regex(namespaceFormat, RegexOptions.Compiled | RegexOptions.Singleline); + namespaceRegEx = new Regex(DefaultFormat, RegexOptions.Compiled | RegexOptions.Singleline); } diff --git a/Default/PublishResultBinding.cs b/Default/PublishResultBinding.cs new file mode 100644 index 0000000..9a76dc3 --- /dev/null +++ b/Default/PublishResultBinding.cs @@ -0,0 +1,53 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client.Framing; +using Tapeti.Config; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + public class PublishResultBinding : IBindingMiddleware + { + public void Handle(IBindingContext context, Action next) + { + next(); + + if (context.Result.HasHandler) + return; + + bool isTask; + if (context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTask)) + { + if (isTask) + { + context.Result.SetHandler(async (messageContext, value) => + { + var message = await (Task)value; + if (message != null) + await Reply(message, messageContext); + }); + } + else + context.Result.SetHandler((messageContext, value) => + value == null ? null : Reply(value, messageContext)); + } + } + + + private Task Reply(object message, IMessageContext messageContext) + { + var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve(); + var properties = new BasicProperties(); + + // Only set the property if it's not null, otherwise a string reference exception can occur: + // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html + if (messageContext.Properties.IsCorrelationIdPresent()) + properties.CorrelationId = messageContext.Properties.CorrelationId; + + if (messageContext.Properties.IsReplyToPresent()) + return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties); + + return publisher.Publish(message, properties); + } + } +} diff --git a/Helpers/MiddlewareHelper.cs b/Helpers/MiddlewareHelper.cs index a3b791c..edac856 100644 --- a/Helpers/MiddlewareHelper.cs +++ b/Helpers/MiddlewareHelper.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Diagnostics.Eventing.Reader; using System.Threading.Tasks; namespace Tapeti.Helpers diff --git a/Helpers/TaskTypeHelper.cs b/Helpers/TaskTypeHelper.cs new file mode 100644 index 0000000..73012d7 --- /dev/null +++ b/Helpers/TaskTypeHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Helpers +{ + public static class TaskTypeHelper + { + public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTask) + { + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>)) + { + isTask = true; + + var genericArguments = type.GetGenericArguments(); + return genericArguments.Length == 1 && predicate(genericArguments[0]); + } + + isTask = false; + return predicate(type); + } + + + public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTask) + { + return IsTypeOrTaskOf(type, t => t == compareTo, out isTask); + } + } +} diff --git a/IDependencyResolver.cs b/IDependencyResolver.cs index e777f56..f7a67eb 100644 --- a/IDependencyResolver.cs +++ b/IDependencyResolver.cs @@ -1,5 +1,4 @@ using System; -using Tapeti.Config; namespace Tapeti { @@ -13,8 +12,12 @@ namespace Tapeti public interface IDependencyContainer : IDependencyResolver { void RegisterDefault() where TService : class where TImplementation : class, TService; - void RegisterPublisher(Func publisher); - void RegisterConfig(IConfig config); + void RegisterDefault(Func factory) where TService : class; + + void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService; + void RegisterDefaultSingleton(TService instance) where TService : class; + void RegisterDefaultSingleton(Func factory) where TService : class; + void RegisterController(Type type); } } diff --git a/IPublisher.cs b/IPublisher.cs index b8b0e91..f1bf689 100644 --- a/IPublisher.cs +++ b/IPublisher.cs @@ -3,13 +3,15 @@ using RabbitMQ.Client; namespace Tapeti { + // Note: Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher. + // The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting. public interface IPublisher { Task Publish(object message); } - public interface IAdvancedPublisher : IPublisher + public interface IInternalPublisher : IPublisher { Task Publish(object message, IBasicProperties properties); Task PublishDirect(object message, string queueName, IBasicProperties properties); diff --git a/Properties/AssemblyInfo.cs b/Properties/AssemblyInfo.cs index 3428fd6..1336875 100644 --- a/Properties/AssemblyInfo.cs +++ b/Properties/AssemblyInfo.cs @@ -7,9 +7,9 @@ using System.Runtime.InteropServices; [assembly: AssemblyTitle("Tapeti")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyCompany("")] [assembly: AssemblyProduct("Tapeti")] -[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs new file mode 100644 index 0000000..ffaad41 --- /dev/null +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -0,0 +1,11 @@ +namespace Tapeti.Flow +{ + public static class ConfigExtensions + { + public static TapetiConfig WithFlow(this TapetiConfig config) + { + config.Use(new FlowMiddleware()); + return config; + } + } +} diff --git a/Tapeti.Flow/Default/FlowBindingFilter.cs b/Tapeti.Flow/Default/FlowBindingFilter.cs index a78b03f..727e773 100644 --- a/Tapeti.Flow/Default/FlowBindingFilter.cs +++ b/Tapeti.Flow/Default/FlowBindingFilter.cs @@ -10,14 +10,10 @@ namespace Tapeti.Flow.Default public async Task Accept(IMessageContext context, IBinding binding) { var flowContext = await GetFlowContext(context); - if (flowContext?.FlowState == null) + if (flowContext?.ContinuationMetadata == null) return false; - string continuation; - if (!flowContext.FlowState.Continuations.TryGetValue(flowContext.ContinuationID, out continuation)) - return false; - - return continuation == MethodSerializer.Serialize(binding.Method); + return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method); } @@ -35,27 +31,29 @@ namespace Tapeti.Flow.Default var flowStore = context.DependencyResolver.Resolve(); - var flowStateID = await flowStore.FindFlowStateID(continuationID); - if (!flowStateID.HasValue) + var flowID = await flowStore.FindFlowID(continuationID); + if (!flowID.HasValue) return null; - var flowStateLock = await flowStore.LockFlowState(flowStateID.Value); + var flowStateLock = await flowStore.LockFlowState(flowID.Value); if (flowStateLock == null) return null; var flowState = await flowStateLock.GetFlowState(); + if (flowState == null) + return null; - - var flowMetadata = flowState != null ? Newtonsoft.Json.JsonConvert.DeserializeObject(flowState.Metadata) : null; - //var continuationMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject(continuation.MetaData); + ContinuationMetadata continuation; var flowContext = new FlowContext { MessageContext = context, - ContinuationID = continuationID, + FlowStateLock = flowStateLock, FlowState = flowState, - Reply = flowMetadata?.Reply + + ContinuationID = continuationID, + ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null }; // IDisposable items in the IMessageContext are automatically disposed diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 29b166d..dac86f6 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using Tapeti.Config; namespace Tapeti.Flow.Default @@ -8,37 +9,13 @@ namespace Tapeti.Flow.Default public IMessageContext MessageContext { get; set; } public IFlowStateLock FlowStateLock { get; set; } public FlowState FlowState { get; set; } + public Guid ContinuationID { get; set; } - - public FlowReplyMetadata Reply { get; set; } - + public ContinuationMetadata ContinuationMetadata { get; set; } public void Dispose() { - MessageContext?.Dispose(); FlowStateLock?.Dispose(); } } - - - internal class FlowReplyMetadata - { - public string ReplyTo { get; set; } - public string CorrelationId { get; set; } - public string ResponseTypeName { get; set; } - } - - - internal class FlowMetadata - { - public string ControllerTypeName { get; set; } - public FlowReplyMetadata Reply { get; set; } - } - - - internal class ContinuationMetadata - { - public string MethodName { get; set; } - public string ConvergeMethodName { get; set; } - } } diff --git a/Tapeti.Flow/Default/FlowMessageMiddleware.cs b/Tapeti.Flow/Default/FlowMessageMiddleware.cs index bed6cb1..85f0925 100644 --- a/Tapeti.Flow/Default/FlowMessageMiddleware.cs +++ b/Tapeti.Flow/Default/FlowMessageMiddleware.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; using Tapeti.Config; -using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index a02e7a8..232d11a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Reflection; using System.Threading.Tasks; using RabbitMQ.Client.Framing; @@ -11,13 +12,13 @@ namespace Tapeti.Flow.Default public class FlowProvider : IFlowProvider, IFlowHandler { private readonly IConfig config; - private readonly IAdvancedPublisher publisher; + private readonly IInternalPublisher publisher; public FlowProvider(IConfig config, IPublisher publisher) { this.config = config; - this.publisher = (IAdvancedPublisher)publisher; + this.publisher = (IInternalPublisher)publisher; } @@ -55,11 +56,11 @@ namespace Tapeti.Flow.Default var continuationID = Guid.NewGuid(); context.FlowState.Continuations.Add(continuationID, - Newtonsoft.Json.JsonConvert.SerializeObject(new ContinuationMetadata + new ContinuationMetadata { MethodName = responseHandlerInfo.MethodName, ConvergeMethodName = null - })); + }); var properties = new BasicProperties { @@ -73,25 +74,32 @@ namespace Tapeti.Flow.Default private async Task SendResponse(FlowContext context, object message) { - if (context.Reply == null) - throw new InvalidOperationException("No response is required"); + var reply = context.FlowState.Metadata.Reply; + if (reply == null) + throw new YieldPointException("No response is required"); - if (message.GetType().FullName != context.Reply.ResponseTypeName) - throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); + if (message.GetType().FullName != reply.ResponseTypeName) + throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); - var properties = new BasicProperties - { - CorrelationId = context.Reply.CorrelationId - }; + var properties = new BasicProperties(); - await publisher.PublishDirect(message, context.Reply.ReplyTo, properties); + // Only set the property if it's not null, otherwise a string reference exception can occur: + // http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html + if (reply.CorrelationId != null) + properties.CorrelationId = reply.CorrelationId; + + // TODO disallow if replyto is not specified? + if (context.FlowState.Metadata.Reply.ReplyTo != null) + await publisher.PublishDirect(message, reply.ReplyTo, properties); + else + await publisher.Publish(message, properties); } private static Task EndFlow(FlowContext context) { - if (context.Reply != null) - throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}"); + if (context.FlowState.Metadata.Reply != null) + throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); return Task.CompletedTask; } @@ -107,6 +115,10 @@ namespace Tapeti.Flow.Default if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); + var continuationAttribute = binding.Method.GetCustomAttribute(); + if (continuationAttribute == null) + throw new ArgumentException($"responseHandler must be marked with the Continuation attribute", nameof(responseHandler)); + return new ResponseHandlerInfo { MethodName = MethodSerializer.Serialize(responseHandler.Method), @@ -115,16 +127,71 @@ namespace Tapeti.Flow.Default } + private static ReplyMetadata GetReply(IMessageContext context) + { + var requestAttribute = context.Message.GetType().GetCustomAttribute(); + if (requestAttribute?.Response == null) + return null; + + return new ReplyMetadata + { + CorrelationId = context.Properties.CorrelationId, + ReplyTo = context.Properties.ReplyTo, + ResponseTypeName = requestAttribute.Response.FullName + }; + } + + public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) { - var flowContext = (FlowContext)context.Items[ContextItems.FlowContext]; - if (flowContext == null) - return; - var delegateYieldPoint = (DelegateYieldPoint)yieldPoint; - await delegateYieldPoint.Execute(flowContext); + var storeState = delegateYieldPoint.StoreState; - if (delegateYieldPoint.StoreState) + FlowContext flowContext; + object flowContextItem; + + if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem)) + { + flowContext = new FlowContext + { + MessageContext = context + }; + + if (storeState) + { + // Initiate the flow + var flowStore = context.DependencyResolver.Resolve(); + + var flowID = Guid.NewGuid(); + flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); + + if (flowContext.FlowStateLock == null) + throw new InvalidOperationException("Unable to lock a new flow"); + + flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState(); + if (flowContext.FlowState == null) + throw new InvalidOperationException("Unable to get state for new flow"); + + flowContext.FlowState.Metadata.Reply = GetReply(context); + } + } + else + flowContext = (FlowContext) flowContextItem; + + + try + { + await delegateYieldPoint.Execute(flowContext); + } + catch (YieldPointException e) + { + var controllerName = flowContext.MessageContext.Controller.GetType().FullName; + var methodName = flowContext.MessageContext.Binding.Method.Name; + + throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e); + } + + if (storeState) { flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller); await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState); @@ -202,61 +269,5 @@ namespace Tapeti.Flow.Default public string MethodName { get; set; } public string ReplyToQueue { get; set; } } - - - - /* - * Handle response (correlationId known) - internal async Task HandleMessage(object message, string correlationID) - { - var continuationID = Guid.Parse(correlationID); - var flowStateID = await owner.flowStore.FindFlowStateID(continuationID); - - if (!flowStateID.HasValue) - return; - - using (flowStateLock = await owner.flowStore.LockFlowState(flowStateID.Value)) - { - flowState = await flowStateLock.GetFlowState(); - - continuation = flowState.Continuations[continuationID]; - if (continuation != null) - await HandleContinuation(message); - } - } - - private async Task HandleContinuation(object message) - { - var flowMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject(flowState.MetaData); - var continuationMetaData = - Newtonsoft.Json.JsonConvert.DeserializeObject(continuation.MetaData); - - reply = flowMetaData.Reply; - controllerType = owner.GetControllerType(flowMetaData.ControllerTypeName); - method = controllerType.GetMethod(continuationMetaData.MethodName); - - controller = owner.container.GetInstance(controllerType); - - Newtonsoft.Json.JsonConvert.PopulateObject(flowState.Data, controller); - - var yieldPoint = (AbstractYieldPoint) await owner.CallFlowController(controller, method, message); - - await yieldPoint.Execute(this); - - if (yieldPoint.Store) - { - flowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(controller); - flowState.Continuations.Remove(continuation); - - await flowStateLock.StoreFlowState(flowState); - } - else - { - await flowStateLock.DeleteFlowState(); - } - } - - } - */ } } diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs new file mode 100644 index 0000000..d120370 --- /dev/null +++ b/Tapeti.Flow/Default/FlowState.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Tapeti.Flow.Default +{ + public class FlowState + { + private FlowMetadata metadata; + private Dictionary continuations; + + + public FlowMetadata Metadata + { + get { return metadata ?? (metadata = new FlowMetadata()); } + set { metadata = value; } + } + + public string Data { get; set; } + + public Dictionary Continuations + { + get { return continuations ?? (continuations = new Dictionary()); } + set { continuations = value; } + } + + + public void Assign(FlowState value) + { + Metadata = value.Metadata.Clone(); + Data = value.Data; + Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()); + } + + + public FlowState Clone() + { + var result = new FlowState(); + result.Assign(this); + + return result; + } + } + + + public class FlowMetadata + { + public ReplyMetadata Reply { get; set; } + + + public FlowMetadata Clone() + { + return new FlowMetadata + { + Reply = Reply?.Clone() + }; + } + } + + + public class ReplyMetadata + { + public string ReplyTo { get; set; } + public string CorrelationId { get; set; } + public string ResponseTypeName { get; set; } + + + public ReplyMetadata Clone() + { + return new ReplyMetadata + { + ReplyTo = ReplyTo, + CorrelationId = CorrelationId, + ResponseTypeName = ResponseTypeName + }; + } + } + + + public class ContinuationMetadata + { + public string MethodName { get; set; } + public string ConvergeMethodName { get; set; } + + + public ContinuationMetadata Clone() + { + return new ContinuationMetadata + { + MethodName = MethodName, + ConvergeMethodName = ConvergeMethodName + }; + } + } +} diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index d312933..eee3a18 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -1,17 +1,19 @@ -using System; +using Newtonsoft.Json; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -namespace Tapeti.Flow +namespace Tapeti.Flow.Default { public class FlowStore : IFlowStore { + private static readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + private readonly IFlowRepository repository; - private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); - private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); public FlowStore(IFlowRepository repository) @@ -22,41 +24,37 @@ namespace Tapeti.Flow public async Task Load() { - flowStates.Clear(); - continuationLookup.Clear(); + FlowStates.Clear(); + ContinuationLookup.Clear(); - foreach (var state in await repository.GetAllStates()) + foreach (var flowStateRecord in await repository.GetStates()) { - flowStates.GetOrAdd(state.FlowID, new FlowState - { - Metadata = state.Metadata, - Data = state.Data, - Continuations = state.Continuations - }); + var flowState = ToFlowState(flowStateRecord); + FlowStates.GetOrAdd(flowStateRecord.FlowID, flowState); - foreach (var continuation in state.Continuations) - continuationLookup.GetOrAdd(continuation.Key, state.FlowID); + foreach (var continuation in flowStateRecord.ContinuationMetadata) + ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID); } } - public Task FindFlowStateID(Guid continuationID) + public Task FindFlowID(Guid continuationID) { Guid result; - return Task.FromResult(continuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); + return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); } - public async Task LockFlowState(Guid flowStateID) + public async Task LockFlowState(Guid flowID) { var isNew = false; - var flowState = flowStates.GetOrAdd(flowStateID, id => + var flowState = FlowStates.GetOrAdd(flowID, id => { isNew = true; return new FlowState(); }); - var result = new FlowStateLock(this, flowState, flowStateID, isNew); + var result = new FlowStateLock(this, flowState, flowID, isNew); await result.Lock(); return result; @@ -103,7 +101,7 @@ namespace Tapeti.Flow } } - public Guid FlowStateID => flowID; + public Guid FlowID => flowID; public Task GetFlowState() { @@ -112,12 +110,7 @@ namespace Tapeti.Flow if (isDisposed) throw new ObjectDisposedException("FlowStateLock"); - return Task.FromResult(new FlowState - { - Data = flowState.Data, - Metadata = flowState.Metadata, - Continuations = flowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value) - }); + return Task.FromResult(flowState.Clone()); } } @@ -128,37 +121,29 @@ namespace Tapeti.Flow if (isDisposed) throw new ObjectDisposedException("FlowStateLock"); - foreach ( - var removedContinuation in - flowState.Continuations.Keys.Where( - k => !newFlowState.Continuations.ContainsKey(k))) + foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) { Guid removedValue; - owner.continuationLookup.TryRemove(removedContinuation, out removedValue); + ContinuationLookup.TryRemove(removedContinuation, out removedValue); } - foreach ( - var addedContinuation in - newFlowState.Continuations.Where( - c => !flowState.Continuations.ContainsKey(c.Key))) + foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key))) { - owner.continuationLookup.TryAdd(addedContinuation.Key, flowID); + ContinuationLookup.TryAdd(addedContinuation.Key, flowID); } - flowState.Metadata = newFlowState.Metadata; - flowState.Data = newFlowState.Data; - flowState.Continuations = newFlowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value); + flowState.Assign(newFlowState); } + if (isNew) { isNew = false; var now = DateTime.UtcNow; - await - owner.repository.CreateState(flowID, now, flowState.Metadata, flowState.Data, flowState.Continuations); + await owner.repository.CreateState(ToFlowStateRecord(flowID, flowState), now); } else { - await owner.repository.UpdateState(flowID, flowState.Metadata, flowState.Data, flowState.Continuations); + await owner.repository.UpdateState(ToFlowStateRecord(flowID, flowState)); } } @@ -172,15 +157,42 @@ namespace Tapeti.Flow foreach (var removedContinuation in flowState.Continuations.Keys) { Guid removedValue; - owner.continuationLookup.TryRemove(removedContinuation, out removedValue); + ContinuationLookup.TryRemove(removedContinuation, out removedValue); } + FlowState removedFlow; - owner.flowStates.TryRemove(flowID, out removedFlow); + FlowStates.TryRemove(flowID, out removedFlow); } + if (!isNew) await owner.repository.DeleteState(flowID); } + } + + private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState) + { + return new FlowStateRecord + { + FlowID = flowID, + Metadata = JsonConvert.SerializeObject(flowState.Metadata), + Data = flowState.Data, + ContinuationMetadata = flowState.Continuations.ToDictionary( + kv => kv.Key, + kv => JsonConvert.SerializeObject(kv.Value)) + }; + } + + private static FlowState ToFlowState(FlowStateRecord flowStateRecord) + { + return new FlowState + { + Metadata = JsonConvert.DeserializeObject(flowStateRecord.Metadata), + Data = flowStateRecord.Data, + Continuations = flowStateRecord.ContinuationMetadata.ToDictionary( + kv => kv.Key, + kv => JsonConvert.DeserializeObject(kv.Value)) + }; } } } diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs index 9b6aacc..304899a 100644 --- a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -7,17 +7,17 @@ namespace Tapeti.Flow.Default { public class NonPersistentFlowRepository : IFlowRepository { - public Task> GetAllStates() + public Task> GetStates() { return Task.FromResult(new List().AsQueryable()); } - public Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary continuations) + public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp) { return Task.CompletedTask; } - public Task UpdateState(Guid flowID, string metadata, string data, IDictionary continuations) + public Task UpdateState(FlowStateRecord stateRecord) { return Task.CompletedTask; } diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index b78574e..ecac5a8 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Flow.Annotations; using Tapeti.Flow.Default; +using Tapeti.Helpers; namespace Tapeti.Flow { @@ -13,13 +14,14 @@ namespace Tapeti.Flow public IEnumerable GetContents(IDependencyResolver dependencyResolver) { var container = dependencyResolver as IDependencyContainer; + + // ReSharper disable once InvertIf if (container != null) { container.RegisterDefault(); container.RegisterDefault(); - // TODO singleton - container.RegisterDefault(); container.RegisterDefault(); + container.RegisterDefault(); } return new[] { new FlowBindingMiddleware() }; @@ -30,40 +32,41 @@ namespace Tapeti.Flow { public void Handle(IBindingContext context, Action next) { - HandleContinuationFilter(context); - HandleYieldPointResult(context); + RegisterContinuationFilter(context); + RegisterYieldPointResult(context); next(); } - private static void HandleContinuationFilter(IBindingContext context) + private static void RegisterContinuationFilter(IBindingContext context) { var continuationAttribute = context.Method.GetCustomAttribute(); - if (continuationAttribute != null) - { - context.Use(new FlowBindingFilter()); - context.Use(new FlowMessageMiddleware()); - } + if (continuationAttribute == null) + return; + + context.Use(new FlowBindingFilter()); + context.Use(new FlowMessageMiddleware()); } - private static void HandleYieldPointResult(IBindingContext context) + private static void RegisterYieldPointResult(IBindingContext context) { - if (context.Result.Info.ParameterType == typeof(IYieldPoint)) - context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); + bool isTask; + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask)) + return; - else if (context.Result.Info.ParameterType == typeof(Task<>)) + if (isTask) { - var genericArguments = context.Result.Info.ParameterType.GetGenericArguments(); - if (genericArguments.Length == 1 && genericArguments[0] == typeof(IYieldPoint)) - context.Result.SetHandler(async (messageContext, value) => - { - var yieldPoint = await (Task)value; - if (yieldPoint != null) - await HandleYieldPoint(messageContext, yieldPoint); - }); + context.Result.SetHandler(async (messageContext, value) => + { + var yieldPoint = await (Task)value; + if (yieldPoint != null) + await HandleYieldPoint(messageContext, yieldPoint); + }); } + else + context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); } diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs index 3408a22..476623a 100644 --- a/Tapeti.Flow/IFlowRepository.cs +++ b/Tapeti.Flow/IFlowRepository.cs @@ -7,9 +7,9 @@ namespace Tapeti.Flow { public interface IFlowRepository { - Task> GetAllStates(); - Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary continuations); - Task UpdateState(Guid flowID, string metadata, string data, IDictionary continuations); + Task> GetStates(); + Task CreateState(FlowStateRecord stateRecord, DateTime timestamp); + Task UpdateState(FlowStateRecord stateRecord); Task DeleteState(Guid flowID); } @@ -19,6 +19,6 @@ namespace Tapeti.Flow public Guid FlowID; public string Metadata; public string Data; - public Dictionary Continuations; + public Dictionary ContinuationMetadata; } } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index f1f9530..7af9513 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -1,28 +1,23 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Tapeti.Flow.Default; namespace Tapeti.Flow { public interface IFlowStore { Task Load(); - Task FindFlowStateID(Guid continuationID); - Task LockFlowState(Guid flowStateID); + Task FindFlowID(Guid continuationID); + Task LockFlowState(Guid flowID); } public interface IFlowStateLock : IDisposable { - Guid FlowStateID { get; } + Guid FlowID { get; } + Task GetFlowState(); Task StoreFlowState(FlowState flowState); Task DeleteFlowState(); } - - public class FlowState - { - public string Metadata { get; set; } - public string Data { get; set; } - public Dictionary Continuations { get; set; } - } } diff --git a/Tapeti.Flow/Properties/AssemblyInfo.cs b/Tapeti.Flow/Properties/AssemblyInfo.cs index 7158452..91c0d83 100644 --- a/Tapeti.Flow/Properties/AssemblyInfo.cs +++ b/Tapeti.Flow/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 5ee20bc..5e4aa8c 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -55,8 +55,10 @@ + + @@ -65,6 +67,7 @@ + diff --git a/Tapeti.Flow/YieldPointException.cs b/Tapeti.Flow/YieldPointException.cs new file mode 100644 index 0000000..5b789e2 --- /dev/null +++ b/Tapeti.Flow/YieldPointException.cs @@ -0,0 +1,10 @@ +using System; + +namespace Tapeti.Flow +{ + public class YieldPointException : Exception + { + public YieldPointException(string message) : base(message) { } + public YieldPointException(string message, Exception innerException) : base(message, innerException) { } + } +} diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index ef01bbd..0870d34 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -1,7 +1,6 @@ using System; using System.Linq; using SimpleInjector; -using Tapeti.Config; namespace Tapeti.SimpleInjector { @@ -27,29 +26,44 @@ namespace Tapeti.SimpleInjector public void RegisterDefault() where TService : class where TImplementation : class, TService { - // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates - if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService))) + if (CanRegisterDefault()) container.Register(); } - - public void RegisterConfig(IConfig config) + public void RegisterDefault(Func factory) where TService : class { - container.RegisterSingleton(config); + if (CanRegisterDefault()) + container.Register(factory); } - - public void RegisterPublisher(Func publisher) + public void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService { - // ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates - if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) - container.Register(publisher); + if (CanRegisterDefault()) + container.RegisterSingleton(); } + public void RegisterDefaultSingleton(TService instance) where TService : class + { + if (CanRegisterDefault()) + container.RegisterSingleton(instance); + } + + public void RegisterDefaultSingleton(Func factory) where TService : class + { + if (CanRegisterDefault()) + container.RegisterSingleton(factory); + } public void RegisterController(Type type) { container.Register(type); } + + + private bool CanRegisterDefault() where TService : class + { + // ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates + return !container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService)); + } } } diff --git a/Tapeti.csproj b/Tapeti.csproj index c522aa5..9b8cf0f 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -61,10 +61,12 @@ + + diff --git a/Tapeti.sln.DotSettings.user b/Tapeti.sln.DotSettings.user deleted file mode 100644 index 25277f4..0000000 --- a/Tapeti.sln.DotSettings.user +++ /dev/null @@ -1,5 +0,0 @@ - - True - (Doc Ln 29 Col 51) - F84AD920-D5A1-455D-AED5-2542B3A47B85/d:Default/f:FlowProvider.cs - NumberedBookmarkManager \ No newline at end of file diff --git a/TapetiConfig.cs b/TapetiConfig.cs index 9182ec1..23d51b2 100644 --- a/TapetiConfig.cs +++ b/TapetiConfig.cs @@ -16,7 +16,7 @@ namespace Tapeti public TopologyConfigurationException(string message) : base(message) { } } - public delegate Task MessageHandlerFunc(IMessageContext context, object message); + public delegate Task MessageHandlerFunc(IMessageContext context, object message); public class TapetiConfig @@ -27,17 +27,18 @@ namespace Tapeti private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); - private readonly string exchange; + private readonly string subscribeExchange; private readonly IDependencyResolver dependencyResolver; - public TapetiConfig(string exchange, IDependencyResolver dependencyResolver) + public TapetiConfig(string subscribeExchange, IDependencyResolver dependencyResolver) { - this.exchange = exchange; + this.subscribeExchange = subscribeExchange; this.dependencyResolver = dependencyResolver; Use(new DependencyResolverBinding()); Use(new MessageBinding()); + Use(new PublishResultBinding()); } @@ -62,8 +63,8 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(exchange, dependencyResolver, messageMiddleware, queues); - (dependencyResolver as IDependencyContainer)?.RegisterConfig(config); + var config = new Config(subscribeExchange, dependencyResolver, messageMiddleware, queues); + (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; } @@ -139,7 +140,8 @@ namespace Tapeti QueueInfo = methodQueueInfo, MessageClass = context.MessageClass, MessageHandler = messageHandler, - MessageMiddleware = context.MessageMiddleware + MessageMiddleware = context.MessageMiddleware, + BindingFilters = context.BindingFilters }; if (methodQueueInfo.Dynamic.GetValueOrDefault()) @@ -177,61 +179,59 @@ namespace Tapeti var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList(); - // ReSharper disable once InvertIf - doesn't make the flow clearer imo + // ReSharper disable once InvertIf if (invalidBindings.Count > 0) { var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name)); throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); } - return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding())); + var resultHandler = ((IBindingResultAccess) context.Result).GetHandler(); + + return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler); } - protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameters) + protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler) { + if (resultHandler != null) + return WrapResultHandlerMethod(method, parameters, resultHandler); + if (method.ReturnType == typeof(void)) return WrapNullMethod(method, parameters); if (method.ReturnType == typeof(Task)) return WrapTaskMethod(method, parameters); - if (method.ReturnType == typeof(Task<>)) - { - var genericArguments = method.GetGenericArguments(); - if (genericArguments.Length != 1) - throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have exactly one generic argument to Task<>"); - - if (!genericArguments[0].IsClass) - throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have an object generic argument to Task<>"); - + if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>)) return WrapGenericTaskMethod(method, parameters); - } - if (method.ReturnType.IsClass) - return WrapObjectMethod(method, parameters); - - throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} has an invalid return type"); + return WrapObjectMethod(method, parameters); } + protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler) + { + return (context, message) => + { + var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); + return resultHandler(context, result); + }; + } + protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable parameters) { return (context, message) => { method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - return Task.FromResult(null); + return Task.CompletedTask; }; } protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable parameters) { - return async (context, message) => - { - await (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); - return Task.FromResult(null); - }; + return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); } @@ -310,7 +310,7 @@ namespace Tapeti protected class Config : IConfig { - public string Exchange { get; } + public string SubscribeExchange { get; } public IDependencyResolver DependencyResolver { get; } public IReadOnlyList MessageMiddleware { get; } public IEnumerable Queues { get; } @@ -318,9 +318,9 @@ namespace Tapeti private readonly Dictionary bindingMethodLookup; - public Config(string exchange, IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IEnumerable queues) + public Config(string subscribeExchange, IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IEnumerable queues) { - Exchange = exchange; + SubscribeExchange = subscribeExchange; DependencyResolver = dependencyResolver; MessageMiddleware = messageMiddleware; Queues = queues.ToList(); @@ -361,6 +361,7 @@ namespace Tapeti public string QueueName { get; set; } public IReadOnlyList MessageMiddleware { get; set; } + public IReadOnlyList BindingFilters { get; set; } private QueueInfo queueInfo; public QueueInfo QueueInfo @@ -382,13 +383,25 @@ namespace Tapeti } - public bool Accept(object message) + public async Task Accept(IMessageContext context, object message) { - return message.GetType() == MessageClass; + if (message.GetType() != MessageClass) + return false; + + if (BindingFilters == null) + return true; + + foreach (var filter in BindingFilters) + { + if (!await filter.Accept(context, this)) + return false; + } + + return true; } - public Task Invoke(IMessageContext context, object message) + public Task Invoke(IMessageContext context, object message) { return MessageHandler(context, message); } diff --git a/TapetiConnection.cs b/TapetiConnection.cs index 27f6de7..63d346a 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -16,12 +16,12 @@ namespace Tapeti public TapetiConnection(IConfig config) { this.config = config; - (config.DependencyResolver as IDependencyContainer)?.RegisterPublisher(GetPublisher); + (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher); worker = new Lazy(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware) { ConnectionParams = Params ?? new TapetiConnectionParams(), - Exchange = config.Exchange + SubscribeExchange = config.SubscribeExchange }); } diff --git a/TapetiConnectionParams.cs b/TapetiConnectionParams.cs index 2f7162c..31089b6 100644 --- a/TapetiConnectionParams.cs +++ b/TapetiConnectionParams.cs @@ -10,6 +10,13 @@ namespace Tapeti public string Username { get; set; } = "guest"; public string Password { get; set; } = "guest"; + /// + /// The amount of message to prefetch. See http://www.rabbitmq.com/consumer-prefetch.html for more information. + /// + /// If set to 0, no limit will be applied. + /// + public ushort PrefetchCount { get; set; } = 50; + public TapetiConnectionParams() { diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index c1f2135..d1bf818 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -30,30 +30,30 @@ namespace Test * The Visualizer could've been injected through the constructor, which is * the recommended way. Just testing the injection middleware here. */ - public async Task Marco(MarcoMessage message, Visualizer myVisualizer) + public async Task Marco(MarcoMessage message, Visualizer myVisualizer) { + Console.WriteLine(">> Marco (yielding with request)"); + await myVisualizer.VisualizeMarco(); - await publisher.Publish(new PoloMessage()); - } - - public IYieldPoint Polo(PoloMessage message) - { - StateTestGuid = Guid.NewGuid(); - - return flowProvider.YieldWithRequest( + return flowProvider.YieldWithRequestSync( new PoloConfirmationRequestMessage() { StoredInState = StateTestGuid - }, + }, HandlePoloConfirmationResponse); } - public async Task HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message) + [Continuation] + public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message) { - await visualizer.VisualizePolo(message.ShouldMatchState.Equals(StateTestGuid)); - return flowProvider.End(); + Console.WriteLine(">> HandlePoloConfirmationResponse (ending flow)"); + + Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!"); + + // This should error, as MarcoMessage expects a PoloMessage as a response + return flowProvider.EndWithResponse(new PoloMessage()); } @@ -63,16 +63,28 @@ namespace Test * use the replyTo header of the request if provided. */ + // TODO validation middleware to ensure a request message returns the specified response (already done for IYieldPoint methods) public PoloConfirmationResponseMessage PoloConfirmation(PoloConfirmationRequestMessage message) { + Console.WriteLine(">> PoloConfirmation (returning confirmation)"); + return new PoloConfirmationResponseMessage { ShouldMatchState = message.StoredInState }; } + + + + public void Polo(PoloMessage message) + { + Console.WriteLine(">> Polo"); + StateTestGuid = Guid.NewGuid(); + } } + [Request(Response = typeof(PoloMessage))] public class MarcoMessage { } diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs index 4fb8561..593b15f 100644 --- a/Test/MarcoEmitter.cs +++ b/Test/MarcoEmitter.cs @@ -17,20 +17,33 @@ namespace Test public async Task Run() { + await publisher.Publish(new MarcoMessage()); + + /* var concurrent = new SemaphoreSlim(20); - //for (var x = 0; x < 5000; x++) while (true) { - await concurrent.WaitAsync(); - try + for (var x = 0; x < 200; x++) { - await publisher.Publish(new MarcoMessage()); - } - finally - { - concurrent.Release(); + await concurrent.WaitAsync(); + try + { + await publisher.Publish(new MarcoMessage()); + } + finally + { + concurrent.Release(); + } } + + await Task.Delay(1000); + } + */ + + while (true) + { + await Task.Delay(1000); } } } diff --git a/Test/Program.cs b/Test/Program.cs index 1c0b11e..55f872e 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -13,15 +13,21 @@ namespace Test var container = new Container(); container.Register(); container.Register(); - //container.RegisterSingleton(); var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container)) - .Use(new FlowMiddleware()) + .WithFlow() .RegisterAllControllers() .Build(); - using (var connection = new TapetiConnection(config)) - { + using (var connection = new TapetiConnection(config) + { + Params = new TapetiConnectionParams + { + HostName = "localhost", + PrefetchCount = 200 + } + }) + { Console.WriteLine("Subscribing..."); connection.Subscribe().Wait(); Console.WriteLine("Done!"); diff --git a/Test/Properties/AssemblyInfo.cs b/Test/Properties/AssemblyInfo.cs index 35e057a..f56a1a2 100644 --- a/Test/Properties/AssemblyInfo.cs +++ b/Test/Properties/AssemblyInfo.cs @@ -7,9 +7,9 @@ using System.Runtime.InteropServices; [assembly: AssemblyTitle("Test")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Hewlett-Packard Company")] +[assembly: AssemblyCompany("")] [assembly: AssemblyProduct("Test")] -[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/Test/Visualizer.cs b/Test/Visualizer.cs index f53f239..cd9eb99 100644 --- a/Test/Visualizer.cs +++ b/Test/Visualizer.cs @@ -11,9 +11,9 @@ namespace Test return Task.CompletedTask; } - public Task VisualizePolo(bool matches) + public Task VisualizePolo() { - Console.WriteLine(matches ? "Polo!" : "Oops! Mismatch!"); + Console.WriteLine("Polo!"); return Task.CompletedTask; } }