From 6bc6cfe2163efb5affeefe88769bab985e2a7684 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Wed, 24 Apr 2019 18:04:30 +0200 Subject: [PATCH 01/10] MAX-911 RDB Relaties samenvoegen vanuit LEF en update ontvangen in LEF MAX-1081 POC Dictionary tasks in Web voor request Setup voor Transient request response met TODO's --- Tapeti.Flow/Default/FlowProvider.cs | 11 ++- Tapeti.Flow/Default/FlowState.cs | 5 +- Tapeti.Tests/TransientFilterMiddleware.cs | 14 +++ Tapeti.Transient/ConfigExtentions.cs | 13 +++ Tapeti.Transient/ITransientPublisher.cs | 9 ++ Tapeti.Transient/Tapeti.Transient.csproj | 11 +++ Tapeti.Transient/TransientGenericBinding.cs | 52 ++++++++++ Tapeti.Transient/TransientMiddleware.cs | 34 +++++++ Tapeti.Transient/TransientPublisher.cs | 22 +++++ Tapeti.Transient/TransientRouter.cs | 74 ++++++++++++++ Tapeti.sln | 8 +- Tapeti/Config/IConfig.cs | 1 + Tapeti/Config/ICustomBinding.cs | 31 ++++++ Tapeti/Config/ITapetiExtentionBinding.cs | 10 ++ Tapeti/Connection/TapetiWorker.cs | 3 + Tapeti/TapetiConfig.cs | 101 +++++++++++++++++--- Test/Program.cs | 9 ++ 17 files changed, 388 insertions(+), 20 deletions(-) create mode 100644 Tapeti.Tests/TransientFilterMiddleware.cs create mode 100644 Tapeti.Transient/ConfigExtentions.cs create mode 100644 Tapeti.Transient/ITransientPublisher.cs create mode 100644 Tapeti.Transient/Tapeti.Transient.csproj create mode 100644 Tapeti.Transient/TransientGenericBinding.cs create mode 100644 Tapeti.Transient/TransientMiddleware.cs create mode 100644 Tapeti.Transient/TransientPublisher.cs create mode 100644 Tapeti.Transient/TransientRouter.cs create mode 100644 Tapeti/Config/ICustomBinding.cs create mode 100644 Tapeti/Config/ITapetiExtentionBinding.cs diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 289b246..5c6d9d9 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default // TODO disallow if replyto is not specified? if (reply.ReplyTo != null) - await publisher.PublishDirect(message, reply.ReplyTo, properties, true); + await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory); else - await publisher.Publish(message, properties, true); + await publisher.Publish(message, properties, reply.Mandatory); await context.Delete(); } @@ -129,8 +129,8 @@ namespace Tapeti.Flow.Default throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); var requestAttribute = request.GetType().GetCustomAttribute(); - if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) - throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); + if (requestAttribute?.Response != null && !binding.Accept(requestAttribute.Response)) + throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler)); var continuationAttribute = binding.Method.GetCustomAttribute(); if (continuationAttribute == null) @@ -157,7 +157,8 @@ namespace Tapeti.Flow.Default { CorrelationId = context.Properties.CorrelationId, ReplyTo = context.Properties.ReplyTo, - ResponseTypeName = requestAttribute.Response.FullName + ResponseTypeName = requestAttribute.Response.FullName, + Mandatory = context.Properties.Persistent }; } diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index d600a8f..e32430c 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -57,6 +57,8 @@ namespace Tapeti.Flow.Default public string CorrelationId { get; set; } public string ResponseTypeName { get; set; } + public bool Mandatory { get; set; } + public ReplyMetadata Clone() { @@ -64,7 +66,8 @@ namespace Tapeti.Flow.Default { ReplyTo = ReplyTo, CorrelationId = CorrelationId, - ResponseTypeName = ResponseTypeName + ResponseTypeName = ResponseTypeName, + Mandatory = Mandatory }; } } diff --git a/Tapeti.Tests/TransientFilterMiddleware.cs b/Tapeti.Tests/TransientFilterMiddleware.cs new file mode 100644 index 0000000..d311f03 --- /dev/null +++ b/Tapeti.Tests/TransientFilterMiddleware.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Tests +{ + public class TransientFilterMiddleware : IMessageFilterMiddleware + { + public Task Handle(IMessageContext context, Func next) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/ConfigExtentions.cs b/Tapeti.Transient/ConfigExtentions.cs new file mode 100644 index 0000000..7401578 --- /dev/null +++ b/Tapeti.Transient/ConfigExtentions.cs @@ -0,0 +1,13 @@ +using System; + +namespace Tapeti.Transient +{ + public static class ConfigExtensions + { + public static TapetiConfig WithTransient(this TapetiConfig config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient") + { + config.Use(new TransientMiddleware(defaultTimeout, dynamicQueuePrefix)); + return config; + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/ITransientPublisher.cs b/Tapeti.Transient/ITransientPublisher.cs new file mode 100644 index 0000000..2765259 --- /dev/null +++ b/Tapeti.Transient/ITransientPublisher.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Tapeti.Transient +{ + public interface ITransientPublisher + { + Task RequestResponse(TRequest request); + } +} \ No newline at end of file diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj new file mode 100644 index 0000000..8d989f6 --- /dev/null +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs new file mode 100644 index 0000000..f28643d --- /dev/null +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -0,0 +1,52 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientGenericBinding : ICustomBinding + { + private readonly TransientRouter router; + + public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix) + { + this.router = router; + DynamicQueuePrefix = dynamicQueuePrefix; + Method = typeof(TransientRouter).GetMethod("GenericHandleResponse"); + } + + public Type Controller => typeof(TransientRouter); + + public MethodInfo Method { get; } + + public QueueBindingMode QueueBindingMode => QueueBindingMode.DirectToQueue; + + public string StaticQueueName => null; + + public string DynamicQueuePrefix { get; } + + public Type MessageClass => null; + + public bool Accept(Type messageClass) + { + return true; + } + + public bool Accept(IMessageContext context, object message) + { + return true; + } + + public Task Invoke(IMessageContext context, object message) + { + router.GenericHandleResponse(message, context); + return Task.CompletedTask; + } + + public void SetQueueName(string queueName) + { + router.TransientResponseQueueName = queueName; + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientMiddleware.cs new file mode 100644 index 0000000..24ef7f3 --- /dev/null +++ b/Tapeti.Transient/TransientMiddleware.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding + { + private string dynamicQueuePrefix; + private TimeSpan defaultTimeout; + + public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix) + { + this.dynamicQueuePrefix = dynamicQueuePrefix; + this.defaultTimeout = defaultTimeout; + } + + public void RegisterDefaults(IDependencyContainer container) + { + container.RegisterDefaultSingleton(() => new TransientRouter(container.Resolve(), defaultTimeout)); + container.RegisterDefault(); + } + + public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) + { + return new object[0]; + } + + public IEnumerable GetBindings(IDependencyResolver dependencyResolver) + { + yield return new TransientGenericBinding(dependencyResolver.Resolve(), dynamicQueuePrefix); + } + } +} \ No newline at end of file diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs new file mode 100644 index 0000000..6bbae21 --- /dev/null +++ b/Tapeti.Transient/TransientPublisher.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Transient +{ + public class TransientPublisher : ITransientPublisher + { + private readonly TransientRouter router; + + public TransientPublisher(TransientRouter router) + { + this.router = router; + } + + public async Task RequestResponse(TRequest request) + { + return (TResponse)(await router.RequestResponse(request)); + } + } +} diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs new file mode 100644 index 0000000..f91be75 --- /dev/null +++ b/Tapeti.Transient/TransientRouter.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using System.Threading; +using RabbitMQ.Client.Framing; +using Tapeti.Config; + +namespace Tapeti.Transient +{ + public class TransientRouter + { + private readonly TimeSpan defaultTimeout; + + private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); + + private readonly IInternalPublisher internalPublisher; + + public string TransientResponseQueueName { get; set; } + + public TransientRouter(IInternalPublisher internalPublisher, TimeSpan defaultTimeout) + { + this.internalPublisher = internalPublisher; + this.defaultTimeout = defaultTimeout; + } + + public void GenericHandleResponse(object response, IMessageContext context) + { + if (context.Properties.CorrelationId == null) + return; + + if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) + return; + + if (map.TryRemove(continuationID, out var tcs)) + tcs.SetResult(response); + } + + public async Task RequestResponse(object request) + { + var correlation = Guid.NewGuid(); + var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource()); + + try + { + var properties = new BasicProperties + { + CorrelationId = correlation.ToString(), + ReplyTo = TransientResponseQueueName, + Persistent = false + }; + + await internalPublisher.Publish(request, properties, false); + } + catch (Exception) + { + // Simple cleanup of the task and map dictionary. + if (map.TryRemove(correlation, out tcs)) + tcs.TrySetResult(null); + + throw; + } + + using (new Timer(TimeoutResponse, tcs, defaultTimeout, TimeSpan.MaxValue)) + { + return await tcs.Task; + } + } + + private void TimeoutResponse(object tcs) + { + ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at " + defaultTimeout)); + } + } +} \ No newline at end of file diff --git a/Tapeti.sln b/Tapeti.sln index 73f1486..114f0a9 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -19,7 +19,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Transient", "Tapeti.Transient\Tapeti.Transient.csproj", "{A6355E63-19AB-47EA-91FA-49B5E9B41F88}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -63,6 +65,10 @@ Global {43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.Build.0 = Debug|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.ActiveCfg = Release|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.Build.0 = Release|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index 6f7449d..1cdaad7 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -46,6 +46,7 @@ namespace Tapeti.Config IReadOnlyList MessageFilterMiddleware { get; } IReadOnlyList MessageMiddleware { get; } + bool Accept(Type messageClass); bool Accept(IMessageContext context, object message); Task Invoke(IMessageContext context, object message); } diff --git a/Tapeti/Config/ICustomBinding.cs b/Tapeti/Config/ICustomBinding.cs new file mode 100644 index 0000000..8b39247 --- /dev/null +++ b/Tapeti/Config/ICustomBinding.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface ICustomBinding + { + Type Controller { get; } + + MethodInfo Method { get; } + + QueueBindingMode QueueBindingMode { get; } + + string StaticQueueName { get; } + + string DynamicQueuePrefix { get; } + + Type MessageClass { get; } // Needed to get routing key information when QueueBindingMode = RoutingKey + + bool Accept(Type messageClass); + + bool Accept(IMessageContext context, object message); + + Task Invoke(IMessageContext context, object message); + + void SetQueueName(string queueName); + } +} diff --git a/Tapeti/Config/ITapetiExtentionBinding.cs b/Tapeti/Config/ITapetiExtentionBinding.cs new file mode 100644 index 0000000..5eee3a4 --- /dev/null +++ b/Tapeti/Config/ITapetiExtentionBinding.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace Tapeti.Config +{ + public interface ITapetiExtentionBinding + { + IEnumerable GetBindings(IDependencyResolver dependencyResolver); + + } +} \ No newline at end of file diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index f9d577a..796bea1 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -109,6 +109,9 @@ namespace Tapeti.Connection { if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) { + if (binding.MessageClass == null) + throw new NullReferenceException("Binding with QueueBindingMode = RoutingKey must have a MessageClass"); + var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); var exchange = exchangeStrategy.GetExchange(binding.MessageClass); diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index c785408..db83fd7 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -22,8 +22,8 @@ namespace Tapeti public class TapetiConfig { - private readonly Dictionary> staticRegistrations = new Dictionary>(); - private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); + private readonly Dictionary> staticRegistrations = new Dictionary>(); + private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); @@ -79,12 +79,12 @@ namespace Tapeti // foreach (var prefixGroup in dynamicRegistrations) { - var dynamicBindings = new List>(); + var dynamicBindings = new List>(); foreach (var bindings in prefixGroup.Value.Values) { while (dynamicBindings.Count < bindings.Count) - dynamicBindings.Add(new List()); + dynamicBindings.Add(new List()); for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); @@ -144,6 +144,8 @@ namespace Tapeti var middlewareBundle = extension.GetMiddleware(dependencyResolver); + (extension as ITapetiExtentionBinding)?.GetBindings(dependencyResolver); + // ReSharper disable once InvertIf if (middlewareBundle != null) { @@ -212,7 +214,8 @@ namespace Tapeti { var controllerQueueInfo = GetQueueInfo(controller); - (dependencyResolver as IDependencyContainer)?.RegisterController(controller); + if (!controller.IsInterface) + (dependencyResolver as IDependencyContainer)?.RegisterController(controller); foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) @@ -359,7 +362,7 @@ namespace Tapeti } - protected void AddStaticRegistration(IBindingContext context, Binding binding) + protected void AddStaticRegistration(IBindingContext context, IBindingQueueInfo binding) { if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) { @@ -372,23 +375,23 @@ namespace Tapeti existing.Add(binding); } else - staticRegistrations.Add(binding.QueueInfo.Name, new List { binding }); + staticRegistrations.Add(binding.QueueInfo.Name, new List { binding }); } - protected void AddDynamicRegistration(IBindingContext context, Binding binding) + protected void AddDynamicRegistration(IBindingContext context, IBindingQueueInfo binding) { var prefix = binding.QueueInfo.Name ?? ""; - if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) + if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) { - prefixRegistrations = new Dictionary>(); + prefixRegistrations = new Dictionary>(); dynamicRegistrations.Add(prefix, prefixRegistrations); } - if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) { - bindings = new List(); + bindings = new List(); prefixRegistrations.Add(context.MessageClass, bindings); } @@ -491,8 +494,12 @@ namespace Tapeti } } + protected interface IBindingQueueInfo : IBuildBinding + { + QueueInfo QueueInfo { get; } + } - protected class Binding : IBuildBinding + protected class Binding : IBindingQueueInfo { public Type Controller { get; set; } public MethodInfo Method { get; set; } @@ -523,6 +530,11 @@ namespace Tapeti } + public bool Accept(Type messageClass) + { + return MessageClass.IsAssignableFrom(messageClass); + } + public bool Accept(IMessageContext context, object message) { return message.GetType() == MessageClass; @@ -536,6 +548,69 @@ namespace Tapeti } + protected class CustomBinding : IBindingQueueInfo + { + private readonly ICustomBinding inner; + + public CustomBinding(ICustomBinding inner) + { + this.inner = inner; + + // Copy all variables to make them guaranteed readonly. + Controller = inner.Controller; + Method = inner.Method; + QueueBindingMode = inner.QueueBindingMode; + MessageClass = inner.MessageClass; + + QueueInfo = inner.StaticQueueName != null + ? new QueueInfo() + { + Dynamic = false, + Name = inner.StaticQueueName + } + : new QueueInfo() + { + Dynamic = true, + Name = inner.DynamicQueuePrefix + }; + + // Custom bindings cannot have other middleware messing with the binding. + MessageFilterMiddleware = new IMessageFilterMiddleware[0]; + MessageMiddleware = new IMessageMiddleware[0]; + } + + public Type Controller { get; } + public MethodInfo Method { get; } + public string QueueName { get; private set; } + public QueueBindingMode QueueBindingMode { get; set; } + public IReadOnlyList MessageFilterMiddleware { get; } + public IReadOnlyList MessageMiddleware { get; } + + public bool Accept(Type messageClass) + { + return inner.Accept(messageClass); + } + + public bool Accept(IMessageContext context, object message) + { + return inner.Accept(context, message); + } + + public Task Invoke(IMessageContext context, object message) + { + return inner.Invoke(context, message); + } + + public void SetQueueName(string queueName) + { + QueueName = queueName; + inner.SetQueueName(queueName); + } + + public Type MessageClass { get; } + public QueueInfo QueueInfo { get; } + } + internal interface IBindingParameterAccess { ValueFactory GetBinding(); diff --git a/Test/Program.cs b/Test/Program.cs index 62377aa..309361b 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,13 +1,21 @@ using System; +using System.Runtime.CompilerServices; using SimpleInjector; using Tapeti; using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; +using Tapeti.Annotations; namespace Test { + public interface IDummy + { + [DynamicQueue("test1")] + void HandleMessage(PoloConfirmationResponseMessage msg); + } + internal class Program { private static void Main() @@ -25,6 +33,7 @@ namespace Test .WithFlow() .WithDataAnnotations() .RegisterAllControllers() + .RegisterController(typeof(IDummy)) //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails .Build(); From 6cb701378de8bfb03175531c272b2e774d81d3ba Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 25 Apr 2019 15:19:51 +0200 Subject: [PATCH 02/10] MAX-911 RDB Relaties samenvoegen vanuit LEF en update ontvangen in LEF MAX-1081 POC Dictionary tasks in Web voor request Transient 0.1 --- Tapeti.Transient/TransientMiddleware.cs | 8 ++--- Tapeti.Transient/TransientPublisher.cs | 6 ++-- Tapeti.Transient/TransientRouter.cs | 16 ++++----- Tapeti/TapetiConfig.cs | 47 +++++++++++++++++++++---- Test/Program.cs | 34 ++++++++++-------- Test/Test.csproj | 1 + 6 files changed, 75 insertions(+), 37 deletions(-) diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientMiddleware.cs index 24ef7f3..5077fa5 100644 --- a/Tapeti.Transient/TransientMiddleware.cs +++ b/Tapeti.Transient/TransientMiddleware.cs @@ -7,17 +7,17 @@ namespace Tapeti.Transient public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding { private string dynamicQueuePrefix; - private TimeSpan defaultTimeout; + private readonly TransientRouter router; public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix) { this.dynamicQueuePrefix = dynamicQueuePrefix; - this.defaultTimeout = defaultTimeout; + this.router = new TransientRouter(defaultTimeout); } public void RegisterDefaults(IDependencyContainer container) { - container.RegisterDefaultSingleton(() => new TransientRouter(container.Resolve(), defaultTimeout)); + container.RegisterDefaultSingleton(router); container.RegisterDefault(); } @@ -28,7 +28,7 @@ namespace Tapeti.Transient public IEnumerable GetBindings(IDependencyResolver dependencyResolver) { - yield return new TransientGenericBinding(dependencyResolver.Resolve(), dynamicQueuePrefix); + yield return new TransientGenericBinding(router, dynamicQueuePrefix); } } } \ No newline at end of file diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs index 6bbae21..62715e7 100644 --- a/Tapeti.Transient/TransientPublisher.cs +++ b/Tapeti.Transient/TransientPublisher.cs @@ -8,15 +8,17 @@ namespace Tapeti.Transient public class TransientPublisher : ITransientPublisher { private readonly TransientRouter router; + private readonly IPublisher publisher; - public TransientPublisher(TransientRouter router) + public TransientPublisher(TransientRouter router, IPublisher publisher) { this.router = router; + this.publisher = publisher; } public async Task RequestResponse(TRequest request) { - return (TResponse)(await router.RequestResponse(request)); + return (TResponse)(await router.RequestResponse(publisher, request)); } } } diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index f91be75..775576b 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -9,18 +9,16 @@ namespace Tapeti.Transient { public class TransientRouter { - private readonly TimeSpan defaultTimeout; + private readonly int defaultTimeoutMs; private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); - private readonly IInternalPublisher internalPublisher; public string TransientResponseQueueName { get; set; } - public TransientRouter(IInternalPublisher internalPublisher, TimeSpan defaultTimeout) + public TransientRouter(TimeSpan defaultTimeout) { - this.internalPublisher = internalPublisher; - this.defaultTimeout = defaultTimeout; + defaultTimeoutMs = (int)defaultTimeout.TotalMilliseconds; } public void GenericHandleResponse(object response, IMessageContext context) @@ -35,7 +33,7 @@ namespace Tapeti.Transient tcs.SetResult(response); } - public async Task RequestResponse(object request) + public async Task RequestResponse(IPublisher publisher, object request) { var correlation = Guid.NewGuid(); var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource()); @@ -49,7 +47,7 @@ namespace Tapeti.Transient Persistent = false }; - await internalPublisher.Publish(request, properties, false); + await ((IInternalPublisher)publisher).Publish(request, properties, false); } catch (Exception) { @@ -60,7 +58,7 @@ namespace Tapeti.Transient throw; } - using (new Timer(TimeoutResponse, tcs, defaultTimeout, TimeSpan.MaxValue)) + using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1)) { return await tcs.Task; } @@ -68,7 +66,7 @@ namespace Tapeti.Transient private void TimeoutResponse(object tcs) { - ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at " + defaultTimeout)); + ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs)); } } } \ No newline at end of file diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index db83fd7..9291798 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -24,7 +24,9 @@ namespace Tapeti { private readonly Dictionary> staticRegistrations = new Dictionary>(); private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); + private readonly List uniqueRegistrations = new List(); + private readonly List customBindings = new List(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); private readonly List cleanupMiddleware = new List(); @@ -47,6 +49,8 @@ namespace Tapeti public IConfig Build() { + RegisterCustomBindings(); + RegisterDefaults(); var queues = new List(); @@ -93,6 +97,9 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } + queues.AddRange(uniqueRegistrations.Select(b => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(b.QueueInfo.Name) }, new []{b}))); + + var config = new Config(queues) { DependencyResolver = dependencyResolver, @@ -144,7 +151,8 @@ namespace Tapeti var middlewareBundle = extension.GetMiddleware(dependencyResolver); - (extension as ITapetiExtentionBinding)?.GetBindings(dependencyResolver); + if (extension is ITapetiExtentionBinding extentionBindings) + customBindings.AddRange(extentionBindings.GetBindings(dependencyResolver)); // ReSharper disable once InvertIf if (middlewareBundle != null) @@ -244,9 +252,9 @@ namespace Tapeti }; if (methodQueueInfo.Dynamic.GetValueOrDefault()) - AddDynamicRegistration(context, handlerInfo); + AddDynamicRegistration(handlerInfo); else - AddStaticRegistration(context, handlerInfo); + AddStaticRegistration(handlerInfo); } return this; @@ -267,6 +275,27 @@ namespace Tapeti return RegisterAllControllers(Assembly.GetEntryAssembly()); } + private void RegisterCustomBindings() + { + foreach (var customBinding in customBindings) + { + // TODO Do we need to configure additional middleware, or does this only get confused if there is no MessageClass + + var binding = new CustomBinding(customBinding); + if (binding.QueueInfo.Dynamic == false) + { + AddStaticRegistration(binding); + } + else if (binding.MessageClass != null) + { + AddDynamicRegistration(binding); + } + else + { + AddUniqueRegistration(binding); + } + } + } protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) { @@ -362,7 +391,7 @@ namespace Tapeti } - protected void AddStaticRegistration(IBindingContext context, IBindingQueueInfo binding) + protected void AddStaticRegistration(IBindingQueueInfo binding) { if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) { @@ -379,7 +408,7 @@ namespace Tapeti } - protected void AddDynamicRegistration(IBindingContext context, IBindingQueueInfo binding) + protected void AddDynamicRegistration(IBindingQueueInfo binding) { var prefix = binding.QueueInfo.Name ?? ""; @@ -389,15 +418,19 @@ namespace Tapeti dynamicRegistrations.Add(prefix, prefixRegistrations); } - if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + if (!prefixRegistrations.TryGetValue(binding.MessageClass, out List bindings)) { bindings = new List(); - prefixRegistrations.Add(context.MessageClass, bindings); + prefixRegistrations.Add(binding.MessageClass, bindings); } bindings.Add(binding); } + protected void AddUniqueRegistration(IBindingQueueInfo binding) + { + uniqueRegistrations.Add(binding); + } protected QueueInfo GetQueueInfo(MemberInfo member) { diff --git a/Test/Program.cs b/Test/Program.cs index 309361b..3969269 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -7,21 +7,16 @@ using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; using Tapeti.Annotations; +using Tapeti.Transient; namespace Test { - public interface IDummy - { - [DynamicQueue("test1")] - void HandleMessage(PoloConfirmationResponseMessage msg); - } - internal class Program { private static void Main() { // TODO logging - try + //try { var container = new Container(); container.Register(); @@ -32,8 +27,8 @@ namespace Test //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") .WithFlow() .WithDataAnnotations() + .WithTransient(TimeSpan.FromSeconds(30)) .RegisterAllControllers() - .RegisterController(typeof(IDummy)) //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails .Build(); @@ -59,23 +54,32 @@ namespace Test Console.WriteLine("Done!"); - connection.GetPublisher().Publish(new FlowEndController.PingMessage()); + var response = container.GetInstance() + .RequestResponse( + new PoloConfirmationRequestMessage + { + StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab") + }).Result; + + Console.WriteLine(response.ShouldMatchState); + + //connection.GetPublisher().Publish(new FlowEndController.PingMessage()); //container.GetInstance().Start(c => c.StartFlow, true).Wait(); - container.GetInstance().Start(c => c.TestParallelRequest).Wait(); + //container.GetInstance().Start(c => c.TestParallelRequest).Wait(); Thread.Sleep(1000); - var emitter = container.GetInstance(); - emitter.Run().Wait(); + //var emitter = container.GetInstance(); + //emitter.Run().Wait(); } } - catch (Exception e) + //catch (Exception e) { - Console.WriteLine(e.ToString()); - Console.ReadKey(); + // Console.WriteLine(e.ToString()); + // Console.ReadKey(); } } } diff --git a/Test/Test.csproj b/Test/Test.csproj index 7aa20f6..00f9c1e 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -11,6 +11,7 @@ + From 5cebe96aff4a6c17a4ae1bcc1fe49d1a4614dd11 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 25 Apr 2019 16:06:54 +0200 Subject: [PATCH 03/10] MAX-911 RDB Relaties samenvoegen vanuit LEF en update ontvangen in LEF MAX-1081 POC Dictionary tasks in Web voor request Bug fix reply moet ook bij de default handler niet mandatory kunnen zijn. --- Tapeti/Default/PublishResultBinding.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index 2ffe4e1..76542c3 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -69,7 +69,7 @@ namespace Tapeti.Default properties.CorrelationId = messageContext.Properties.CorrelationId; if (messageContext.Properties.IsReplyToPresent()) - return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true); + return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent); return publisher.Publish(message, properties, false); } From f7382d89de3b140ce999b7d98e507f17424c1494 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 25 Apr 2019 16:16:14 +0200 Subject: [PATCH 04/10] Added Tapeti.Transient to nuget publish --- appveyor.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/appveyor.yml b/appveyor.yml index d2d5812..f0e96d6 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -20,6 +20,8 @@ after_build: - cmd: appveyor PushArtifact "Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.Flow.SQL\Tapeti.Flow.SQL.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "Tapeti.Flow.SQL.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.Flow\Tapeti.Transient.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "Tapeti.Transient.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.Serilog\Tapeti.Serilog.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" From 6c88cf9e9e4154505ab6a56ee025f7ccf3c5f4cf Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 25 Apr 2019 16:36:46 +0200 Subject: [PATCH 05/10] Added Nuspec file to Tapti.Transient --- Tapeti.Transient/Tapeti.Transient.nuspec | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 Tapeti.Transient/Tapeti.Transient.nuspec diff --git a/Tapeti.Transient/Tapeti.Transient.nuspec b/Tapeti.Transient/Tapeti.Transient.nuspec new file mode 100644 index 0000000..ad29af0 --- /dev/null +++ b/Tapeti.Transient/Tapeti.Transient.nuspec @@ -0,0 +1,23 @@ + + + + Tapeti.Transient + $version$ + Tapeti Transient + Menno van Lavieren, Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png + false + Transient extension for Tapeti + + rabbitmq tapeti transient + + + + + + + + \ No newline at end of file From 67c2fff0250bed64bfa4b7cd725a65961fbe1a94 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Mon, 29 Apr 2019 17:34:19 +0200 Subject: [PATCH 06/10] MAX-1018 ModelID in workflow engine gebruiken - MAX-1127 Berichten service gebruik modelID Bug Fix in appveyor --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index f0e96d6..8a442c8 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -20,7 +20,7 @@ after_build: - cmd: appveyor PushArtifact "Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.Flow.SQL\Tapeti.Flow.SQL.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "Tapeti.Flow.SQL.%GitVersion_NuGetVersion%.nupkg" - - cmd: nuget pack Tapeti.Flow\Tapeti.Transient.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: nuget pack Tapeti.Transient\Tapeti.Transient.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "Tapeti.Transient.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg" From 785cda387f4501f384824cba64bca753458d2298 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 2 May 2019 13:26:59 +0200 Subject: [PATCH 07/10] Enforce loading of the flowstore before lookingup continuations, to prevent a common misuse that leads to data loss. --- Tapeti.Flow/Default/FlowStore.cs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 2597007..8fb9dc2 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Tapeti.Flow.FlowHelpers; @@ -16,6 +17,7 @@ namespace Tapeti.Flow.Default private readonly IFlowRepository repository; private volatile bool inUse; + private volatile bool loaded; public FlowStore(IFlowRepository repository) { @@ -40,17 +42,25 @@ namespace Tapeti.Flow.Default foreach (var continuation in flowStateRecord.Value.Continuations) continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); } + + loaded = true; } public Task FindFlowID(Guid continuationID) { + if (!loaded) + throw new InvalidOperationException("Flow store is not yet loaded."); + return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null); } public async Task LockFlowState(Guid flowID) { + if (!loaded && Debugger.IsAttached) + throw new InvalidOperationException("Flow store should be loaded before storing flows."); + inUse = true; var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID)); From e85807f622762ee18bdda248fa7c331d094b4e66 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 2 May 2019 13:32:03 +0200 Subject: [PATCH 08/10] Enforce loading of the flowstore before lookingup continuations, to prevent a common misuse that leads to data loss. fix --- Tapeti.Flow/Default/FlowStore.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 8fb9dc2..b98923d 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -58,7 +58,7 @@ namespace Tapeti.Flow.Default public async Task LockFlowState(Guid flowID) { - if (!loaded && Debugger.IsAttached) + if (!loaded) throw new InvalidOperationException("Flow store should be loaded before storing flows."); inUse = true; From cb552cc4cbc99c6ee43749147fde21e32636ba4e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 23 May 2019 14:00:36 +0200 Subject: [PATCH 09/10] Enabled XML Documentation generation for all projects --- Tapeti.Annotations/Tapeti.Annotations.csproj | 1 + Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj | 1 + Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 1 + Tapeti.Flow/Tapeti.Flow.csproj | 1 + Tapeti.Serilog/Tapeti.Serilog.csproj | 1 + Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj | 1 + Tapeti.Transient/Tapeti.Transient.csproj | 1 + Tapeti/Connection/TapetiWorker.cs | 2 +- Tapeti/Tapeti.csproj | 1 + 9 files changed, 9 insertions(+), 1 deletion(-) diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj index 9f5c4f4..be5c9ef 100644 --- a/Tapeti.Annotations/Tapeti.Annotations.csproj +++ b/Tapeti.Annotations/Tapeti.Annotations.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 3df5704..52e0d73 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index b80a796..eaa2e91 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index cc0d9d9..105aa14 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index ec64a23..b33e71b 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index 25ed29e..ed72a19 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj index 8d989f6..f3cca6f 100644 --- a/Tapeti.Transient/Tapeti.Transient.csproj +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 3a6a85b..2993535 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -40,7 +40,7 @@ namespace Tapeti.Connection private DateTime connectedDateTime; // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread - private readonly object confirmLock = new Object(); + private readonly object confirmLock = new object(); private readonly Dictionary confirmMessages = new Dictionary(); private readonly Dictionary returnRoutingKeys = new Dictionary(); diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index ecc266a..d4ecad3 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -2,6 +2,7 @@ netstandard2.0 + true From bcd2cbed6991a293e97ff74525bb0d102adc83a6 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 27 May 2019 11:48:10 +0200 Subject: [PATCH 10/10] Removed MessageHandler attribute in favor of WithMembers on MessageController and Queue attributes --- Tapeti.Annotations/DurableQueueAttribute.cs | 5 ++++- Tapeti.Annotations/DynamicQueueAttribute.cs | 5 +++++ .../MessageControllerAttribute.cs | 2 +- Tapeti.Annotations/MessageHandlerAttribute.cs | 17 ----------------- Tapeti.Annotations/RequestAttribute.cs | 3 +++ 5 files changed, 13 insertions(+), 19 deletions(-) delete mode 100644 Tapeti.Annotations/MessageHandlerAttribute.cs diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs index 281d91f..8971044 100644 --- a/Tapeti.Annotations/DurableQueueAttribute.cs +++ b/Tapeti.Annotations/DurableQueueAttribute.cs @@ -14,9 +14,12 @@ namespace Tapeti.Annotations /// for deploy-time management of durable queues (shameless plug intended). /// [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] - [MeansImplicitUse] + [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] public class DurableQueueAttribute : Attribute { + /// + /// Specifies the name of the durable queue (must already be declared). + /// public string Name { get; set; } diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs index 3743edf..240b001 100644 --- a/Tapeti.Annotations/DynamicQueueAttribute.cs +++ b/Tapeti.Annotations/DynamicQueueAttribute.cs @@ -12,6 +12,11 @@ namespace Tapeti.Annotations [MeansImplicitUse] public class DynamicQueueAttribute : Attribute { + /// + /// An optional prefix. If specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. + /// public string Prefix { get; set; } diff --git a/Tapeti.Annotations/MessageControllerAttribute.cs b/Tapeti.Annotations/MessageControllerAttribute.cs index 150fefc..6a18416 100644 --- a/Tapeti.Annotations/MessageControllerAttribute.cs +++ b/Tapeti.Annotations/MessageControllerAttribute.cs @@ -9,7 +9,7 @@ namespace Tapeti.Annotations /// when using the RegisterAllControllers method. It is not required when manually registering a controller. /// [AttributeUsage(AttributeTargets.Class)] - [MeansImplicitUse] + [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] public class MessageControllerAttribute : Attribute { } diff --git a/Tapeti.Annotations/MessageHandlerAttribute.cs b/Tapeti.Annotations/MessageHandlerAttribute.cs deleted file mode 100644 index d13724e..0000000 --- a/Tapeti.Annotations/MessageHandlerAttribute.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using JetBrains.Annotations; - -namespace Tapeti.Annotations -{ - /// - /// - /// This attribute does nothing in runtime and is not required. It is only used as - /// a hint to ReSharper, and maybe developers as well, to indicate the method is - /// indeed used. - /// - [AttributeUsage(AttributeTargets.Method)] - [MeansImplicitUse] - public class MessageHandlerAttribute : Attribute - { - } -} diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs index 2f14097..f298c50 100644 --- a/Tapeti.Annotations/RequestAttribute.cs +++ b/Tapeti.Annotations/RequestAttribute.cs @@ -13,6 +13,9 @@ namespace Tapeti.Annotations [AttributeUsage(AttributeTargets.Class)] public class RequestAttribute : Attribute { + /// + /// The type of the message class which must be returned as the response. + /// public Type Response { get; set; } } }