From 6cf2314ae0c2ff8092df14793032b2cd631fce2c Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 6 Apr 2023 07:44:45 +0200 Subject: [PATCH 1/4] Prevent possible concurrency issues in ParallelRequestBuilder --- Tapeti.Flow/Default/FlowProvider.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 8b6d416..df1a15c 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -378,14 +378,16 @@ namespace Tapeti.Flow.Default if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); - await Task.WhenAll(requests.Select(requestInfo => - flowProvider.SendRequest( - context, + foreach (var requestInfo in requests) + { + await flowProvider.SendRequest( + context, requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, convergeMethodSync, - false))); + false); + } await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); }); From 6b38d594683af61c4a0d5f52e89a9730cbcef8e1 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 13 Apr 2023 08:39:43 +0200 Subject: [PATCH 2/4] Fixed #39: Stateless Request-Response does not filter target controller method Added NoBinding attribute --- .../Default/FlowContinuationMiddleware.cs | 1 + Tapeti.Flow/Default/FlowProvider.cs | 1 + Tapeti.Flow/Default/FlowStore.cs | 1 + .../RequestResponseFilterController.cs | 67 ++++++++++++++ Tapeti.Tests/Client/ControllerTests.cs | 87 +++++++++++++++++++ Tapeti.Tests/Tapeti.Tests.csproj | 2 + .../Config/Annotations/NoBindingAttribute.cs | 14 +++ Tapeti/Connection/TapetiPublisher.cs | 3 +- Tapeti/Default/ResponseFilterMiddleware.cs | 37 ++++++++ .../Helpers}/MethodSerializer.cs | 2 +- Tapeti/Tapeti.csproj | 1 + Tapeti/TapetiConfigControllers.cs | 15 ++++ 12 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs create mode 100644 Tapeti.Tests/Client/ControllerTests.cs create mode 100644 Tapeti/Config/Annotations/NoBindingAttribute.cs create mode 100644 Tapeti/Default/ResponseFilterMiddleware.cs rename {Tapeti.Flow/FlowHelpers => Tapeti/Helpers}/MethodSerializer.cs (97%) diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 51098d3..2bc293e 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 91ffb39..f0041f6 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -9,6 +9,7 @@ using Tapeti.Config; using Tapeti.Default; using Tapeti.Flow.Annotations; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 3d26ca5..6a7306c 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs new file mode 100644 index 0000000..1b9634c --- /dev/null +++ b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs @@ -0,0 +1,67 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Annotations; +using Tapeti.Config.Annotations; + +namespace Tapeti.Tests.Client.Controller +{ + [Request(Response = typeof(FilteredResponseMessage))] + public class FilteredRequestMessage + { + public int ExpectedHandler { get; set; } + } + + public class FilteredResponseMessage + { + public int ExpectedHandler { get; set; } + } + + + #pragma warning disable CA1822 // Mark members as static + [MessageController] + [DurableQueue("request.response.filter")] + public class RequestResponseFilterController + { + public static TaskCompletionSource ValidResponse { get; private set; } = new(); + public static TaskCompletionSource InvalidResponse { get; private set; } = new(); + + + public FilteredResponseMessage EchoRequest(FilteredRequestMessage message) + { + return new FilteredResponseMessage + { + ExpectedHandler = message.ExpectedHandler + }; + } + + + [NoBinding] + public static void ResetCompletionSource() + { + ValidResponse = new TaskCompletionSource(); + InvalidResponse = new TaskCompletionSource(); + } + + + + [ResponseHandler] + public void Handler1(FilteredResponseMessage message) + { + if (message.ExpectedHandler != 1) + InvalidResponse.TrySetResult(1); + else + ValidResponse.SetResult(1); + } + + + [ResponseHandler] + public void Handler2(FilteredResponseMessage message) + { + if (message.ExpectedHandler != 2) + InvalidResponse.TrySetResult(2); + else + ValidResponse.SetResult(2); + } + } + #pragma warning restore CA1822 +} diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs new file mode 100644 index 0000000..ed08378 --- /dev/null +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -0,0 +1,87 @@ +using System.Threading.Tasks; +using FluentAssertions; +using SimpleInjector; +using Tapeti.Config; +using Tapeti.SimpleInjector; +using Tapeti.Tests.Client.Controller; +using Tapeti.Tests.Mock; +using Xunit; +using Xunit.Abstractions; + +namespace Tapeti.Tests.Client +{ + [Collection(RabbitMQCollection.Name)] + [Trait("Category", "Requires Docker")] + public class ControllerTests : IAsyncLifetime + { + private readonly RabbitMQFixture fixture; + private readonly Container container = new(); + + private TapetiConnection? connection; + + + public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) + { + this.fixture = fixture; + + container.RegisterInstance(new MockLogger(testOutputHelper)); + } + + + public Task InitializeAsync() + { + return Task.CompletedTask; + } + + + public async Task DisposeAsync() + { + if (connection != null) + await connection.DisposeAsync(); + } + + + + [Fact] + public async Task RequestResponseFilter() + { + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + .EnableDeclareDurableQueues() + .RegisterController() + .Build(); + + connection = CreateConnection(config); + await connection.Subscribe(); + + + await connection.GetPublisher().PublishRequest(new FilteredRequestMessage + { + ExpectedHandler = 2 + }, c => c.Handler2); + + + var handler = await RequestResponseFilterController.ValidResponse.Task; + handler.Should().Be(2); + + var invalidHandler = await Task.WhenAny(RequestResponseFilterController.InvalidResponse.Task, Task.Delay(1000)); + invalidHandler.Should().NotBe(RequestResponseFilterController.InvalidResponse.Task); + } + + + private TapetiConnection CreateConnection(ITapetiConfig config) + { + return new TapetiConnection(config) + { + Params = new TapetiConnectionParams + { + HostName = "127.0.0.1", + Port = fixture.RabbitMQPort, + ManagementPort = fixture.RabbitMQManagementPort, + Username = RabbitMQFixture.RabbitMQUsername, + Password = RabbitMQFixture.RabbitMQPassword, + PrefetchCount = 1 + } + }; + } + } +} \ No newline at end of file diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index a5d8368..5a38b4e 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -14,6 +14,7 @@ + @@ -23,6 +24,7 @@ + diff --git a/Tapeti/Config/Annotations/NoBindingAttribute.cs b/Tapeti/Config/Annotations/NoBindingAttribute.cs new file mode 100644 index 0000000..4150fa0 --- /dev/null +++ b/Tapeti/Config/Annotations/NoBindingAttribute.cs @@ -0,0 +1,14 @@ +using System; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Indicates that the method is not a message handler and should not be bound by Tapeti. + /// + [AttributeUsage(AttributeTargets.Method)] + [PublicAPI] + public class NoBindingAttribute : Attribute + { + } +} diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8f18b10..865d5a9 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -82,7 +82,8 @@ namespace Tapeti.Connection var properties = new MessageProperties { - ReplyTo = binding.QueueName + CorrelationId = ResponseFilterMiddleware.CorrelationIdRequestPrefix + MethodSerializer.Serialize(responseHandler), + ReplyTo = binding.QueueName, }; await Publish(message, properties, IsMandatory(message)); diff --git a/Tapeti/Default/ResponseFilterMiddleware.cs b/Tapeti/Default/ResponseFilterMiddleware.cs new file mode 100644 index 0000000..8d990b6 --- /dev/null +++ b/Tapeti/Default/ResponseFilterMiddleware.cs @@ -0,0 +1,37 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + /// /> + /// + /// Handles methods marked with the ResponseHandler attribute. + /// + internal class ResponseFilterMiddleware : IControllerFilterMiddleware//, IControllerMessageMiddleware + { + internal const string CorrelationIdRequestPrefix = "request|"; + + + public async ValueTask Filter(IMessageContext context, Func next) + { + if (!context.TryGet(out var controllerPayload)) + return; + + // If no CorrelationId is present, this could be a request-response in flight from a previous version of + // Tapeti so we should not filter the response handler. + if (!string.IsNullOrEmpty(context.Properties.CorrelationId)) + { + if (!context.Properties.CorrelationId.StartsWith(CorrelationIdRequestPrefix)) + return; + + var methodName = context.Properties.CorrelationId[CorrelationIdRequestPrefix.Length..]; + if (methodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) + return; + } + + await next(); + } + } +} diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti/Helpers/MethodSerializer.cs similarity index 97% rename from Tapeti.Flow/FlowHelpers/MethodSerializer.cs rename to Tapeti/Helpers/MethodSerializer.cs index 38c6f93..e0ea4a4 100644 --- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs +++ b/Tapeti/Helpers/MethodSerializer.cs @@ -1,7 +1,7 @@ using System.Reflection; using System.Text.RegularExpressions; -namespace Tapeti.Flow.FlowHelpers +namespace Tapeti.Helpers { /// /// Converts a method into a unique string representation. diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 67eaba9..e47700c 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -24,6 +24,7 @@ + diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index 8459445..e81032b 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -4,6 +4,7 @@ using System.Reflection; using System.Text; using Tapeti.Annotations; using Tapeti.Config; +using Tapeti.Config.Annotations; using Tapeti.Connection; using Tapeti.Default; @@ -48,12 +49,18 @@ namespace Tapeti .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Select(m => (MethodInfo)m)) { + if (method.GetCustomAttributes().Any()) + continue; + var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null; var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter); if (method.GetCustomAttribute() != null) + { context.SetBindingTargetMode(BindingTargetMode.Direct); + context.Use(new ResponseFilterMiddleware()); + } var allowBinding = false; @@ -100,6 +107,14 @@ namespace Tapeti } + /// + public static ITapetiConfigBuilder RegisterController(this ITapetiConfigBuilder builder) where TController : class + { + return RegisterController(builder, typeof(TController)); + } + + + /// /// Registers all controllers in the specified assembly which are marked with the MessageController attribute. /// From 4ce318b560545c540be62ad851a0c2987d3fea95 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 14 Apr 2023 15:47:27 +0200 Subject: [PATCH 3/4] #43 Move binding related attributes to Tapeti Core --- .../ExampleMessageController.cs | 2 +- .../ExampleMessageController.cs | 2 +- .../ParallelFlowController.cs | 2 +- .../ReceivingMessageController.cs | 2 +- .../SimpleFlowController.cs | 2 +- .../04-Transient/UsersMessageController.cs | 2 +- .../05-SpeedTest/SpeedMessageController.cs | 2 +- .../ExampleMessageController.cs | 2 +- .../ReceivingMessageController.cs | 2 +- .../ParallelizationMessageController.cs | 2 +- .../SlowMessageController.cs | 2 +- .../SpeedyMessageController.cs | 2 +- Tapeti.Autofac/Tapeti.Autofac.csproj | 1 - .../Tapeti.CastleWindsor.csproj | 1 - .../WindsorDependencyResolver.cs | 2 + .../Tapeti.DataAnnotations.csproj | 1 - Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 8 +- .../Default/FlowContinuationMiddleware.cs | 1 - Tapeti.Flow/Default/FlowProvider.cs | 3 +- .../ReSharper/JetBrains.Annotations.cs | 180 ------------------ Tapeti.Flow/Tapeti.Flow.csproj | 6 - Tapeti.Ninject/Tapeti.Ninject.csproj | 1 - Tapeti.Serilog/Tapeti.Serilog.csproj | 1 - .../Tapeti.SimpleInjector.csproj | 1 - .../RequestResponseFilterController.cs | 6 +- Tapeti.Tests/Client/ControllerTests.cs | 2 +- Tapeti.Tests/Client/RabbitMQFixture.cs | 2 +- Tapeti.Tests/Client/TapetiClientTests.cs | 3 +- Tapeti.Tests/Config/QueueArgumentsTest.cs | 6 +- Tapeti.Tests/Config/SimpleControllerTest.cs | 2 +- Tapeti.Tests/Tapeti.Tests.csproj | 2 +- Tapeti.Transient/Tapeti.Transient.csproj | 1 - .../BackwardsCompatibilityHelpers.cs | 77 ++++++++ .../Annotations/DurableQueueAttribute.cs | 28 +++ .../Annotations/DynamicQueueAttribute.cs | 32 ++++ .../Annotations/MessageControllerAttribute.cs | 16 ++ .../Annotations/QueueArgumentsAttribute.cs | 106 +++++++++++ .../Annotations/ResponseHandlerAttribute.cs | 15 ++ Tapeti/Config/ITapetiConfigBuilder.cs | 2 + Tapeti/Connection/TapetiPublisher.cs | 5 +- Tapeti/Default/ControllerMethodBinding.cs | 10 +- Tapeti/Default/DependencyResolverBinding.cs | 2 +- Tapeti/MessageController.cs | 2 +- Tapeti/Tapeti.csproj | 12 +- Tapeti/TapetiConfigControllers.cs | 19 +- 45 files changed, 321 insertions(+), 259 deletions(-) delete mode 100644 Tapeti.Flow/ReSharper/JetBrains.Annotations.cs create mode 100644 Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs create mode 100644 Tapeti/Config/Annotations/DurableQueueAttribute.cs create mode 100644 Tapeti/Config/Annotations/DynamicQueueAttribute.cs create mode 100644 Tapeti/Config/Annotations/MessageControllerAttribute.cs create mode 100644 Tapeti/Config/Annotations/QueueArgumentsAttribute.cs create mode 100644 Tapeti/Config/Annotations/ResponseHandlerAttribute.cs diff --git a/Examples/01-PublishSubscribe/ExampleMessageController.cs b/Examples/01-PublishSubscribe/ExampleMessageController.cs index 6cce0a1..eeb7581 100644 --- a/Examples/01-PublishSubscribe/ExampleMessageController.cs +++ b/Examples/01-PublishSubscribe/ExampleMessageController.cs @@ -1,7 +1,7 @@ using System; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _01_PublishSubscribe { diff --git a/Examples/02-DeclareDurableQueues/ExampleMessageController.cs b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs index ee5266e..d091a7e 100644 --- a/Examples/02-DeclareDurableQueues/ExampleMessageController.cs +++ b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs @@ -1,7 +1,7 @@ using System; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _02_DeclareDurableQueues { diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 00f9687..34fe2c4 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; using Tapeti.Flow; using Tapeti.Flow.Annotations; diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs index 97c498c..11d0e16 100644 --- a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs +++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _03_FlowRequestResponse { diff --git a/Examples/03-FlowRequestResponse/SimpleFlowController.cs b/Examples/03-FlowRequestResponse/SimpleFlowController.cs index e093ccd..0ea6aff 100644 --- a/Examples/03-FlowRequestResponse/SimpleFlowController.cs +++ b/Examples/03-FlowRequestResponse/SimpleFlowController.cs @@ -1,7 +1,7 @@ using System; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; using Tapeti.Flow; using Tapeti.Flow.Annotations; diff --git a/Examples/04-Transient/UsersMessageController.cs b/Examples/04-Transient/UsersMessageController.cs index 5565c49..a5b147f 100644 --- a/Examples/04-Transient/UsersMessageController.cs +++ b/Examples/04-Transient/UsersMessageController.cs @@ -1,7 +1,7 @@ using System; using System.Threading.Tasks; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _04_Transient { diff --git a/Examples/05-SpeedTest/SpeedMessageController.cs b/Examples/05-SpeedTest/SpeedMessageController.cs index b0e5386..8ff04e7 100644 --- a/Examples/05-SpeedTest/SpeedMessageController.cs +++ b/Examples/05-SpeedTest/SpeedMessageController.cs @@ -1,5 +1,5 @@ using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _05_SpeedTest { diff --git a/Examples/06-StatelessRequestResponse/ExampleMessageController.cs b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs index bc908ab..6f08afd 100644 --- a/Examples/06-StatelessRequestResponse/ExampleMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs @@ -1,7 +1,7 @@ using System; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _06_StatelessRequestResponse { diff --git a/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs index 4a2704b..9257a68 100644 --- a/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs +++ b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs @@ -1,5 +1,5 @@ using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _06_StatelessRequestResponse { diff --git a/Examples/07-ParallelizationTest/ParallelizationMessageController.cs b/Examples/07-ParallelizationTest/ParallelizationMessageController.cs index 209ba39..bc6c06b 100644 --- a/Examples/07-ParallelizationTest/ParallelizationMessageController.cs +++ b/Examples/07-ParallelizationTest/ParallelizationMessageController.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _07_ParallelizationTest { diff --git a/Examples/08-MessageHandlerLogging/SlowMessageController.cs b/Examples/08-MessageHandlerLogging/SlowMessageController.cs index 33695b2..93be67d 100644 --- a/Examples/08-MessageHandlerLogging/SlowMessageController.cs +++ b/Examples/08-MessageHandlerLogging/SlowMessageController.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using ExampleLib; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; namespace _08_MessageHandlerLogging { diff --git a/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs b/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs index 0409d10..f021c41 100644 --- a/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs +++ b/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs @@ -1,6 +1,6 @@ using System; using Messaging.TapetiExample; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; using Tapeti.Serilog; namespace _08_MessageHandlerLogging diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj index 10e4b50..e6664b3 100644 --- a/Tapeti.Autofac/Tapeti.Autofac.csproj +++ b/Tapeti.Autofac/Tapeti.Autofac.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.SimpleInjector.png 2.0.0 - 9 enable diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj index bd1af38..892f497 100644 --- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj +++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.SimpleInjector.png 2.0.0 - 9 enable diff --git a/Tapeti.CastleWindsor/WindsorDependencyResolver.cs b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs index 95d5f5e..46018bf 100644 --- a/Tapeti.CastleWindsor/WindsorDependencyResolver.cs +++ b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs @@ -1,12 +1,14 @@ using System; using Castle.MicroKernel.Registration; using Castle.Windsor; +using JetBrains.Annotations; namespace Tapeti.CastleWindsor { /// /// Dependency resolver and container implementation for Castle Windsor. /// + [PublicAPI] public class WindsorDependencyResolver : IDependencyContainer { private readonly IWindsorContainer container; diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 713493f..276f6ba 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.DataAnnotations.png 2.0.0 - 9 enable diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 9650958..db7eff7 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -1,4 +1,4 @@ - + net6.0;net7.0 @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.Flow.SQL.png 2.0.0 - 9 enable @@ -19,11 +18,6 @@ 1701;1702 - - - IDE0063 - - diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 2bc293e..ebd4fc6 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; using Tapeti.Config; -using Tapeti.Flow.FlowHelpers; using Tapeti.Helpers; namespace Tapeti.Flow.Default diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index f0041f6..d162836 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -8,7 +8,6 @@ using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Default; using Tapeti.Flow.Annotations; -using Tapeti.Flow.FlowHelpers; using Tapeti.Helpers; namespace Tapeti.Flow.Default @@ -135,7 +134,7 @@ namespace Tapeti.Flow.Default { await context.Delete(); - if (context.HasFlowStateAndLock && context.FlowState.Metadata.Reply != null) + if (context is { HasFlowStateAndLock: true, FlowState.Metadata.Reply: { } }) throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); } diff --git a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs b/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs deleted file mode 100644 index 39940b0..0000000 --- a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Stripped version of the ReSharper Annotations source. Enables hinting without referencing the - * ReSharper.Annotations NuGet package. - * - * If you need more annotations, this code was generated using - * ReSharper - Options - Code Annotations - Copy C# implementation to clipboard - */ - - -/* MIT License - -Copyright (c) 2016 JetBrains http://www.jetbrains.com - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. */ - -using System; - -#pragma warning disable 1591 -// ReSharper disable UnusedMember.Global -// ReSharper disable MemberCanBePrivate.Global -// ReSharper disable UnusedAutoPropertyAccessor.Global -// ReSharper disable IntroduceOptionalParameters.Global -// ReSharper disable MemberCanBeProtected.Global -// ReSharper disable InconsistentNaming -// ReSharper disable InheritdocConsiderUsage - -// ReSharper disable once CheckNamespace -namespace JetBrains.Annotations -{ - /// - /// Indicates that the value of the marked element could be null sometimes, - /// so the check for null is necessary before its usage. - /// - /// - /// [CanBeNull] object Test() => null; - /// - /// void UseTest() { - /// var p = Test(); - /// var s = p.ToString(); // Warning: Possible 'System.NullReferenceException' - /// } - /// - [AttributeUsage( - AttributeTargets.Method | AttributeTargets.Parameter | AttributeTargets.Property | - AttributeTargets.Delegate | AttributeTargets.Field | AttributeTargets.Event | - AttributeTargets.Class | AttributeTargets.Interface | AttributeTargets.GenericParameter)] - internal sealed class CanBeNullAttribute : Attribute { } - - /// - /// Indicates that the value of the marked element could never be null. - /// - /// - /// [NotNull] object Foo() { - /// return null; // Warning: Possible 'null' assignment - /// } - /// - [AttributeUsage( - AttributeTargets.Method | AttributeTargets.Parameter | AttributeTargets.Property | - AttributeTargets.Delegate | AttributeTargets.Field | AttributeTargets.Event | - AttributeTargets.Class | AttributeTargets.Interface | AttributeTargets.GenericParameter)] - internal sealed class NotNullAttribute : Attribute { } - - /// - /// Indicates that the marked symbol is used implicitly (e.g. via reflection, in external library), - /// so this symbol will not be marked as unused (as well as by other usage inspections). - /// - [AttributeUsage(AttributeTargets.All)] - internal sealed class UsedImplicitlyAttribute : Attribute - { - public UsedImplicitlyAttribute() - : this(ImplicitUseKindFlags.Default, ImplicitUseTargetFlags.Default) { } - - public UsedImplicitlyAttribute(ImplicitUseKindFlags useKindFlags) - : this(useKindFlags, ImplicitUseTargetFlags.Default) { } - - public UsedImplicitlyAttribute(ImplicitUseTargetFlags targetFlags) - : this(ImplicitUseKindFlags.Default, targetFlags) { } - - public UsedImplicitlyAttribute(ImplicitUseKindFlags useKindFlags, ImplicitUseTargetFlags targetFlags) - { - UseKindFlags = useKindFlags; - TargetFlags = targetFlags; - } - - public ImplicitUseKindFlags UseKindFlags { get; } - - public ImplicitUseTargetFlags TargetFlags { get; } - } - - /// - /// Should be used on attributes and causes ReSharper to not mark symbols marked with such attributes - /// as unused (as well as by other usage inspections) - /// - [AttributeUsage(AttributeTargets.Class | AttributeTargets.GenericParameter)] - internal sealed class MeansImplicitUseAttribute : Attribute - { - public MeansImplicitUseAttribute() - : this(ImplicitUseKindFlags.Default, ImplicitUseTargetFlags.Default) { } - - public MeansImplicitUseAttribute(ImplicitUseKindFlags useKindFlags) - : this(useKindFlags, ImplicitUseTargetFlags.Default) { } - - public MeansImplicitUseAttribute(ImplicitUseTargetFlags targetFlags) - : this(ImplicitUseKindFlags.Default, targetFlags) { } - - public MeansImplicitUseAttribute(ImplicitUseKindFlags useKindFlags, ImplicitUseTargetFlags targetFlags) - { - UseKindFlags = useKindFlags; - TargetFlags = targetFlags; - } - - [UsedImplicitly] public ImplicitUseKindFlags UseKindFlags { get; private set; } - - [UsedImplicitly] public ImplicitUseTargetFlags TargetFlags { get; private set; } - } - - [Flags] - internal enum ImplicitUseKindFlags - { - Default = Access | Assign | InstantiatedWithFixedConstructorSignature, - /// Only entity marked with attribute considered used. - Access = 1, - /// Indicates implicit assignment to a member. - Assign = 2, - /// - /// Indicates implicit instantiation of a type with fixed constructor signature. - /// That means any unused constructor parameters won't be reported as such. - /// - InstantiatedWithFixedConstructorSignature = 4, - /// Indicates implicit instantiation of a type. - InstantiatedNoFixedConstructorSignature = 8 - } - - /// - /// Specify what is considered used implicitly when marked - /// with or . - /// - [Flags] - internal enum ImplicitUseTargetFlags - { - Default = Itself, - Itself = 1, - /// Members of entity marked with attribute are considered used. - Members = 2, - /// Entity marked with attribute and all its members considered used. - WithMembers = Itself | Members - } - - /// - /// This attribute is intended to mark publicly available API - /// which should not be removed and so is treated as used. - /// - [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] - internal sealed class PublicAPIAttribute : Attribute - { - public PublicAPIAttribute() { } - - public PublicAPIAttribute(string comment) - { - Comment = comment; - } - - public string? Comment { get; } - } -} \ No newline at end of file diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index b772e71..dd287a4 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.Flow.png 2.0.0 - 9 enable @@ -19,11 +18,6 @@ 1701;1702 - - - IDE0066 - - diff --git a/Tapeti.Ninject/Tapeti.Ninject.csproj b/Tapeti.Ninject/Tapeti.Ninject.csproj index d31c98b..0c9fd16 100644 --- a/Tapeti.Ninject/Tapeti.Ninject.csproj +++ b/Tapeti.Ninject/Tapeti.Ninject.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.SimpleInjector.png 2.0.0 - 9 enable diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index ddef97e..9bd340c 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.Serilog.png 2.0.0 - 9 enable diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index c0c5614..d258990 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.SimpleInjector.png 2.0.0 - 9 enable diff --git a/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs index 1b9634c..d58a145 100644 --- a/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs +++ b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs @@ -1,11 +1,9 @@ -using System; -using System.Threading.Tasks; -using Tapeti.Annotations; +using System.Threading.Tasks; using Tapeti.Config.Annotations; namespace Tapeti.Tests.Client.Controller { - [Request(Response = typeof(FilteredResponseMessage))] + [Annotations.Request(Response = typeof(FilteredResponseMessage))] public class FilteredRequestMessage { public int ExpectedHandler { get; set; } diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs index ed08378..dacaba0 100644 --- a/Tapeti.Tests/Client/ControllerTests.cs +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -51,7 +51,7 @@ namespace Tapeti.Tests.Client .Build(); connection = CreateConnection(config); - await connection.Subscribe(); + await connection!.Subscribe(); await connection.GetPublisher().PublishRequest(new FilteredRequestMessage diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs index 4a46634..3e8a581 100644 --- a/Tapeti.Tests/Client/RabbitMQFixture.cs +++ b/Tapeti.Tests/Client/RabbitMQFixture.cs @@ -63,7 +63,7 @@ namespace Tapeti.Tests.Client testcontainers = testcontainersBuilder.Build(); - await testcontainers.StartAsync(); + await testcontainers!.StartAsync(); RabbitMQPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQPort); RabbitMQManagementPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQManagementPort); diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index 0ffd716..d3a7112 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs index 98a74c2..d8d0502 100644 --- a/Tapeti.Tests/Config/QueueArgumentsTest.cs +++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; using FluentAssertions; using FluentAssertions.Execution; using Moq; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; using Tapeti.Config; using Tapeti.Connection; using Xunit; @@ -84,10 +84,10 @@ namespace Tapeti.Tests.Config { var config = GetControllerConfig(); - var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage1"); + var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage1" }); binding1.Should().NotBeNull(); - var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage2"); + var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage2" }); binding2.Should().NotBeNull(); diff --git a/Tapeti.Tests/Config/SimpleControllerTest.cs b/Tapeti.Tests/Config/SimpleControllerTest.cs index 8dca8ad..f909b79 100644 --- a/Tapeti.Tests/Config/SimpleControllerTest.cs +++ b/Tapeti.Tests/Config/SimpleControllerTest.cs @@ -1,6 +1,6 @@ using System.Linq; using FluentAssertions; -using Tapeti.Annotations; +using Tapeti.Config.Annotations; using Tapeti.Config; using Xunit; diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 5a38b4e..1ce8661 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -11,7 +11,7 @@ - + diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj index 27af569..bb56b5b 100644 --- a/Tapeti.Transient/Tapeti.Transient.csproj +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -11,7 +11,6 @@ https://github.com/MvRens/Tapeti Tapeti.Flow.png 2.0.0 - 9 enable diff --git a/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs b/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs new file mode 100644 index 0000000..595e02e --- /dev/null +++ b/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs @@ -0,0 +1,77 @@ +using System; +using System.Reflection; + +#pragma warning disable CS0618 // Obsolete +#pragma warning disable CS1591 // Missing documentation + + +namespace Tapeti.Config.Annotations +{ + /// + /// Provides extensions methods to support moved (marked obsolete) attributes from Tapeti.Annotations. + /// + public static class BackwardsCompatibilityHelpers + { + public static DurableQueueAttribute? GetDurableQueueAttribute(this MemberInfo member) + { + return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute()); + } + + public static DynamicQueueAttribute? GetDynamicQueueAttribute(this MemberInfo member) + { + return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute()); + } + + public static QueueArgumentsAttribute? GetQueueArgumentsAttribute(this MemberInfo member) + { + return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute()); + } + + public static ResponseHandlerAttribute? GetResponseHandlerAttribute(this MemberInfo member) + { + return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute()); + } + + + public static bool HasMessageControllerAttribute(this MemberInfo member) + { + return member.IsDefined(typeof(MessageControllerAttribute)) || member.IsDefined(typeof(Tapeti.Annotations.MessageControllerAttribute)); + } + + + private static DurableQueueAttribute? Upgrade(Tapeti.Annotations.DurableQueueAttribute? attribute) + { + return attribute == null ? null : new DurableQueueAttribute(attribute.Name); + } + + private static DynamicQueueAttribute? Upgrade(Tapeti.Annotations.DynamicQueueAttribute? attribute) + { + return attribute == null ? null : new DynamicQueueAttribute(attribute.Prefix); + } + + private static QueueArgumentsAttribute? Upgrade(Tapeti.Annotations.QueueArgumentsAttribute? attribute) + { + return attribute == null + ? null + : new QueueArgumentsAttribute(attribute.CustomArguments) + { + MaxLength = attribute.MaxLength, + MaxLengthBytes = attribute.MaxLengthBytes, + Overflow = attribute.Overflow switch + { + Tapeti.Annotations.RabbitMQOverflow.NotSpecified => RabbitMQOverflow.NotSpecified, + Tapeti.Annotations.RabbitMQOverflow.DropHead => RabbitMQOverflow.DropHead, + Tapeti.Annotations.RabbitMQOverflow.RejectPublish => RabbitMQOverflow.RejectPublish, + Tapeti.Annotations.RabbitMQOverflow.RejectPublishDeadletter => RabbitMQOverflow.RejectPublishDeadletter, + _ => throw new ArgumentOutOfRangeException(nameof(attribute.Overflow)) + }, + MessageTTL = attribute.MessageTTL + }; + } + + private static ResponseHandlerAttribute? Upgrade(Tapeti.Annotations.ResponseHandlerAttribute? attribute) + { + return attribute == null ? null : new ResponseHandlerAttribute(); + } + } +} diff --git a/Tapeti/Config/Annotations/DurableQueueAttribute.cs b/Tapeti/Config/Annotations/DurableQueueAttribute.cs new file mode 100644 index 0000000..7a884a1 --- /dev/null +++ b/Tapeti/Config/Annotations/DurableQueueAttribute.cs @@ -0,0 +1,28 @@ +using System; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Binds to an existing durable queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] + [PublicAPI] + public class DurableQueueAttribute : Attribute + { + /// + /// Specifies the name of the durable queue (must already be declared). + /// + public string Name { get; set; } + + + /// + /// The name of the durable queue + public DurableQueueAttribute(string name) + { + Name = name; + } + } +} diff --git a/Tapeti/Config/Annotations/DynamicQueueAttribute.cs b/Tapeti/Config/Annotations/DynamicQueueAttribute.cs new file mode 100644 index 0000000..ba14342 --- /dev/null +++ b/Tapeti/Config/Annotations/DynamicQueueAttribute.cs @@ -0,0 +1,32 @@ +using System; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Creates a non-durable auto-delete queue to receive messages. Can be used + /// on an entire MessageController class or on individual methods. + /// + [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + [MeansImplicitUse] + [PublicAPI] + public class DynamicQueueAttribute : Attribute + { + /// + /// An optional prefix. If specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. + /// + public string? Prefix { get; set; } + + + /// + /// An optional prefix. If specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. + public DynamicQueueAttribute(string? prefix = null) + { + Prefix = prefix; + } + } +} diff --git a/Tapeti/Config/Annotations/MessageControllerAttribute.cs b/Tapeti/Config/Annotations/MessageControllerAttribute.cs new file mode 100644 index 0000000..8f9b720 --- /dev/null +++ b/Tapeti/Config/Annotations/MessageControllerAttribute.cs @@ -0,0 +1,16 @@ +using System; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Attaching this attribute to a class includes it in the auto-discovery of message controllers + /// when using the RegisterAllControllers method. It is not required when manually registering a controller. + /// + [AttributeUsage(AttributeTargets.Class)] + [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] + [PublicAPI] + public class MessageControllerAttribute : Attribute + { + } +} diff --git a/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs b/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs new file mode 100644 index 0000000..7ace9e9 --- /dev/null +++ b/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Determines the overflow behaviour of a queue that has reached it's maximum as set by or . + /// + [PublicAPI] + public enum RabbitMQOverflow + { + /// + /// The argument will not be explicitly specified and use the RabbitMQ default, which is equivalent to . + /// + NotSpecified, + + /// + /// Discards or dead-letters the oldest published message. This is the default value. + /// + DropHead, + + /// + /// Discards the most recently published messages and nacks the message. + /// + RejectPublish, + + /// + /// Dead-letters the most recently published messages and nacks the message. + /// + RejectPublishDeadletter + } + + + /// + /// Specifies the optional queue arguments (also known as 'x-arguments') used when declaring + /// the queue. + /// + /// + /// The QueueArguments attribute can be applied to any controller or method and will affect the queue + /// that is used in that context. For durable queues, at most one QueueArguments attribute can be specified + /// per unique queue name. + ///

+ /// Also note that queue arguments can not be changed after a queue is declared. You should declare a new queue + /// and make the old one Obsolete to have Tapeti automatically removed it once it is empty. Tapeti will use the + /// existing queue, but log a warning at startup time. + ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] + [PublicAPI] + public class QueueArgumentsAttribute : Attribute + { + /// + /// The maximum number of messages in the queue. Set to determine the overflow behaviour. + /// + /// + /// Corresponds to 'max-length'. See + /// + public int MaxLength { get; set; } + + /// + /// The maximum number of bytes in the queue (counting only the message bodies). Set to determine the overflow behaviour. + /// + /// + /// Corresponds to 'x-max-length-bytes'. See + /// + public int MaxLengthBytes { get; set; } + + + /// + /// + /// Corresponds to 'x-overflow'. Default is to drop or deadletter the oldest messages in the queue. See + /// + public RabbitMQOverflow Overflow { get; set; } = RabbitMQOverflow.NotSpecified; + + + /// + /// Specifies the maximum Time-to-Live for messages in the queue, in milliseconds. + /// + /// + /// Corresponds to 'x-message-ttl'. See + /// + public int MessageTTL { get; set; } + + + /// + /// Any arguments to add which are not supported by properties of QueueArguments. + /// + public IReadOnlyDictionary CustomArguments { get; private set; } + + + /// + /// Any arguments to add which are not supported by properties of QueueArguments. Must be a multiple of 2, specify each key followed by the value. + public QueueArgumentsAttribute(params object[] customArguments) + { + if (customArguments.Length % 2 != 0) + throw new ArgumentException("customArguments must be a multiple of 2 to specify each key-value combination", nameof(customArguments)); + + var customArgumentsPairs = new Dictionary(); + + for (var i = 0; i < customArguments.Length; i += 2) + customArgumentsPairs[(string)customArguments[i]] = customArguments[i + 1]; + + CustomArguments = customArgumentsPairs; + } + } +} diff --git a/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs b/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs new file mode 100644 index 0000000..66bdf10 --- /dev/null +++ b/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs @@ -0,0 +1,15 @@ +using JetBrains.Annotations; +using System; + +namespace Tapeti.Config.Annotations +{ + /// + /// Indicates that the method only handles response messages which are sent directly + /// to the queue. No binding will be created. + /// + [AttributeUsage(AttributeTargets.Method)] + [PublicAPI] + public class ResponseHandlerAttribute : Attribute + { + } +} diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs index 4fc1713..5d517c9 100644 --- a/Tapeti/Config/ITapetiConfigBuilder.cs +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -1,4 +1,5 @@ using System; +using JetBrains.Annotations; // ReSharper disable UnusedMember.Global @@ -8,6 +9,7 @@ namespace Tapeti.Config /// Configures Tapeti. Every method other than Build returns the builder instance /// for method chaining. ///
+ [PublicAPI] public interface ITapetiConfigBuilder { /// diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 865d5a9..8f7d06b 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -4,6 +4,7 @@ using System.Reflection; using System.Threading.Tasks; using Tapeti.Annotations; using Tapeti.Config; +using Tapeti.Config.Annotations; using Tapeti.Default; using Tapeti.Helpers; @@ -72,7 +73,7 @@ namespace Tapeti.Connection if (!binding.Accept(requestAttribute.Response)) throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler)); - var responseHandleAttribute = binding.Method.GetCustomAttribute(); + var responseHandleAttribute = binding.Method.GetResponseHandlerAttribute(); if (responseHandleAttribute == null) throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseHandler)); @@ -83,7 +84,7 @@ namespace Tapeti.Connection var properties = new MessageProperties { CorrelationId = ResponseFilterMiddleware.CorrelationIdRequestPrefix + MethodSerializer.Serialize(responseHandler), - ReplyTo = binding.QueueName, + ReplyTo = binding.QueueName }; await Publish(message, properties, IsMandatory(message)); diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 10d8ae4..93ca8c1 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -120,7 +120,7 @@ namespace Tapeti.Default QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { - await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); + await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); QueueName = bindingInfo.QueueInfo.Name; } @@ -131,7 +131,7 @@ namespace Tapeti.Default QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { - await target.BindDurableDirect(bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); + await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); QueueName = bindingInfo.QueueInfo.Name; } @@ -143,7 +143,7 @@ namespace Tapeti.Default } else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable) { - await target.BindDurableObsolete(bindingInfo.QueueInfo.Name); + await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!); QueueName = bindingInfo.QueueInfo.Name; } } @@ -317,7 +317,7 @@ namespace Tapeti.Default /// /// The name of the durable queue, or optional prefix of the dynamic queue. /// - public string Name { get; set; } + public string? Name { get; set; } /// /// Optional arguments (x-arguments) passed when declaring the queue. @@ -330,7 +330,7 @@ namespace Tapeti.Default public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name); - public QueueInfo(QueueType queueType, string name) + public QueueInfo(QueueType queueType, string? name) { QueueType = queueType; Name = name; diff --git a/Tapeti/Default/DependencyResolverBinding.cs b/Tapeti/Default/DependencyResolverBinding.cs index 3f2c81e..bd99f19 100644 --- a/Tapeti/Default/DependencyResolverBinding.cs +++ b/Tapeti/Default/DependencyResolverBinding.cs @@ -15,7 +15,7 @@ namespace Tapeti.Default { next(); - foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass)) + foreach (var parameter in context.Parameters.Where(p => p is { HasBinding: false, Info.ParameterType.IsClass: true })) parameter.SetBinding(messageContext => messageContext.Config.DependencyResolver.Resolve(parameter.Info.ParameterType)); } } diff --git a/Tapeti/MessageController.cs b/Tapeti/MessageController.cs index 138ca97..2cac796 100644 --- a/Tapeti/MessageController.cs +++ b/Tapeti/MessageController.cs @@ -1,4 +1,4 @@ -using Tapeti.Annotations; +using Tapeti.Config.Annotations; // ReSharper disable UnusedMember.Global diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index e47700c..7d3a967 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -11,7 +11,6 @@ Unlicense https://github.com/MvRens/Tapeti Tapeti.png - 9 enable @@ -19,21 +18,12 @@ 1701;1702 - - - - - + - - - - - True diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index e81032b..7371eca 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -2,9 +2,8 @@ using System; using System.Linq; using System.Reflection; using System.Text; -using Tapeti.Annotations; -using Tapeti.Config; using Tapeti.Config.Annotations; +using Tapeti.Config; using Tapeti.Connection; using Tapeti.Default; @@ -56,7 +55,7 @@ namespace Tapeti var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter); - if (method.GetCustomAttribute() != null) + if (method.GetResponseHandlerAttribute() != null) { context.SetBindingTargetMode(BindingTargetMode.Direct); context.Use(new ResponseFilterMiddleware()); @@ -122,7 +121,7 @@ namespace Tapeti /// The assembly to scan for controllers. public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder, Assembly assembly) { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute)))) + foreach (var type in assembly.GetTypes().Where(t => t.HasMessageControllerAttribute())) RegisterController(builder, type); return builder; @@ -145,9 +144,9 @@ namespace Tapeti private static ControllerMethodBinding.QueueInfo? GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo? fallbackQueueInfo) { - var dynamicQueueAttribute = member.GetCustomAttribute(); - var durableQueueAttribute = member.GetCustomAttribute(); - var queueArgumentsAttribute = member.GetCustomAttribute(); + var dynamicQueueAttribute = member.GetDynamicQueueAttribute(); + var durableQueueAttribute = member.GetDurableQueueAttribute(); + var queueArgumentsAttribute = member.GetQueueArgumentsAttribute(); if (dynamicQueueAttribute != null && durableQueueAttribute != null) throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}"); @@ -157,7 +156,7 @@ namespace Tapeti QueueType queueType; - string name; + string? name; if (dynamicQueueAttribute != null) @@ -195,10 +194,8 @@ namespace Tapeti string stringValue => Encoding.UTF8.GetBytes(stringValue), _ => p.Value } - )) - { + )); - }; if (queueArgumentsAttribute.MaxLength > 0) arguments.Add(@"x-max-length", queueArgumentsAttribute.MaxLength); From 9c40c2928cbdae0c728d72527a490f20d13d932e Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 20 Apr 2023 10:40:13 +0200 Subject: [PATCH 4/4] Fixed timing issue when a parallel flow response arrives before the flow is stored --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 4 +- Tapeti.Flow/Default/FlowProvider.cs | 63 ++++++++++++++------ 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index c501feb..6f6a887 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -154,7 +154,9 @@ namespace Tapeti.Flow.Default var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => { - await flowContext.Store(context.Binding.QueueType == QueueType.Durable); + // IFlowParallelRequest.AddRequest will store the flow immediately + if (!flowPayload.FlowContext.IsStoredOrDeleted()) + await flowContext.Store(context.Binding.QueueType == QueueType.Durable); })); } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index d162836..62e3176 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -55,7 +55,7 @@ namespace Tapeti.Flow.Default /// public IFlowParallelRequestBuilder YieldWithParallelRequest() { - return new ParallelRequestBuilder(config, this); + return new ParallelRequestBuilder(config, this, publisher); } /// @@ -71,8 +71,8 @@ namespace Tapeti.Flow.Default } - internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) + internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) { if (!context.HasFlowStateAndLock) { @@ -96,8 +96,15 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - if (store) - await context.Store(responseHandlerInfo.IsDurableQueue); + return properties; + } + + + internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, + string convergeMethodName = null, bool convergeMethodTaskSync = false) + { + var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -200,7 +207,7 @@ namespace Tapeti.Flow.Default flowContext.SetFlowState(flowState, flowStateLock); } - + /// public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) { @@ -222,7 +229,7 @@ namespace Tapeti.Flow.Default } else flowContext = flowPayload.FlowContext; - + try { await executableYieldPoint.Execute(flowContext); @@ -327,13 +334,15 @@ namespace Tapeti.Flow.Default private readonly ITapetiConfig config; private readonly FlowProvider flowProvider; + private readonly IInternalPublisher publisher; private readonly List requests = new(); - public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider) + public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher) { this.config = config; this.flowProvider = flowProvider; + this.publisher = publisher; } @@ -407,18 +416,21 @@ namespace Tapeti.Flow.Default if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); + var preparedRequests = new List(); + foreach (var requestInfo in requests) { - await flowProvider.SendRequest( + var properties = await flowProvider.PrepareRequest( context, - requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, - convergeMethodSync, - false); + convergeMethodSync); + + preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties)); } await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); + await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))); }); } } @@ -465,12 +477,11 @@ namespace Tapeti.Flow.Default throw new InvalidOperationException("No ContinuationMetadata in FlowContext"); return flowProvider.SendRequest( - flowContext, - message, - responseHandlerInfo, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync, - false); + flowContext, + message, + responseHandlerInfo, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync); } } @@ -489,5 +500,19 @@ namespace Tapeti.Flow.Default IsDurableQueue = isDurableQueue; } } + + + internal class PreparedRequest + { + public object Message { get; } + public MessageProperties Properties { get; } + + + public PreparedRequest(object message, MessageProperties properties) + { + Message = message; + Properties = properties; + } + } } }