diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs index c063d79..781648e 100644 --- a/Tapeti/Config/ITapetiConfigBuilder.cs +++ b/Tapeti/Config/ITapetiConfigBuilder.cs @@ -82,8 +82,7 @@ namespace Tapeti.Config /// Note that access to the RabbitMQ Management plugin's REST API is required for this /// feature to work, since AMQP does not provide a way to query existing bindings. /// - ITapetiConfigBuilder EnableDeclareDurableQueues(); - + ITapetiConfigBuilder EnableDeclareDurableQueues(); /// /// Configures the automatic creation of durable queues and updating of their bindings. diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 44b1275..9f3487c 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -111,6 +111,8 @@ namespace Tapeti.Connection WithRetryableChannel(channel => { + DeclareExchange(channel, exchange); + // The delivery tag is lost after a reconnect, register under the new tag if (config.Features.PublisherConfirms) { @@ -230,18 +232,20 @@ namespace Tapeti.Connection { await taskQueue.Value.Add(async () => { - var existingBindings = await GetQueueBindings(queueName); + var existingBindings = (await GetQueueBindings(queueName)).ToList(); + var currentBindings = bindings.ToList(); WithRetryableChannel(channel => { channel.QueueDeclare(queueName, true, false, false); - var currentBindings = bindings.ToList(); - - foreach (var binding in currentBindings) + foreach (var binding in currentBindings.Except(existingBindings)) + { + DeclareExchange(channel, binding.Exchange); channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); + } - foreach (var deletedBinding in existingBindings.Where(binding => !currentBindings.Any(b => b.Exchange == binding.Exchange && b.RoutingKey == binding.RoutingKey))) + foreach (var deletedBinding in existingBindings.Except(currentBindings)) channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); }); }); @@ -288,6 +292,7 @@ namespace Tapeti.Connection { WithRetryableChannel(channel => { + DeclareExchange(channel, binding.Exchange); channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); }); }); @@ -372,7 +377,7 @@ namespace Tapeti.Connection { var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost); var queuePath = Uri.EscapeDataString(queueName); - var requestUri = new Uri($"{connectionParams.HostName}:{connectionParams.Port}/api/queues/{virtualHostPath}/{queuePath}/bindings"); + var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/queues/{virtualHostPath}/{queuePath}/bindings"); using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri)) { @@ -414,6 +419,21 @@ namespace Tapeti.Connection } + private readonly HashSet declaredExchanges = new HashSet(); + + private void DeclareExchange(IModel channel, string exchange) + { + if (string.IsNullOrEmpty(exchange)) + return; + + if (declaredExchanges.Contains(exchange)) + return; + + channel.ExchangeDeclare(exchange, "topic", true); + declaredExchanges.Add(exchange); + } + + /// /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. diff --git a/Tapeti/TapetiAppSettingsConnectionParams.cs b/Tapeti/TapetiAppSettingsConnectionParams.cs index 9140f4a..d5e6dfb 100644 --- a/Tapeti/TapetiAppSettingsConnectionParams.cs +++ b/Tapeti/TapetiAppSettingsConnectionParams.cs @@ -18,6 +18,7 @@ namespace Tapeti /// rabbitmq:username /// rabbitmq:password /// rabbitmq:prefetchcount + /// rabbitmq:managementport /// public class TapetiAppSettingsConnectionParams : TapetiConnectionParams { @@ -28,6 +29,7 @@ namespace Tapeti private const string KeyUsername = "username"; private const string KeyPassword = "password"; private const string KeyPrefetchCount = "prefetchcount"; + private const string KeyManagementPort = "managementport"; /// @@ -49,6 +51,7 @@ namespace Tapeti GetAppSetting(KeyUsername, value => Username = value); GetAppSetting(KeyPassword, value => Password = value); GetAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value)); + GetAppSetting(KeyManagementPort, value => ManagementPort = int.Parse(value)); } } } diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 9b0414c..cdc60aa 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -41,6 +41,11 @@ namespace Tapeti /// public ushort PrefetchCount { get; set; } = 50; + /// + /// The port the management plugin binds to. Only used when DeclareDurableQueues is enabled. Defaults to 15672. + /// + public int ManagementPort { get; set; } = 15672; + /// public TapetiConnectionParams() diff --git a/docs/dataannotations.rst b/docs/dataannotations.rst index 2b07c50..5c3ed18 100644 --- a/docs/dataannotations.rst +++ b/docs/dataannotations.rst @@ -1,11 +1,51 @@ Validating messages =================== +To validate the contents of messages, Tapeti provides the Tapeti.DataAnnotations package. Once installed and enabled, it verifies each message that is published or consumed using the standard System.ComponentModel.DataAnnotations. -.. error:: You've stumbled upon a piece of unfinished documentation. - Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do? +To enable the validation extension, include it in your TapetiConfig: - 1. Attempt to explore further - 2. Complain to the author and demand your money back - 3. Abandon all hope +:: - > | + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + .WithDataAnnotations() + .RegisterAllControllers() + .Build(); + + +Annotations +----------- +All `ValidationAttribute `_ derived annotations are supported. For example, use the Required attribute to indicate a field is required: + +:: + + public class RabbitEscapedMessage + { + [Required] + public string Name { get; set; } + + public string LastKnownHutch { get; set; } + } + +Or the Range attribute to indicate valid ranges: + +:: + + public class RabbitBirthdayMessage + { + [Range(1, 15, ErrorMessage = "Sorry, we have no birthday cards for ages below {1} or above {2}")] + public int Age { get; set; } + } + +Required GUIDs +-------------- +Using the standard validation attributes it is tricky to get a Guid to be required, as it is a struct which defaults to Guid.Empty. Using Nullable may work, but then your business logic will look like it is supposed to be optional. + +For this reason, the Tapeti.DataAnnotations.Extensions package can be installed from NuGet into your messaging package. It contains the RequiredGuid attribute which specifically checks for Guid.Empty. + +:: + + public class RabbitBornMessage + { + [RequiredGuid] + public Guid RabbitId { get; set; } + } \ No newline at end of file