diff --git a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
index 8bd2804..9d3fabd 100644
--- a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
+++ b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
index 8f84f19..7fafac3 100644
--- a/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
+++ b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj
index 2e082d3..3dbc0e7 100644
--- a/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj
+++ b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/04-Transient/04-Transient.csproj b/Examples/04-Transient/04-Transient.csproj
index 6625370..2a9a0d4 100644
--- a/Examples/04-Transient/04-Transient.csproj
+++ b/Examples/04-Transient/04-Transient.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/05-SpeedTest/05-SpeedTest.csproj b/Examples/05-SpeedTest/05-SpeedTest.csproj
index 7716c24..12506dc 100644
--- a/Examples/05-SpeedTest/05-SpeedTest.csproj
+++ b/Examples/05-SpeedTest/05-SpeedTest.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
index ceb3f6d..654499b 100644
--- a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
+++ b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj b/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj
index 46119f9..47c3eb4 100644
--- a/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj
+++ b/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj
@@ -7,7 +7,7 @@
-
+
diff --git a/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj b/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj
index 0aaa313..d541909 100644
--- a/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj
+++ b/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj
@@ -7,8 +7,8 @@
-
-
+
+
diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj
index 15e731d..e0263c8 100644
--- a/Tapeti.Autofac/Tapeti.Autofac.csproj
+++ b/Tapeti.Autofac/Tapeti.Autofac.csproj
@@ -15,7 +15,7 @@
-
+
@@ -30,6 +30,6 @@
-
+
diff --git a/Tapeti.Benchmarks/Tapeti.Benchmarks.csproj b/Tapeti.Benchmarks/Tapeti.Benchmarks.csproj
index e3ab297..9aa7e19 100644
--- a/Tapeti.Benchmarks/Tapeti.Benchmarks.csproj
+++ b/Tapeti.Benchmarks/Tapeti.Benchmarks.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
index edde611..9df8c90 100644
--- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
+++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
@@ -15,7 +15,7 @@
-
+
@@ -30,6 +30,6 @@
-
+
diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
index 18d230f..38a9699 100644
--- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
+++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
@@ -34,6 +34,6 @@
-
+
diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
index bf6d7a9..8385eb5 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
@@ -32,7 +32,7 @@
-
+
@@ -48,6 +48,6 @@
-
+
diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj
index fff1fbd..d991a6c 100644
--- a/Tapeti.Flow/Tapeti.Flow.csproj
+++ b/Tapeti.Flow/Tapeti.Flow.csproj
@@ -35,7 +35,7 @@
-
+
diff --git a/Tapeti.Ninject/Tapeti.Ninject.csproj b/Tapeti.Ninject/Tapeti.Ninject.csproj
index c4e8a9d..2010385 100644
--- a/Tapeti.Ninject/Tapeti.Ninject.csproj
+++ b/Tapeti.Ninject/Tapeti.Ninject.csproj
@@ -15,7 +15,7 @@
-
+
@@ -30,6 +30,6 @@
-
+
diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj
index c68d5cf..830f755 100644
--- a/Tapeti.Serilog/Tapeti.Serilog.csproj
+++ b/Tapeti.Serilog/Tapeti.Serilog.csproj
@@ -19,7 +19,7 @@
-
+
@@ -34,6 +34,6 @@
-
+
diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
index 734951f..d1e9d4f 100644
--- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
+++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
@@ -19,7 +19,7 @@
-
+
@@ -34,6 +34,6 @@
-
+
diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs
new file mode 100644
index 0000000..fb73113
--- /dev/null
+++ b/Tapeti.Tests/Client/RabbitMQFixture.cs
@@ -0,0 +1,79 @@
+using System;
+using System.Threading.Tasks;
+using Docker.DotNet;
+using Docker.DotNet.Models;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Configurations;
+using DotNet.Testcontainers.Containers;
+using Xunit;
+
+namespace Tapeti.Tests.Client
+{
+ [CollectionDefinition(Name)]
+ public sealed class RabbitMQCollection : ICollectionFixture
+ {
+ public const string Name = "RabbitMQ";
+ }
+
+
+ public sealed class RabbitMQFixture : IAsyncLifetime
+ {
+ public static string RabbitMQUsername => "tapetitests";
+ public static string RabbitMQPassword => "topsecret1234";
+
+ public ushort RabbitMQPort { get; private set; }
+ public ushort RabbitMQManagementPort { get; private set; }
+
+
+ private TestcontainerMessageBroker testcontainers;
+
+ private const int DefaultRabbitMQPort = 5672;
+ private const int DefaultRabbitMQManagementPort = 15672;
+
+
+
+ private const string ImageName = "rabbitmq";
+ private const string ImageTag = "3.11.3-alpine";
+
+
+ public async Task InitializeAsync()
+ {
+ // Testcontainers does not seem to pull the image the first time.
+ // I didn't get it to work, even using WithImagePullPolicy from the latest beta.
+ // Note: running it the first time can take a while.
+ var client = new DockerClientConfiguration().CreateClient();
+ await client.Images.CreateImageAsync(
+ new ImagesCreateParameters
+ {
+ FromImage = ImageName,
+ Tag = ImageTag
+ },
+ null,
+ new Progress());
+
+ // If you get a "Sequence contains no elements" error here: make sure Docker Desktop is running
+ var testcontainersBuilder = new TestcontainersBuilder()
+ .WithMessageBroker(new RabbitMqTestcontainerConfiguration($"{ImageName}:{ImageTag}")
+ {
+ Username = RabbitMQUsername,
+ Password = RabbitMQPassword
+ })
+ .WithExposedPort(DefaultRabbitMQManagementPort)
+ .WithPortBinding(0, DefaultRabbitMQManagementPort);
+
+ testcontainers = testcontainersBuilder.Build();
+
+ await testcontainers.StartAsync();
+
+ RabbitMQPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQPort);
+ RabbitMQManagementPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQManagementPort);
+ }
+
+
+ public async Task DisposeAsync()
+ {
+ if (testcontainers != null)
+ await testcontainers.DisposeAsync();
+ }
+ }
+}
diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs
new file mode 100644
index 0000000..eda0622
--- /dev/null
+++ b/Tapeti.Tests/Client/TapetiClientTests.cs
@@ -0,0 +1,75 @@
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Tapeti.Connection;
+using Tapeti.Tests.Mock;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tapeti.Tests.Client
+{
+ [Collection(RabbitMQCollection.Name)]
+ public class TapetiClientTests
+ {
+ private readonly RabbitMQFixture fixture;
+ private readonly MockDependencyResolver dependencyResolver = new();
+
+ public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
+ {
+ this.fixture = fixture;
+
+ dependencyResolver.Set(new MockLogger(testOutputHelper));
+ }
+
+
+ [Fact]
+ public void Fixture()
+ {
+ fixture.RabbitMQPort.Should().BeGreaterThan(0);
+ fixture.RabbitMQManagementPort.Should().BeGreaterThan(0);
+ }
+
+
+ [Fact]
+ public async Task DynamicQueueDeclareNoPrefix()
+ {
+ var client = CreateCilent();
+
+ var queueName = await client.DynamicQueueDeclare(null, null, CancellationToken.None);
+ queueName.Should().NotBeNullOrEmpty();
+
+ await client.Close();
+ }
+
+
+ [Fact]
+ public async Task DynamicQueueDeclarePrefix()
+ {
+ var client = CreateCilent();
+
+ var queueName = await client.DynamicQueueDeclare("dynamicprefix", null, CancellationToken.None);
+ queueName.Should().StartWith("dynamicprefix");
+
+ await client.Close();
+ }
+
+
+ // TODO test the other methods
+
+
+ private TapetiClient CreateCilent()
+ {
+ return new TapetiClient(
+ new TapetiConfig.Config(dependencyResolver),
+ new TapetiConnectionParams
+ {
+ HostName = "127.0.0.1",
+ Port = fixture.RabbitMQPort,
+ ManagementPort = fixture.RabbitMQManagementPort,
+ Username = RabbitMQFixture.RabbitMQUsername,
+ Password = RabbitMQFixture.RabbitMQPassword,
+ PrefetchCount = 50
+ });
+ }
+ }
+}
diff --git a/Tapeti.Tests/Config/BaseControllerTest.cs b/Tapeti.Tests/Config/BaseControllerTest.cs
new file mode 100644
index 0000000..aba1651
--- /dev/null
+++ b/Tapeti.Tests/Config/BaseControllerTest.cs
@@ -0,0 +1,29 @@
+using JetBrains.Annotations;
+using Tapeti.Config;
+using Tapeti.Tests.Mock;
+
+namespace Tapeti.Tests.Config
+{
+ public class BaseControllerTest
+ {
+ protected readonly MockDependencyResolver DependencyResolver = new();
+
+
+ protected ITapetiConfig GetControllerConfig<[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] T>() where T : class
+ {
+ var configBuilder = new TapetiConfig(DependencyResolver);
+
+ configBuilder.EnableDeclareDurableQueues();
+ configBuilder.RegisterController(typeof(T));
+ var config = configBuilder.Build();
+
+ return config;
+ }
+
+
+ protected ITapetiConfigBindings GetControllerBindings<[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] T>() where T : class
+ {
+ return GetControllerConfig().Bindings;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs
new file mode 100644
index 0000000..a43e5bb
--- /dev/null
+++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs
@@ -0,0 +1,199 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using FluentAssertions.Execution;
+using Moq;
+using Tapeti.Annotations;
+using Tapeti.Config;
+using Tapeti.Connection;
+using Xunit;
+
+namespace Tapeti.Tests.Config
+{
+ public class QueueArgumentsTest : BaseControllerTest
+ {
+ private static readonly MockRepository MoqRepository = new(MockBehavior.Strict);
+
+ private readonly Mock client;
+ private readonly Dictionary> declaredQueues = new();
+
+
+ public QueueArgumentsTest()
+ {
+ client = MoqRepository.Create();
+ var routingKeyStrategy = MoqRepository.Create();
+ var exchangeStrategy = MoqRepository.Create();
+
+ DependencyResolver.Set(routingKeyStrategy.Object);
+ DependencyResolver.Set(exchangeStrategy.Object);
+
+
+ routingKeyStrategy
+ .Setup(s => s.GetRoutingKey(typeof(TestMessage1)))
+ .Returns("testmessage1");
+
+ routingKeyStrategy
+ .Setup(s => s.GetRoutingKey(typeof(TestMessage2)))
+ .Returns("testmessage2");
+
+ exchangeStrategy
+ .Setup(s => s.GetExchange(It.IsAny()))
+ .Returns("exchange");
+
+ var queue = 0;
+ client
+ .Setup(c => c.DynamicQueueDeclare(null, It.IsAny>(), It.IsAny()))
+ .Callback((string _, IReadOnlyDictionary arguments, CancellationToken _) =>
+ {
+ queue++;
+ declaredQueues.Add($"queue-{queue}", arguments);
+ })
+ .ReturnsAsync(() => $"queue-{queue}");
+
+ client
+ .Setup(c => c.DurableQueueDeclare(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny()))
+ .Callback((string queueName, IEnumerable _, IReadOnlyDictionary arguments, CancellationToken _) =>
+ {
+ declaredQueues.Add(queueName, arguments);
+ })
+ .Returns(Task.CompletedTask);
+
+
+ client
+ .Setup(c => c.DynamicQueueBind(It.IsAny(), It.IsAny(), It.IsAny()))
+ .Returns(Task.CompletedTask);
+ }
+
+
+ [Fact]
+ public async Task SingleQueueArguments()
+ {
+ var config = GetControllerConfig();
+
+ var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage1");
+ binding1.Should().NotBeNull();
+
+ var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage2");
+ binding2.Should().NotBeNull();
+
+
+
+ var subscriber = new TapetiSubscriber(() => client.Object, config);
+ await subscriber.ApplyBindings();
+
+
+ declaredQueues.Should().HaveCount(1);
+ var arguments = declaredQueues["queue-1"];
+
+ arguments.Should().ContainKey("x-custom").WhoseValue.Should().Be("custom value");
+ arguments.Should().ContainKey("x-another").WhoseValue.Should().Be("another value");
+ arguments.Should().ContainKey("x-max-length").WhoseValue.Should().Be("100");
+ arguments.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be("100000");
+ arguments.Should().ContainKey("x-message-ttl").WhoseValue.Should().Be("4269");
+ arguments.Should().ContainKey("x-overflow").WhoseValue.Should().Be("reject-publish");
+ }
+
+
+ [Fact]
+ public async Task ConflictingDynamicQueueArguments()
+ {
+ var config = GetControllerConfig();
+
+ var subscriber = new TapetiSubscriber(() => client.Object, config);
+ await subscriber.ApplyBindings();
+
+ declaredQueues.Should().HaveCount(2);
+
+ var arguments1 = declaredQueues["queue-1"];
+ arguments1.Should().ContainKey("x-max-length").WhoseValue.Should().Be("100");
+
+ var arguments2 = declaredQueues["queue-2"];
+ arguments2.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be("100000");
+ }
+
+
+ [Fact]
+ public async Task ConflictingDurableQueueArguments()
+ {
+ var config = GetControllerConfig();
+
+ var testApplyBindings = () =>
+ {
+ var subscriber = new TapetiSubscriber(() => client.Object, config);
+ return subscriber.ApplyBindings();
+ };
+
+ using (new AssertionScope())
+ {
+ await testApplyBindings.Should().ThrowAsync();
+ declaredQueues.Should().HaveCount(0);
+ }
+ }
+
+
+ // ReSharper disable all
+ #pragma warning disable
+
+ private class TestMessage1
+ {
+ }
+
+
+ private class TestMessage2
+ {
+ }
+
+
+ [DynamicQueue]
+ [QueueArguments("x-custom", "custom value", "x-another", "another value", MaxLength = 100, MaxLengthBytes = 100000, MessageTTL = 4269, Overflow = RabbitMQOverflow.RejectPublish)]
+ private class TestController
+ {
+ public void HandleMessage1(TestMessage1 message)
+ {
+ }
+
+
+ public void HandleMessage2(TestMessage2 message)
+ {
+ }
+ }
+
+
+ [DynamicQueue]
+ [QueueArguments(MaxLength = 100)]
+ private class ConflictingArgumentsTestController
+ {
+ public void HandleMessage1(TestMessage1 message)
+ {
+ }
+
+
+ [QueueArguments(MaxLengthBytes = 100000)]
+ public void HandleMessage2(TestMessage1 message)
+ {
+ }
+ }
+
+
+ [DurableQueue("durable")]
+ [QueueArguments(MaxLength = 100)]
+ private class ConflictingArgumentsDurableQueueTestController
+ {
+ public void HandleMessage1(TestMessage1 message)
+ {
+ }
+
+
+ [QueueArguments(MaxLengthBytes = 100000)]
+ public void HandleMessage2(TestMessage1 message)
+ {
+ }
+ }
+
+ #pragma warning restore
+ // ReSharper restore all
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Tests/Config/SimpleControllerTest.cs b/Tapeti.Tests/Config/SimpleControllerTest.cs
new file mode 100644
index 0000000..fbeb027
--- /dev/null
+++ b/Tapeti.Tests/Config/SimpleControllerTest.cs
@@ -0,0 +1,43 @@
+using System.Linq;
+using FluentAssertions;
+using Tapeti.Annotations;
+using Tapeti.Config;
+using Xunit;
+
+namespace Tapeti.Tests.Config
+{
+ public class SimpleControllerTest : BaseControllerTest
+ {
+ [Fact]
+ public void RegisterController()
+ {
+ var bindings = GetControllerBindings();
+ bindings.Should().HaveCount(1);
+
+ var handleSimpleMessageBinding = bindings.Single(b => b is IControllerMethodBinding cmb &&
+ cmb.Controller == typeof(TestController) &&
+ cmb.Method.Name == "HandleSimpleMessage");
+ handleSimpleMessageBinding.QueueType.Should().Be(QueueType.Dynamic);
+ }
+
+
+ // ReSharper disable all
+ #pragma warning disable
+
+ private class TestMessage
+ {
+ }
+
+
+ [DynamicQueue]
+ private class TestController
+ {
+ public void HandleSimpleMessage(TestMessage message)
+ {
+ }
+ }
+
+ #pragma warning restore
+ // ReSharper restore all
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Tests/Mock/MockDependencyResolver.cs b/Tapeti.Tests/Mock/MockDependencyResolver.cs
new file mode 100644
index 0000000..51d8698
--- /dev/null
+++ b/Tapeti.Tests/Mock/MockDependencyResolver.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Collections.Generic;
+
+namespace Tapeti.Tests.Mock
+{
+ public class MockDependencyResolver : IDependencyResolver
+ {
+ private readonly Dictionary container = new();
+
+
+ public void Set(TInterface instance)
+ {
+ container.Add(typeof(TInterface), instance);
+ }
+
+
+ public T Resolve() where T : class
+ {
+ return (T)Resolve(typeof(T));
+ }
+
+
+ public object Resolve(Type type)
+ {
+ return container[type];
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Tests/Mock/MockLogger.cs b/Tapeti.Tests/Mock/MockLogger.cs
new file mode 100644
index 0000000..a2d2387
--- /dev/null
+++ b/Tapeti.Tests/Mock/MockLogger.cs
@@ -0,0 +1,88 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Tapeti.Config;
+using Xunit.Abstractions;
+
+namespace Tapeti.Tests.Mock
+{
+ internal class MockLogger : IBindingLogger
+ {
+ private readonly ITestOutputHelper testOutputHelper;
+
+
+ public MockLogger(ITestOutputHelper testOutputHelper)
+ {
+ this.testOutputHelper = testOutputHelper;
+ }
+
+
+ public void Connect(IConnectContext connectContext)
+ {
+ testOutputHelper.WriteLine($"{(connectContext.IsReconnect ? "Reconnecting" : "Connecting")} to {connectContext.ConnectionParams.HostName}:{connectContext.ConnectionParams.Port}{connectContext.ConnectionParams.VirtualHost}");
+ }
+
+ public void ConnectFailed(IConnectFailedContext connectContext)
+ {
+ testOutputHelper.WriteLine($"Connection failed: {connectContext.Exception}");
+ }
+
+ public void ConnectSuccess(IConnectSuccessContext connectContext)
+ {
+ testOutputHelper.WriteLine($"{(connectContext.IsReconnect ? "Reconnected" : "Connected")} using local port {connectContext.LocalPort}");
+ }
+
+ public void Disconnect(IDisconnectContext disconnectContext)
+ {
+ testOutputHelper.WriteLine($"Connection closed: {(!string.IsNullOrEmpty(disconnectContext.ReplyText) ? disconnectContext.ReplyText : "")} (reply code: {disconnectContext.ReplyCode})");
+ }
+
+ public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
+ {
+ testOutputHelper.WriteLine(exception.Message);
+ }
+
+ public void QueueDeclare(string queueName, bool durable, bool passive)
+ {
+ testOutputHelper.WriteLine(passive
+ ? $"Verifying durable queue {queueName}"
+ : $"Declaring {(durable ? "durable" : "dynamic")} queue {queueName}");
+ }
+
+ public void QueueExistsWarning(string queueName, IReadOnlyDictionary existingArguments, IReadOnlyDictionary arguments)
+ {
+ var argumentsText = new StringBuilder();
+ foreach (var pair in arguments)
+ {
+ if (argumentsText.Length > 0)
+ argumentsText.Append(", ");
+
+ argumentsText.Append($"{pair.Key} = {pair.Value}");
+ }
+
+ testOutputHelper.WriteLine($"Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is");
+ }
+
+ public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
+ {
+ testOutputHelper.WriteLine($"Binding {queueName} to exchange {exchange} with routing key {routingKey}");
+ }
+
+ public void QueueUnbind(string queueName, string exchange, string routingKey)
+ {
+ testOutputHelper.WriteLine($"Removing binding for {queueName} to exchange {exchange} with routing key {routingKey}");
+ }
+
+ public void ExchangeDeclare(string exchange)
+ {
+ testOutputHelper.WriteLine($"Declaring exchange {exchange}");
+ }
+
+ public void QueueObsolete(string queueName, bool deleted, uint messageCount)
+ {
+ testOutputHelper.WriteLine(deleted
+ ? $"Obsolete queue was deleted: {queueName}"
+ : $"Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining");
+ }
+ }
+}
diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj
index 2588df8..0a2b345 100644
--- a/Tapeti.Tests/Tapeti.Tests.csproj
+++ b/Tapeti.Tests/Tapeti.Tests.csproj
@@ -15,10 +15,13 @@
-
-
-
-
+
+
+
+
+
+
+
all
runtime; build; native; contentfiles; analyzers
diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj
index fd27cc4..7a26c25 100644
--- a/Tapeti.Transient/Tapeti.Transient.csproj
+++ b/Tapeti.Transient/Tapeti.Transient.csproj
@@ -30,6 +30,6 @@
-
+
diff --git a/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj b/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj
index bc84ae9..41ceb50 100644
--- a/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj
+++ b/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj
@@ -30,6 +30,6 @@
-
+
diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs
index 87699df..4fc1713 100644
--- a/Tapeti/Config/ITapetiConfigBuilder.cs
+++ b/Tapeti/Config/ITapetiConfigBuilder.cs
@@ -118,7 +118,7 @@ namespace Tapeti.Config
/// before the configuration is built. Implementations of ITapetiConfigBuilder should also implement this interface.
/// Should not be used outside of Tapeti packages.
///
- public interface ITapetiConfigBuilderAccess
+ public interface ITapetiConfigBuilderAccess : ITapetiConfigBuilder
{
///
/// Provides access to the dependency resolver.
diff --git a/Tapeti/Properties/AssemblyInfo.cs b/Tapeti/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..eaca9e3
--- /dev/null
+++ b/Tapeti/Properties/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Tapeti.Tests")]
\ No newline at end of file
diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj
index 4dae106..54098ee 100644
--- a/Tapeti/Tapeti.csproj
+++ b/Tapeti/Tapeti.csproj
@@ -24,8 +24,8 @@
-
-
+
+
@@ -41,7 +41,7 @@
-
+
diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs
index ec2234c..fed2296 100644
--- a/Tapeti/TapetiConfig.cs
+++ b/Tapeti/TapetiConfig.cs
@@ -15,7 +15,7 @@ namespace Tapeti
/// Default implementation of the Tapeti config builder.
/// Automatically registers the default middleware for injecting the message parameter and handling the return value.
///
- public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess
+ public class TapetiConfig : ITapetiConfigBuilderAccess
{
private Config config;
private readonly List bindingMiddleware = new();
diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs
index 95e7643..b86e1f1 100644
--- a/Tapeti/TapetiConfigControllers.cs
+++ b/Tapeti/TapetiConfigControllers.cs
@@ -38,7 +38,7 @@ namespace Tapeti
if (!controller.IsClass)
throw new ArgumentException($"Controller {controller.Name} must be a class");
- var controllerQueueInfo = GetQueueInfo(controller);
+ var controllerQueueInfo = GetQueueInfo(controller, null);
(builderAccess.DependencyResolver as IDependencyContainer)?.RegisterController(controller);
var controllerIsObsolete = controller.GetCustomAttribute() != null;
@@ -79,7 +79,7 @@ namespace Tapeti
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
}
- var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
+ var methodQueueInfo = GetQueueInfo(method, controllerQueueInfo);
if (methodQueueInfo is not { IsValid: true })
throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
@@ -129,23 +129,45 @@ namespace Tapeti
}
- private static ControllerMethodBinding.QueueInfo GetQueueInfo(MemberInfo member)
+ private static ControllerMethodBinding.QueueInfo GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo fallbackQueueInfo)
{
var dynamicQueueAttribute = member.GetCustomAttribute();
var durableQueueAttribute = member.GetCustomAttribute();
+ var queueArgumentsAttribute = member.GetCustomAttribute();
if (dynamicQueueAttribute != null && durableQueueAttribute != null)
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}");
+ if (dynamicQueueAttribute == null && durableQueueAttribute == null && (queueArgumentsAttribute == null || fallbackQueueInfo == null))
+ return fallbackQueueInfo;
- var queueArgumentsAttribute = member.GetCustomAttribute();
+
+ QueueType queueType;
+ string name;
+
if (dynamicQueueAttribute != null)
- return new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Dynamic, Name = dynamicQueueAttribute.Prefix, QueueArguments = GetQueueArguments(queueArgumentsAttribute) };
+ {
+ queueType = QueueType.Dynamic;
+ name = dynamicQueueAttribute.Prefix;
+ }
+ else if (durableQueueAttribute != null)
+ {
+ queueType = QueueType.Durable;
+ name = durableQueueAttribute.Name;
+ }
+ else
+ {
+ queueType = fallbackQueueInfo.QueueType;
+ name = fallbackQueueInfo.Name;
+ }
- return durableQueueAttribute != null
- ? new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Durable, Name = durableQueueAttribute.Name, QueueArguments = GetQueueArguments(queueArgumentsAttribute) }
- : null;
+ return new ControllerMethodBinding.QueueInfo
+ {
+ QueueType = queueType,
+ Name = name,
+ QueueArguments = GetQueueArguments(queueArgumentsAttribute) ?? fallbackQueueInfo?.QueueArguments
+ };
}