From 6bc6cfe2163efb5affeefe88769bab985e2a7684 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Wed, 24 Apr 2019 18:04:30 +0200 Subject: [PATCH] MAX-911 RDB Relaties samenvoegen vanuit LEF en update ontvangen in LEF MAX-1081 POC Dictionary tasks in Web voor request Setup voor Transient request response met TODO's --- Tapeti.Flow/Default/FlowProvider.cs | 11 ++- Tapeti.Flow/Default/FlowState.cs | 5 +- Tapeti.Tests/TransientFilterMiddleware.cs | 14 +++ Tapeti.Transient/ConfigExtentions.cs | 13 +++ Tapeti.Transient/ITransientPublisher.cs | 9 ++ Tapeti.Transient/Tapeti.Transient.csproj | 11 +++ Tapeti.Transient/TransientGenericBinding.cs | 52 ++++++++++ Tapeti.Transient/TransientMiddleware.cs | 34 +++++++ Tapeti.Transient/TransientPublisher.cs | 22 +++++ Tapeti.Transient/TransientRouter.cs | 74 ++++++++++++++ Tapeti.sln | 8 +- Tapeti/Config/IConfig.cs | 1 + Tapeti/Config/ICustomBinding.cs | 31 ++++++ Tapeti/Config/ITapetiExtentionBinding.cs | 10 ++ Tapeti/Connection/TapetiWorker.cs | 3 + Tapeti/TapetiConfig.cs | 101 +++++++++++++++++--- Test/Program.cs | 9 ++ 17 files changed, 388 insertions(+), 20 deletions(-) create mode 100644 Tapeti.Tests/TransientFilterMiddleware.cs create mode 100644 Tapeti.Transient/ConfigExtentions.cs create mode 100644 Tapeti.Transient/ITransientPublisher.cs create mode 100644 Tapeti.Transient/Tapeti.Transient.csproj create mode 100644 Tapeti.Transient/TransientGenericBinding.cs create mode 100644 Tapeti.Transient/TransientMiddleware.cs create mode 100644 Tapeti.Transient/TransientPublisher.cs create mode 100644 Tapeti.Transient/TransientRouter.cs create mode 100644 Tapeti/Config/ICustomBinding.cs create mode 100644 Tapeti/Config/ITapetiExtentionBinding.cs diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 289b246..5c6d9d9 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) - await publisher.PublishDirect(message, reply.ReplyTo, properties, true); + await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory); else - await publisher.Publish(message, properties, true); + await publisher.Publish(message, properties, reply.Mandatory); await context.Delete(); } @@ -129,8 +129,8 @@ namespace Tapeti.Flow.Default throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); var requestAttribute = request.GetType().GetCustomAttribute(); - if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) - throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); + if (requestAttribute?.Response != null && !binding.Accept(requestAttribute.Response)) + throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler)); var continuationAttribute = binding.Method.GetCustomAttribute(); if (continuationAttribute == null) @@ -157,7 +157,8 @@ namespace Tapeti.Flow.Default { CorrelationId = context.Properties.CorrelationId, ReplyTo = context.Properties.ReplyTo, - ResponseTypeName = requestAttribute.Response.FullName + ResponseTypeName = requestAttribute.Response.FullName, + Mandatory = context.Properties.Persistent }; } diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index d600a8f..e32430c 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -57,6 +57,8 @@ namespace Tapeti.Flow.Default public string CorrelationId { get; set; } public string ResponseTypeName { get; set; } + public bool Mandatory { get; set; } + public ReplyMetadata Clone() { @@ -64,7 +66,8 @@ namespace Tapeti.Flow.Default { ReplyTo = ReplyTo, CorrelationId = CorrelationId, - ResponseTypeName = ResponseTypeName + ResponseTypeName = ResponseTypeName, + Mandatory = Mandatory }; } } diff --git a/Tapeti.Tests/TransientFilterMiddleware.cs b/Tapeti.Tests/TransientFilterMiddleware.cs new file mode 100644 index 0000000..d311f03 --- /dev/null +++ b/Tapeti.Tests/TransientFilterMiddleware.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Tests +{ + public class TransientFilterMiddleware : IMessageFilterMiddleware + { + public Task Handle(IMessageContext context, Func next) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/ConfigExtentions.cs b/Tapeti.Transient/ConfigExtentions.cs new file mode 100644 index 0000000..7401578 --- /dev/null +++ b/Tapeti.Transient/ConfigExtentions.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Transient +{ + public static class ConfigExtensions + { + public static TapetiConfig WithTransient(this TapetiConfig config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient") + { + config.Use(new TransientMiddleware(defaultTimeout, dynamicQueuePrefix)); + return config; + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/ITransientPublisher.cs b/Tapeti.Transient/ITransientPublisher.cs new file mode 100644 index 0000000..2765259 --- /dev/null +++ b/Tapeti.Transient/ITransientPublisher.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Tapeti.Transient +{ + public interface ITransientPublisher + { + Task RequestResponse(TRequest request); + } +} \ No newline at end of file diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj new file mode 100644 index 0000000..8d989f6 --- /dev/null +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs new file mode 100644 index 0000000..f28643d --- /dev/null +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -0,0 +1,52 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientGenericBinding : ICustomBinding + { + private readonly TransientRouter router; + + public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix) + { + this.router = router; + DynamicQueuePrefix = dynamicQueuePrefix; + Method = typeof(TransientRouter).GetMethod("GenericHandleResponse"); + } + + public Type Controller => typeof(TransientRouter); + + public MethodInfo Method { get; } + + public QueueBindingMode QueueBindingMode => QueueBindingMode.DirectToQueue; + + public string StaticQueueName => null; + + public string DynamicQueuePrefix { get; } + + public Type MessageClass => null; + + public bool Accept(Type messageClass) + { + return true; + } + + public bool Accept(IMessageContext context, object message) + { + return true; + } + + public Task Invoke(IMessageContext context, object message) + { + router.GenericHandleResponse(message, context); + return Task.CompletedTask; + } + + public void SetQueueName(string queueName) + { + router.TransientResponseQueueName = queueName; + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientMiddleware.cs new file mode 100644 index 0000000..24ef7f3 --- /dev/null +++ b/Tapeti.Transient/TransientMiddleware.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding + { + private string dynamicQueuePrefix; + private TimeSpan defaultTimeout; + + public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix) + { + this.dynamicQueuePrefix = dynamicQueuePrefix; + this.defaultTimeout = defaultTimeout; + } + + public void RegisterDefaults(IDependencyContainer container) + { + container.RegisterDefaultSingleton(() => new TransientRouter(container.Resolve(), defaultTimeout)); + container.RegisterDefault(); + } + + public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) + { + return new object[0]; + } + + public IEnumerable GetBindings(IDependencyResolver dependencyResolver) + { + yield return new TransientGenericBinding(dependencyResolver.Resolve(), dynamicQueuePrefix); + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs new file mode 100644 index 0000000..6bbae21 --- /dev/null +++ b/Tapeti.Transient/TransientPublisher.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Transient +{ + public class TransientPublisher : ITransientPublisher + { + private readonly TransientRouter router; + + public TransientPublisher(TransientRouter router) + { + this.router = router; + } + + public async Task RequestResponse(TRequest request) + { + return (TResponse)(await router.RequestResponse(request)); + } + } +} diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs new file mode 100644 index 0000000..f91be75 --- /dev/null +++ b/Tapeti.Transient/TransientRouter.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using System.Threading; +using RabbitMQ.Client.Framing; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientRouter + { + private readonly TimeSpan defaultTimeout; + + private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); + + private readonly IInternalPublisher internalPublisher; + + public string TransientResponseQueueName { get; set; } + + public TransientRouter(IInternalPublisher internalPublisher, TimeSpan defaultTimeout) + { + this.internalPublisher = internalPublisher; + this.defaultTimeout = defaultTimeout; + } + + public void GenericHandleResponse(object response, IMessageContext context) + { + if (context.Properties.CorrelationId == null) + return; + + if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) + return; + + if (map.TryRemove(continuationID, out var tcs)) + tcs.SetResult(response); + } + + public async Task RequestResponse(object request) + { + var correlation = Guid.NewGuid(); + var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource()); + + try + { + var properties = new BasicProperties + { + CorrelationId = correlation.ToString(), + ReplyTo = TransientResponseQueueName, + Persistent = false + }; + + await internalPublisher.Publish(request, properties, false); + } + catch (Exception) + { + // Simple cleanup of the task and map dictionary. + if (map.TryRemove(correlation, out tcs)) + tcs.TrySetResult(null); + + throw; + } + + using (new Timer(TimeoutResponse, tcs, defaultTimeout, TimeSpan.MaxValue)) + { + return await tcs.Task; + } + } + + private void TimeoutResponse(object tcs) + { + ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at " + defaultTimeout)); + } + } +} \ No newline at end of file diff --git a/Tapeti.sln b/Tapeti.sln index 73f1486..114f0a9 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -19,7 +19,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Transient", "Tapeti.Transient\Tapeti.Transient.csproj", "{A6355E63-19AB-47EA-91FA-49B5E9B41F88}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -63,6 +65,10 @@ Global {43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.Build.0 = Debug|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.ActiveCfg = Release|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.Build.0 = Release|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 6f7449d..1cdaad7 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -46,6 +46,7 @@ namespace Tapeti.Config IReadOnlyList MessageFilterMiddleware { get; } IReadOnlyList MessageMiddleware { get; } + bool Accept(Type messageClass); bool Accept(IMessageContext context, object message); Task Invoke(IMessageContext context, object message); } diff --git a/Tapeti/Config/ICustomBinding.cs b/Tapeti/Config/ICustomBinding.cs new file mode 100644 index 0000000..8b39247 --- /dev/null +++ b/Tapeti/Config/ICustomBinding.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface ICustomBinding + { + Type Controller { get; } + + MethodInfo Method { get; } + + QueueBindingMode QueueBindingMode { get; } + + string StaticQueueName { get; } + + string DynamicQueuePrefix { get; } + + Type MessageClass { get; } // Needed to get routing key information when QueueBindingMode = RoutingKey + + bool Accept(Type messageClass); + + bool Accept(IMessageContext context, object message); + + Task Invoke(IMessageContext context, object message); + + void SetQueueName(string queueName); + } +} diff --git a/Tapeti/Config/ITapetiExtentionBinding.cs b/Tapeti/Config/ITapetiExtentionBinding.cs new file mode 100644 index 0000000..5eee3a4 --- /dev/null +++ b/Tapeti/Config/ITapetiExtentionBinding.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface ITapetiExtentionBinding + { + IEnumerable GetBindings(IDependencyResolver dependencyResolver); + + } +} \ No newline at end of file diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index f9d577a..796bea1 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -109,6 +109,9 @@ namespace Tapeti.Connection { if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) { + if (binding.MessageClass == null) + throw new NullReferenceException("Binding with QueueBindingMode = RoutingKey must have a MessageClass"); + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); var exchange = exchangeStrategy.GetExchange(binding.MessageClass); diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index c785408..db83fd7 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -22,8 +22,8 @@ namespace Tapeti public class TapetiConfig { - private readonly Dictionary> staticRegistrations = new Dictionary>(); - private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); + private readonly Dictionary> staticRegistrations = new Dictionary>(); + private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); @@ -79,12 +79,12 @@ namespace Tapeti // foreach (var prefixGroup in dynamicRegistrations) { - var dynamicBindings = new List>(); + var dynamicBindings = new List>(); foreach (var bindings in prefixGroup.Value.Values) { while (dynamicBindings.Count < bindings.Count) - dynamicBindings.Add(new List()); + dynamicBindings.Add(new List()); for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); @@ -144,6 +144,8 @@ namespace Tapeti var middlewareBundle = extension.GetMiddleware(dependencyResolver); + (extension as ITapetiExtentionBinding)?.GetBindings(dependencyResolver); + // ReSharper disable once InvertIf if (middlewareBundle != null) { @@ -212,7 +214,8 @@ namespace Tapeti { var controllerQueueInfo = GetQueueInfo(controller); - (dependencyResolver as IDependencyContainer)?.RegisterController(controller); + if (!controller.IsInterface) + (dependencyResolver as IDependencyContainer)?.RegisterController(controller); foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) @@ -359,7 +362,7 @@ namespace Tapeti } - protected void AddStaticRegistration(IBindingContext context, Binding binding) + protected void AddStaticRegistration(IBindingContext context, IBindingQueueInfo binding) { if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) { @@ -372,23 +375,23 @@ namespace Tapeti existing.Add(binding); } else - staticRegistrations.Add(binding.QueueInfo.Name, new List { binding }); + staticRegistrations.Add(binding.QueueInfo.Name, new List { binding }); } - protected void AddDynamicRegistration(IBindingContext context, Binding binding) + protected void AddDynamicRegistration(IBindingContext context, IBindingQueueInfo binding) { var prefix = binding.QueueInfo.Name ?? ""; - if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) + if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) { - prefixRegistrations = new Dictionary>(); + prefixRegistrations = new Dictionary>(); dynamicRegistrations.Add(prefix, prefixRegistrations); } - if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) { - bindings = new List(); + bindings = new List(); prefixRegistrations.Add(context.MessageClass, bindings); } @@ -491,8 +494,12 @@ namespace Tapeti } } + protected interface IBindingQueueInfo : IBuildBinding + { + QueueInfo QueueInfo { get; } + } - protected class Binding : IBuildBinding + protected class Binding : IBindingQueueInfo { public Type Controller { get; set; } public MethodInfo Method { get; set; } @@ -523,6 +530,11 @@ namespace Tapeti } + public bool Accept(Type messageClass) + { + return MessageClass.IsAssignableFrom(messageClass); + } + public bool Accept(IMessageContext context, object message) { return message.GetType() == MessageClass; @@ -536,6 +548,69 @@ namespace Tapeti } + protected class CustomBinding : IBindingQueueInfo + { + private readonly ICustomBinding inner; + + public CustomBinding(ICustomBinding inner) + { + this.inner = inner; + + // Copy all variables to make them guaranteed readonly. + Controller = inner.Controller; + Method = inner.Method; + QueueBindingMode = inner.QueueBindingMode; + MessageClass = inner.MessageClass; + + QueueInfo = inner.StaticQueueName != null + ? new QueueInfo() + { + Dynamic = false, + Name = inner.StaticQueueName + } + : new QueueInfo() + { + Dynamic = true, + Name = inner.DynamicQueuePrefix + }; + + // Custom bindings cannot have other middleware messing with the binding. + MessageFilterMiddleware = new IMessageFilterMiddleware[0]; + MessageMiddleware = new IMessageMiddleware[0]; + } + + public Type Controller { get; } + public MethodInfo Method { get; } + public string QueueName { get; private set; } + public QueueBindingMode QueueBindingMode { get; set; } + public IReadOnlyList MessageFilterMiddleware { get; } + public IReadOnlyList MessageMiddleware { get; } + + public bool Accept(Type messageClass) + { + return inner.Accept(messageClass); + } + + public bool Accept(IMessageContext context, object message) + { + return inner.Accept(context, message); + } + + public Task Invoke(IMessageContext context, object message) + { + return inner.Invoke(context, message); + } + + public void SetQueueName(string queueName) + { + QueueName = queueName; + inner.SetQueueName(queueName); + } + + public Type MessageClass { get; } + public QueueInfo QueueInfo { get; } + } + internal interface IBindingParameterAccess { ValueFactory GetBinding(); diff --git a/Test/Program.cs b/Test/Program.cs index 62377aa..309361b 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,13 +1,21 @@ using System; +using System.Runtime.CompilerServices; using SimpleInjector; using Tapeti; using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; +using Tapeti.Annotations; namespace Test { + public interface IDummy + { + [DynamicQueue("test1")] + void HandleMessage(PoloConfirmationResponseMessage msg); + } + internal class Program { private static void Main() @@ -25,6 +33,7 @@ namespace Test .WithFlow() .WithDataAnnotations() .RegisterAllControllers() + .RegisterController(typeof(IDummy)) //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails .Build();