diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index 51098d3..2bc293e 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 91ffb39..f0041f6 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -9,6 +9,7 @@ using Tapeti.Config; using Tapeti.Default; using Tapeti.Flow.Annotations; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 3d26ca5..6a7306c 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Flow.FlowHelpers; +using Tapeti.Helpers; namespace Tapeti.Flow.Default { diff --git a/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs new file mode 100644 index 0000000..1b9634c --- /dev/null +++ b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs @@ -0,0 +1,67 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Annotations; +using Tapeti.Config.Annotations; + +namespace Tapeti.Tests.Client.Controller +{ + [Request(Response = typeof(FilteredResponseMessage))] + public class FilteredRequestMessage + { + public int ExpectedHandler { get; set; } + } + + public class FilteredResponseMessage + { + public int ExpectedHandler { get; set; } + } + + + #pragma warning disable CA1822 // Mark members as static + [MessageController] + [DurableQueue("request.response.filter")] + public class RequestResponseFilterController + { + public static TaskCompletionSource ValidResponse { get; private set; } = new(); + public static TaskCompletionSource InvalidResponse { get; private set; } = new(); + + + public FilteredResponseMessage EchoRequest(FilteredRequestMessage message) + { + return new FilteredResponseMessage + { + ExpectedHandler = message.ExpectedHandler + }; + } + + + [NoBinding] + public static void ResetCompletionSource() + { + ValidResponse = new TaskCompletionSource(); + InvalidResponse = new TaskCompletionSource(); + } + + + + [ResponseHandler] + public void Handler1(FilteredResponseMessage message) + { + if (message.ExpectedHandler != 1) + InvalidResponse.TrySetResult(1); + else + ValidResponse.SetResult(1); + } + + + [ResponseHandler] + public void Handler2(FilteredResponseMessage message) + { + if (message.ExpectedHandler != 2) + InvalidResponse.TrySetResult(2); + else + ValidResponse.SetResult(2); + } + } + #pragma warning restore CA1822 +} diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs new file mode 100644 index 0000000..ed08378 --- /dev/null +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -0,0 +1,87 @@ +using System.Threading.Tasks; +using FluentAssertions; +using SimpleInjector; +using Tapeti.Config; +using Tapeti.SimpleInjector; +using Tapeti.Tests.Client.Controller; +using Tapeti.Tests.Mock; +using Xunit; +using Xunit.Abstractions; + +namespace Tapeti.Tests.Client +{ + [Collection(RabbitMQCollection.Name)] + [Trait("Category", "Requires Docker")] + public class ControllerTests : IAsyncLifetime + { + private readonly RabbitMQFixture fixture; + private readonly Container container = new(); + + private TapetiConnection? connection; + + + public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) + { + this.fixture = fixture; + + container.RegisterInstance(new MockLogger(testOutputHelper)); + } + + + public Task InitializeAsync() + { + return Task.CompletedTask; + } + + + public async Task DisposeAsync() + { + if (connection != null) + await connection.DisposeAsync(); + } + + + + [Fact] + public async Task RequestResponseFilter() + { + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + .EnableDeclareDurableQueues() + .RegisterController() + .Build(); + + connection = CreateConnection(config); + await connection.Subscribe(); + + + await connection.GetPublisher().PublishRequest(new FilteredRequestMessage + { + ExpectedHandler = 2 + }, c => c.Handler2); + + + var handler = await RequestResponseFilterController.ValidResponse.Task; + handler.Should().Be(2); + + var invalidHandler = await Task.WhenAny(RequestResponseFilterController.InvalidResponse.Task, Task.Delay(1000)); + invalidHandler.Should().NotBe(RequestResponseFilterController.InvalidResponse.Task); + } + + + private TapetiConnection CreateConnection(ITapetiConfig config) + { + return new TapetiConnection(config) + { + Params = new TapetiConnectionParams + { + HostName = "127.0.0.1", + Port = fixture.RabbitMQPort, + ManagementPort = fixture.RabbitMQManagementPort, + Username = RabbitMQFixture.RabbitMQUsername, + Password = RabbitMQFixture.RabbitMQPassword, + PrefetchCount = 1 + } + }; + } + } +} \ No newline at end of file diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index a5d8368..5a38b4e 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -14,6 +14,7 @@ + @@ -23,6 +24,7 @@ + diff --git a/Tapeti/Config/Annotations/NoBindingAttribute.cs b/Tapeti/Config/Annotations/NoBindingAttribute.cs new file mode 100644 index 0000000..4150fa0 --- /dev/null +++ b/Tapeti/Config/Annotations/NoBindingAttribute.cs @@ -0,0 +1,14 @@ +using System; +using JetBrains.Annotations; + +namespace Tapeti.Config.Annotations +{ + /// + /// Indicates that the method is not a message handler and should not be bound by Tapeti. + /// + [AttributeUsage(AttributeTargets.Method)] + [PublicAPI] + public class NoBindingAttribute : Attribute + { + } +} diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 8f18b10..865d5a9 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -82,7 +82,8 @@ namespace Tapeti.Connection var properties = new MessageProperties { - ReplyTo = binding.QueueName + CorrelationId = ResponseFilterMiddleware.CorrelationIdRequestPrefix + MethodSerializer.Serialize(responseHandler), + ReplyTo = binding.QueueName, }; await Publish(message, properties, IsMandatory(message)); diff --git a/Tapeti/Default/ResponseFilterMiddleware.cs b/Tapeti/Default/ResponseFilterMiddleware.cs new file mode 100644 index 0000000..8d990b6 --- /dev/null +++ b/Tapeti/Default/ResponseFilterMiddleware.cs @@ -0,0 +1,37 @@ +using System; +using System.Threading.Tasks; +using Tapeti.Config; +using Tapeti.Helpers; + +namespace Tapeti.Default +{ + /// /> + /// + /// Handles methods marked with the ResponseHandler attribute. + /// + internal class ResponseFilterMiddleware : IControllerFilterMiddleware//, IControllerMessageMiddleware + { + internal const string CorrelationIdRequestPrefix = "request|"; + + + public async ValueTask Filter(IMessageContext context, Func next) + { + if (!context.TryGet(out var controllerPayload)) + return; + + // If no CorrelationId is present, this could be a request-response in flight from a previous version of + // Tapeti so we should not filter the response handler. + if (!string.IsNullOrEmpty(context.Properties.CorrelationId)) + { + if (!context.Properties.CorrelationId.StartsWith(CorrelationIdRequestPrefix)) + return; + + var methodName = context.Properties.CorrelationId[CorrelationIdRequestPrefix.Length..]; + if (methodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) + return; + } + + await next(); + } + } +} diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti/Helpers/MethodSerializer.cs similarity index 97% rename from Tapeti.Flow/FlowHelpers/MethodSerializer.cs rename to Tapeti/Helpers/MethodSerializer.cs index 38c6f93..e0ea4a4 100644 --- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs +++ b/Tapeti/Helpers/MethodSerializer.cs @@ -1,7 +1,7 @@ using System.Reflection; using System.Text.RegularExpressions; -namespace Tapeti.Flow.FlowHelpers +namespace Tapeti.Helpers { /// /// Converts a method into a unique string representation. diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 67eaba9..e47700c 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -24,6 +24,7 @@ + diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index 8459445..e81032b 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -4,6 +4,7 @@ using System.Reflection; using System.Text; using Tapeti.Annotations; using Tapeti.Config; +using Tapeti.Config.Annotations; using Tapeti.Connection; using Tapeti.Default; @@ -48,12 +49,18 @@ namespace Tapeti .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Select(m => (MethodInfo)m)) { + if (method.GetCustomAttributes().Any()) + continue; + var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null; var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter); if (method.GetCustomAttribute() != null) + { context.SetBindingTargetMode(BindingTargetMode.Direct); + context.Use(new ResponseFilterMiddleware()); + } var allowBinding = false; @@ -100,6 +107,14 @@ namespace Tapeti } + /// + public static ITapetiConfigBuilder RegisterController(this ITapetiConfigBuilder builder) where TController : class + { + return RegisterController(builder, typeof(TController)); + } + + + /// /// Registers all controllers in the specified assembly which are marked with the MessageController attribute. ///