1
0
mirror of synced 2025-01-22 08:03:07 +01:00

[ci skip] Fixed wrong port being used for management API

Fixed exchange not being created for bindings and publishing
Added documentation for DataAnnotations package
This commit is contained in:
Mark van Renswoude 2019-08-15 15:55:45 +02:00
parent 46ea070865
commit c83ca889b7
5 changed files with 81 additions and 14 deletions

View File

@ -82,8 +82,7 @@ namespace Tapeti.Config
/// Note that access to the RabbitMQ Management plugin's REST API is required for this
/// feature to work, since AMQP does not provide a way to query existing bindings.
/// </remarks>
ITapetiConfigBuilder EnableDeclareDurableQueues();
ITapetiConfigBuilder EnableDeclareDurableQueues();
/// <summary>
/// Configures the automatic creation of durable queues and updating of their bindings.

View File

@ -111,6 +111,8 @@ namespace Tapeti.Connection
WithRetryableChannel(channel =>
{
DeclareExchange(channel, exchange);
// The delivery tag is lost after a reconnect, register under the new tag
if (config.Features.PublisherConfirms)
{
@ -230,18 +232,20 @@ namespace Tapeti.Connection
{
await taskQueue.Value.Add(async () =>
{
var existingBindings = await GetQueueBindings(queueName);
var existingBindings = (await GetQueueBindings(queueName)).ToList();
var currentBindings = bindings.ToList();
WithRetryableChannel(channel =>
{
channel.QueueDeclare(queueName, true, false, false);
var currentBindings = bindings.ToList();
foreach (var binding in currentBindings)
foreach (var binding in currentBindings.Except(existingBindings))
{
DeclareExchange(channel, binding.Exchange);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
}
foreach (var deletedBinding in existingBindings.Where(binding => !currentBindings.Any(b => b.Exchange == binding.Exchange && b.RoutingKey == binding.RoutingKey)))
foreach (var deletedBinding in existingBindings.Except(currentBindings))
channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
});
});
@ -288,6 +292,7 @@ namespace Tapeti.Connection
{
WithRetryableChannel(channel =>
{
DeclareExchange(channel, binding.Exchange);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
});
});
@ -372,7 +377,7 @@ namespace Tapeti.Connection
{
var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost);
var queuePath = Uri.EscapeDataString(queueName);
var requestUri = new Uri($"{connectionParams.HostName}:{connectionParams.Port}/api/queues/{virtualHostPath}/{queuePath}/bindings");
var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/queues/{virtualHostPath}/{queuePath}/bindings");
using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri))
{
@ -414,6 +419,21 @@ namespace Tapeti.Connection
}
private readonly HashSet<string> declaredExchanges = new HashSet<string>();
private void DeclareExchange(IModel channel, string exchange)
{
if (string.IsNullOrEmpty(exchange))
return;
if (declaredExchanges.Contains(exchange))
return;
channel.ExchangeDeclare(exchange, "topic", true);
declaredExchanges.Add(exchange);
}
/// <remarks>
/// Only call this from a task in the taskQueue to ensure IModel is only used
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.

View File

@ -18,6 +18,7 @@ namespace Tapeti
/// <item><description>rabbitmq:username</description></item>
/// <item><description>rabbitmq:password</description></item>
/// <item><description>rabbitmq:prefetchcount</description></item>
/// <item><description>rabbitmq:managementport</description></item>
/// </list>
public class TapetiAppSettingsConnectionParams : TapetiConnectionParams
{
@ -28,6 +29,7 @@ namespace Tapeti
private const string KeyUsername = "username";
private const string KeyPassword = "password";
private const string KeyPrefetchCount = "prefetchcount";
private const string KeyManagementPort = "managementport";
/// <inheritdoc />
@ -49,6 +51,7 @@ namespace Tapeti
GetAppSetting(KeyUsername, value => Username = value);
GetAppSetting(KeyPassword, value => Password = value);
GetAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value));
GetAppSetting(KeyManagementPort, value => ManagementPort = int.Parse(value));
}
}
}

View File

@ -41,6 +41,11 @@ namespace Tapeti
/// </summary>
public ushort PrefetchCount { get; set; } = 50;
/// <summary>
/// The port the management plugin binds to. Only used when DeclareDurableQueues is enabled. Defaults to 15672.
/// </summary>
public int ManagementPort { get; set; } = 15672;
/// <inheritdoc />
public TapetiConnectionParams()

View File

@ -1,11 +1,51 @@
Validating messages
===================
To validate the contents of messages, Tapeti provides the Tapeti.DataAnnotations package. Once installed and enabled, it verifies each message that is published or consumed using the standard System.ComponentModel.DataAnnotations.
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
To enable the validation extension, include it in your TapetiConfig:
1. Attempt to explore further
2. Complain to the author and demand your money back
3. Abandon all hope
::
> |
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithDataAnnotations()
.RegisterAllControllers()
.Build();
Annotations
-----------
All `ValidationAttribute <https://docs.microsoft.com/en-us/dotnet/api/system.componentmodel.dataannotations.validationattribute>`_ derived annotations are supported. For example, use the Required attribute to indicate a field is required:
::
public class RabbitEscapedMessage
{
[Required]
public string Name { get; set; }
public string LastKnownHutch { get; set; }
}
Or the Range attribute to indicate valid ranges:
::
public class RabbitBirthdayMessage
{
[Range(1, 15, ErrorMessage = "Sorry, we have no birthday cards for ages below {1} or above {2}")]
public int Age { get; set; }
}
Required GUIDs
--------------
Using the standard validation attributes it is tricky to get a Guid to be required, as it is a struct which defaults to Guid.Empty. Using Nullable<Guid> may work, but then your business logic will look like it is supposed to be optional.
For this reason, the Tapeti.DataAnnotations.Extensions package can be installed from NuGet into your messaging package. It contains the RequiredGuid attribute which specifically checks for Guid.Empty.
::
public class RabbitBornMessage
{
[RequiredGuid]
public Guid RabbitId { get; set; }
}