Fixed #39: Stateless Request-Response does not filter target controller method

Added NoBinding attribute
This commit is contained in:
Mark van Renswoude 2023-04-13 08:39:43 +02:00
parent 3f266355f0
commit 6b38d59468
12 changed files with 229 additions and 2 deletions

View File

@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{

View File

@ -9,6 +9,7 @@ using Tapeti.Config;
using Tapeti.Default;
using Tapeti.Flow.Annotations;
using Tapeti.Flow.FlowHelpers;
using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{

View File

@ -6,6 +6,7 @@ using System.Linq;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{

View File

@ -0,0 +1,67 @@
using System;
using System.Threading.Tasks;
using Tapeti.Annotations;
using Tapeti.Config.Annotations;
namespace Tapeti.Tests.Client.Controller
{
[Request(Response = typeof(FilteredResponseMessage))]
public class FilteredRequestMessage
{
public int ExpectedHandler { get; set; }
}
public class FilteredResponseMessage
{
public int ExpectedHandler { get; set; }
}
#pragma warning disable CA1822 // Mark members as static
[MessageController]
[DurableQueue("request.response.filter")]
public class RequestResponseFilterController
{
public static TaskCompletionSource<int> ValidResponse { get; private set; } = new();
public static TaskCompletionSource<int> InvalidResponse { get; private set; } = new();
public FilteredResponseMessage EchoRequest(FilteredRequestMessage message)
{
return new FilteredResponseMessage
{
ExpectedHandler = message.ExpectedHandler
};
}
[NoBinding]
public static void ResetCompletionSource()
{
ValidResponse = new TaskCompletionSource<int>();
InvalidResponse = new TaskCompletionSource<int>();
}
[ResponseHandler]
public void Handler1(FilteredResponseMessage message)
{
if (message.ExpectedHandler != 1)
InvalidResponse.TrySetResult(1);
else
ValidResponse.SetResult(1);
}
[ResponseHandler]
public void Handler2(FilteredResponseMessage message)
{
if (message.ExpectedHandler != 2)
InvalidResponse.TrySetResult(2);
else
ValidResponse.SetResult(2);
}
}
#pragma warning restore CA1822
}

View File

@ -0,0 +1,87 @@
using System.Threading.Tasks;
using FluentAssertions;
using SimpleInjector;
using Tapeti.Config;
using Tapeti.SimpleInjector;
using Tapeti.Tests.Client.Controller;
using Tapeti.Tests.Mock;
using Xunit;
using Xunit.Abstractions;
namespace Tapeti.Tests.Client
{
[Collection(RabbitMQCollection.Name)]
[Trait("Category", "Requires Docker")]
public class ControllerTests : IAsyncLifetime
{
private readonly RabbitMQFixture fixture;
private readonly Container container = new();
private TapetiConnection? connection;
public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
{
this.fixture = fixture;
container.RegisterInstance<ILogger>(new MockLogger(testOutputHelper));
}
public Task InitializeAsync()
{
return Task.CompletedTask;
}
public async Task DisposeAsync()
{
if (connection != null)
await connection.DisposeAsync();
}
[Fact]
public async Task RequestResponseFilter()
{
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.EnableDeclareDurableQueues()
.RegisterController<RequestResponseFilterController>()
.Build();
connection = CreateConnection(config);
await connection.Subscribe();
await connection.GetPublisher().PublishRequest<RequestResponseFilterController, FilteredRequestMessage, FilteredResponseMessage>(new FilteredRequestMessage
{
ExpectedHandler = 2
}, c => c.Handler2);
var handler = await RequestResponseFilterController.ValidResponse.Task;
handler.Should().Be(2);
var invalidHandler = await Task.WhenAny(RequestResponseFilterController.InvalidResponse.Task, Task.Delay(1000));
invalidHandler.Should().NotBe(RequestResponseFilterController.InvalidResponse.Task);
}
private TapetiConnection CreateConnection(ITapetiConfig config)
{
return new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
HostName = "127.0.0.1",
Port = fixture.RabbitMQPort,
ManagementPort = fixture.RabbitMQManagementPort,
Username = RabbitMQFixture.RabbitMQUsername,
Password = RabbitMQFixture.RabbitMQPassword,
PrefetchCount = 1
}
};
}
}
}

View File

@ -14,6 +14,7 @@
<PackageReference Include="JetBrains.Annotations" Version="2022.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="Moq" Version="4.18.2" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
<PackageReference Include="Testcontainers" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
@ -23,6 +24,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>

View File

@ -0,0 +1,14 @@
using System;
using JetBrains.Annotations;
namespace Tapeti.Config.Annotations
{
/// <summary>
/// Indicates that the method is not a message handler and should not be bound by Tapeti.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
[PublicAPI]
public class NoBindingAttribute : Attribute
{
}
}

View File

@ -82,7 +82,8 @@ namespace Tapeti.Connection
var properties = new MessageProperties
{
ReplyTo = binding.QueueName
CorrelationId = ResponseFilterMiddleware.CorrelationIdRequestPrefix + MethodSerializer.Serialize(responseHandler),
ReplyTo = binding.QueueName,
};
await Publish(message, properties, IsMandatory(message));

View File

@ -0,0 +1,37 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Helpers;
namespace Tapeti.Default
{
/// <inheritdoc cref="IControllerMessageMiddleware"/> />
/// <summary>
/// Handles methods marked with the ResponseHandler attribute.
/// </summary>
internal class ResponseFilterMiddleware : IControllerFilterMiddleware//, IControllerMessageMiddleware
{
internal const string CorrelationIdRequestPrefix = "request|";
public async ValueTask Filter(IMessageContext context, Func<ValueTask> next)
{
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
// If no CorrelationId is present, this could be a request-response in flight from a previous version of
// Tapeti so we should not filter the response handler.
if (!string.IsNullOrEmpty(context.Properties.CorrelationId))
{
if (!context.Properties.CorrelationId.StartsWith(CorrelationIdRequestPrefix))
return;
var methodName = context.Properties.CorrelationId[CorrelationIdRequestPrefix.Length..];
if (methodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
return;
}
await next();
}
}
}

View File

@ -1,7 +1,7 @@
using System.Reflection;
using System.Text.RegularExpressions;
namespace Tapeti.Flow.FlowHelpers
namespace Tapeti.Helpers
{
/// <summary>
/// Converts a method into a unique string representation.

View File

@ -24,6 +24,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2022.3.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="RabbitMQ.Client" Version="[6.5]" />
</ItemGroup>

View File

@ -4,6 +4,7 @@ using System.Reflection;
using System.Text;
using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Config.Annotations;
using Tapeti.Connection;
using Tapeti.Default;
@ -48,12 +49,18 @@ namespace Tapeti
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m))
{
if (method.GetCustomAttributes<NoBindingAttribute>().Any())
continue;
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null;
var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter);
if (method.GetCustomAttribute<ResponseHandlerAttribute>() != null)
{
context.SetBindingTargetMode(BindingTargetMode.Direct);
context.Use(new ResponseFilterMiddleware());
}
var allowBinding = false;
@ -100,6 +107,14 @@ namespace Tapeti
}
/// <inheritdoc cref="RegisterController"/>
public static ITapetiConfigBuilder RegisterController<TController>(this ITapetiConfigBuilder builder) where TController : class
{
return RegisterController(builder, typeof(TController));
}
/// <summary>
/// Registers all controllers in the specified assembly which are marked with the MessageController attribute.
/// </summary>