Merge tag '1.2' into develop
This commit is contained in:
commit
9bb192c067
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
namespace Tapeti.Annotations
|
namespace Tapeti.Annotations
|
||||||
{
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Binds to an existing durable queue to receive messages. Can be used
|
/// Binds to an existing durable queue to receive messages. Can be used
|
||||||
/// on an entire MessageController class or on individual methods.
|
/// on an entire MessageController class or on individual methods.
|
||||||
@ -17,6 +18,8 @@ namespace Tapeti.Annotations
|
|||||||
public string Name { get; set; }
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <param name="name">The name of the durable queue</param>
|
||||||
public DurableQueueAttribute(string name)
|
public DurableQueueAttribute(string name)
|
||||||
{
|
{
|
||||||
Name = name;
|
Name = name;
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
namespace Tapeti.Annotations
|
namespace Tapeti.Annotations
|
||||||
{
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Creates a non-durable auto-delete queue to receive messages. Can be used
|
/// Creates a non-durable auto-delete queue to receive messages. Can be used
|
||||||
/// on an entire MessageController class or on individual methods.
|
/// on an entire MessageController class or on individual methods.
|
||||||
@ -12,12 +13,10 @@ namespace Tapeti.Annotations
|
|||||||
public string Prefix { get; set; }
|
public string Prefix { get; set; }
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <inheritdoc />
|
||||||
/// If prefix is specified, Tapeti will compose the queue name using the
|
/// <param name="prefix">An optional prefix. If specified, Tapeti will compose the queue name using the
|
||||||
/// prefix and a unique ID. If not specified, an empty queue name will be passed
|
/// prefix and a unique ID. If not specified, an empty queue name will be passed
|
||||||
/// to RabbitMQ thus letting it create a unique queue name.
|
/// to RabbitMQ thus letting it create a unique queue name.</param>
|
||||||
/// </summary>
|
|
||||||
/// <param name="prefix"></param>
|
|
||||||
public DynamicQueueAttribute(string prefix = null)
|
public DynamicQueueAttribute(string prefix = null)
|
||||||
{
|
{
|
||||||
Prefix = prefix;
|
Prefix = prefix;
|
||||||
|
15
Tapeti.Annotations/MandatoryAttribute.cs
Normal file
15
Tapeti.Annotations/MandatoryAttribute.cs
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Annotations
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <summary>
|
||||||
|
/// Can be attached to a message class to specify that delivery to a queue must be guaranteed.
|
||||||
|
/// Publish will fail if no queues bind to the routing key. Publisher confirms must be enabled
|
||||||
|
/// on the connection for this to work (enabled by default).
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Class)]
|
||||||
|
public class MandatoryAttribute : Attribute
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,11 @@
|
|||||||
|
|
||||||
namespace Tapeti.Annotations
|
namespace Tapeti.Annotations
|
||||||
{
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <summary>
|
||||||
|
/// Attaching this attribute to a class includes it in the auto-discovery of message controllers
|
||||||
|
/// when using the RegisterAllControllers method. It is not required when manually registering a controller.
|
||||||
|
/// </summary>
|
||||||
[AttributeUsage(AttributeTargets.Class)]
|
[AttributeUsage(AttributeTargets.Class)]
|
||||||
public class MessageControllerAttribute : Attribute
|
public class MessageControllerAttribute : Attribute
|
||||||
{
|
{
|
||||||
|
@ -2,6 +2,14 @@
|
|||||||
|
|
||||||
namespace Tapeti.Annotations
|
namespace Tapeti.Annotations
|
||||||
{
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <summary>
|
||||||
|
/// Can be attached to a message class to specify that the receiver of the message must
|
||||||
|
/// provide a response message of the type specified in the Response attribute. This response
|
||||||
|
/// must be sent by either returning it from the message handler method or using
|
||||||
|
/// YieldWithResponse when using Tapeti Flow. These methods will respond directly
|
||||||
|
/// to the queue specified in the reply-to header automatically added by Tapeti.
|
||||||
|
/// </summary>
|
||||||
[AttributeUsage(AttributeTargets.Class)]
|
[AttributeUsage(AttributeTargets.Class)]
|
||||||
public class RequestAttribute : Attribute
|
public class RequestAttribute : Attribute
|
||||||
{
|
{
|
||||||
|
@ -80,7 +80,7 @@ namespace Tapeti.Flow.Default
|
|||||||
|
|
||||||
await context.Store();
|
await context.Store();
|
||||||
|
|
||||||
await publisher.Publish(message, properties);
|
await publisher.Publish(message, properties, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
|
|||||||
|
|
||||||
// TODO disallow if replyto is not specified?
|
// TODO disallow if replyto is not specified?
|
||||||
if (reply.ReplyTo != null)
|
if (reply.ReplyTo != null)
|
||||||
await publisher.PublishDirect(message, reply.ReplyTo, properties);
|
await publisher.PublishDirect(message, reply.ReplyTo, properties, true);
|
||||||
else
|
else
|
||||||
await publisher.Publish(message, properties);
|
await publisher.Publish(message, properties, true);
|
||||||
|
|
||||||
await context.Delete();
|
await context.Delete();
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using ISeriLogger = Serilog.ILogger;
|
using ISeriLogger = Serilog.ILogger;
|
||||||
|
|
||||||
|
// ReSharper disable UnusedMember.Global
|
||||||
|
|
||||||
namespace Tapeti.Serilog
|
namespace Tapeti.Serilog
|
||||||
{
|
{
|
||||||
public class TapetiSeriLogger: ILogger
|
public class TapetiSeriLogger: ILogger
|
||||||
|
@ -7,6 +7,8 @@ namespace Tapeti.Config
|
|||||||
{
|
{
|
||||||
public interface IConfig
|
public interface IConfig
|
||||||
{
|
{
|
||||||
|
bool UsePublisherConfirms { get; }
|
||||||
|
|
||||||
IDependencyResolver DependencyResolver { get; }
|
IDependencyResolver DependencyResolver { get; }
|
||||||
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
||||||
IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
|
IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
|
||||||
@ -28,6 +30,7 @@ namespace Tapeti.Config
|
|||||||
|
|
||||||
public interface IDynamicQueue : IQueue
|
public interface IDynamicQueue : IQueue
|
||||||
{
|
{
|
||||||
|
string GetDeclareQueueName();
|
||||||
void SetName(string name);
|
void SetName(string name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +1,16 @@
|
|||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
|
public class DisconnectedEventArgs
|
||||||
|
{
|
||||||
|
public ushort ReplyCode;
|
||||||
|
public string ReplyText;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public interface IConnectionEventListener
|
public interface IConnectionEventListener
|
||||||
{
|
{
|
||||||
void Connected();
|
void Connected();
|
||||||
void Reconnected();
|
void Reconnected();
|
||||||
void Disconnected();
|
void Disconnected(DisconnectedEventArgs e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Reflection;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
using Tapeti.Annotations;
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
@ -17,19 +19,25 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
public Task Publish(object message)
|
public Task Publish(object message)
|
||||||
{
|
{
|
||||||
return workerFactory().Publish(message, null);
|
return workerFactory().Publish(message, null, IsMandatory(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Publish(object message, IBasicProperties properties)
|
public Task Publish(object message, IBasicProperties properties, bool mandatory)
|
||||||
{
|
{
|
||||||
return workerFactory().Publish(message, properties);
|
return workerFactory().Publish(message, properties, mandatory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task PublishDirect(object message, string queueName, IBasicProperties properties)
|
public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
|
||||||
{
|
{
|
||||||
return workerFactory().PublishDirect(message, queueName, properties);
|
return workerFactory().PublishDirect(message, queueName, properties, mandatory);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static bool IsMandatory(object message)
|
||||||
|
{
|
||||||
|
return message.GetType().GetCustomAttribute<MandatoryAttribute>() != null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,12 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Task RebindQueues()
|
||||||
|
{
|
||||||
|
return BindQueues();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Resume()
|
public Task Resume()
|
||||||
{
|
{
|
||||||
if (consuming)
|
if (consuming)
|
||||||
|
@ -1,10 +1,14 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Events;
|
||||||
using RabbitMQ.Client.Exceptions;
|
using RabbitMQ.Client.Exceptions;
|
||||||
using RabbitMQ.Client.Framing;
|
using RabbitMQ.Client.Framing;
|
||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
|
using Tapeti.Exceptions;
|
||||||
using Tapeti.Helpers;
|
using Tapeti.Helpers;
|
||||||
using Tapeti.Tasks;
|
using Tapeti.Tasks;
|
||||||
|
|
||||||
@ -13,7 +17,8 @@ namespace Tapeti.Connection
|
|||||||
public class TapetiWorker
|
public class TapetiWorker
|
||||||
{
|
{
|
||||||
private const int ReconnectDelay = 5000;
|
private const int ReconnectDelay = 5000;
|
||||||
private const int PublishMaxConnectAttempts = 3;
|
private const int MandatoryReturnTimeout = 30000;
|
||||||
|
private const int MinimumConnectedReconnectDelay = 1000;
|
||||||
|
|
||||||
private readonly IConfig config;
|
private readonly IConfig config;
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
@ -24,8 +29,31 @@ namespace Tapeti.Connection
|
|||||||
private readonly IRoutingKeyStrategy routingKeyStrategy;
|
private readonly IRoutingKeyStrategy routingKeyStrategy;
|
||||||
private readonly IExchangeStrategy exchangeStrategy;
|
private readonly IExchangeStrategy exchangeStrategy;
|
||||||
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
|
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
|
||||||
|
|
||||||
|
|
||||||
|
// These fields are for use in the taskQueue only!
|
||||||
private RabbitMQ.Client.IConnection connection;
|
private RabbitMQ.Client.IConnection connection;
|
||||||
|
private bool isReconnect;
|
||||||
private IModel channelInstance;
|
private IModel channelInstance;
|
||||||
|
private ulong lastDeliveryTag;
|
||||||
|
private DateTime connectedDateTime;
|
||||||
|
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>();
|
||||||
|
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>();
|
||||||
|
|
||||||
|
|
||||||
|
private class ConfirmMessageInfo
|
||||||
|
{
|
||||||
|
public string ReturnKey;
|
||||||
|
public TaskCompletionSource<int> CompletionSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class ReturnInfo
|
||||||
|
{
|
||||||
|
public uint RefCount;
|
||||||
|
public int FirstReplyCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public TapetiWorker(IConfig config)
|
public TapetiWorker(IConfig config)
|
||||||
@ -39,15 +67,15 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Publish(object message, IBasicProperties properties)
|
public Task Publish(object message, IBasicProperties properties, bool mandatory)
|
||||||
{
|
{
|
||||||
return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()));
|
return Publish(message, properties, exchangeStrategy.GetExchange(message.GetType()), routingKeyStrategy.GetRoutingKey(message.GetType()), mandatory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task PublishDirect(object message, string queueName, IBasicProperties properties)
|
public Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory)
|
||||||
{
|
{
|
||||||
return Publish(message, properties, "", queueName);
|
return Publish(message, properties, "", queueName, mandatory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -56,69 +84,78 @@ namespace Tapeti.Connection
|
|||||||
if (string.IsNullOrEmpty(queueName))
|
if (string.IsNullOrEmpty(queueName))
|
||||||
throw new ArgumentNullException(nameof(queueName));
|
throw new ArgumentNullException(nameof(queueName));
|
||||||
|
|
||||||
return taskQueue.Value.Add(async () =>
|
return taskQueue.Value.Add(() =>
|
||||||
{
|
{
|
||||||
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware));
|
WithRetryableChannel(channel => channel.BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)));
|
||||||
}).Unwrap();
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Subscribe(IQueue queue)
|
public Task Subscribe(IQueue queue)
|
||||||
{
|
{
|
||||||
return taskQueue.Value.Add(async () =>
|
return taskQueue.Value.Add(() =>
|
||||||
{
|
{
|
||||||
var channel = await GetChannel();
|
WithRetryableChannel(channel =>
|
||||||
|
|
||||||
if (queue.Dynamic)
|
|
||||||
{
|
{
|
||||||
var dynamicQueue = channel.QueueDeclare(queue.Name);
|
if (queue.Dynamic)
|
||||||
(queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName);
|
|
||||||
|
|
||||||
foreach (var binding in queue.Bindings)
|
|
||||||
{
|
{
|
||||||
if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
|
if (!(queue is IDynamicQueue dynamicQueue))
|
||||||
|
throw new NullReferenceException("Queue with Dynamic = true must implement IDynamicQueue");
|
||||||
|
|
||||||
|
var declaredQueue = channel.QueueDeclare(dynamicQueue.GetDeclareQueueName());
|
||||||
|
dynamicQueue.SetName(declaredQueue.QueueName);
|
||||||
|
|
||||||
|
foreach (var binding in queue.Bindings)
|
||||||
{
|
{
|
||||||
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
|
if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
|
||||||
var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
|
{
|
||||||
|
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
|
||||||
|
var exchange = exchangeStrategy.GetExchange(binding.MessageClass);
|
||||||
|
|
||||||
channel.QueueBind(dynamicQueue.QueueName, exchange, routingKey);
|
channel.QueueBind(declaredQueue.QueueName, exchange, routingKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
(binding as IBuildBinding)?.SetQueueName(declaredQueue.QueueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
(binding as IBuildBinding)?.SetQueueName(dynamicQueue.QueueName);
|
|
||||||
}
|
}
|
||||||
}
|
else
|
||||||
else
|
|
||||||
{
|
|
||||||
channel.QueueDeclarePassive(queue.Name);
|
|
||||||
foreach (var binding in queue.Bindings)
|
|
||||||
{
|
{
|
||||||
(binding as IBuildBinding)?.SetQueueName(queue.Name);
|
channel.QueueDeclarePassive(queue.Name);
|
||||||
|
foreach (var binding in queue.Bindings)
|
||||||
|
{
|
||||||
|
(binding as IBuildBinding)?.SetQueueName(queue.Name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}).Unwrap();
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task Respond(ulong deliveryTag, ConsumeResponse response)
|
public Task Respond(ulong deliveryTag, ConsumeResponse response)
|
||||||
{
|
{
|
||||||
return taskQueue.Value.Add(async () =>
|
return taskQueue.Value.Add(() =>
|
||||||
{
|
{
|
||||||
|
// No need for a retryable channel here, if the connection is lost we can't
|
||||||
|
// use the deliveryTag anymore.
|
||||||
switch (response)
|
switch (response)
|
||||||
{
|
{
|
||||||
case ConsumeResponse.Ack:
|
case ConsumeResponse.Ack:
|
||||||
(await GetChannel()).BasicAck(deliveryTag, false);
|
GetChannel().BasicAck(deliveryTag, false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ConsumeResponse.Nack:
|
case ConsumeResponse.Nack:
|
||||||
(await GetChannel()).BasicNack(deliveryTag, false, false);
|
GetChannel().BasicNack(deliveryTag, false, false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ConsumeResponse.Requeue:
|
case ConsumeResponse.Requeue:
|
||||||
(await GetChannel()).BasicNack(deliveryTag, false, true);
|
GetChannel().BasicNack(deliveryTag, false, true);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(response), response, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
}).Unwrap();
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -147,7 +184,7 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey)
|
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey, bool mandatory)
|
||||||
{
|
{
|
||||||
var context = new PublishContext
|
var context = new PublishContext
|
||||||
{
|
{
|
||||||
@ -172,22 +209,96 @@ namespace Tapeti.Connection
|
|||||||
() => taskQueue.Value.Add(async () =>
|
() => taskQueue.Value.Add(async () =>
|
||||||
{
|
{
|
||||||
var body = messageSerializer.Serialize(context.Message, context.Properties);
|
var body = messageSerializer.Serialize(context.Message, context.Properties);
|
||||||
(await GetChannel(PublishMaxConnectAttempts)).BasicPublish(context.Exchange, context.RoutingKey, false,
|
|
||||||
context.Properties, body);
|
Task<int> publishResultTask = null;
|
||||||
}).Unwrap());
|
var messageInfo = new ConfirmMessageInfo
|
||||||
|
{
|
||||||
|
ReturnKey = GetReturnKey(context.Exchange, context.RoutingKey),
|
||||||
|
CompletionSource = new TaskCompletionSource<int>()
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
WithRetryableChannel(channel =>
|
||||||
|
{
|
||||||
|
// The delivery tag is lost after a reconnect, register under the new tag
|
||||||
|
if (config.UsePublisherConfirms)
|
||||||
|
{
|
||||||
|
lastDeliveryTag++;
|
||||||
|
|
||||||
|
confirmMessages.Add(lastDeliveryTag, messageInfo);
|
||||||
|
publishResultTask = messageInfo.CompletionSource.Task;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
mandatory = false;
|
||||||
|
|
||||||
|
channel.BasicPublish(context.Exchange, context.RoutingKey, mandatory, context.Properties, body);
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
if (publishResultTask == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var delayCancellationTokenSource = new CancellationTokenSource();
|
||||||
|
var signalledTask = await Task.WhenAny(publishResultTask, Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token));
|
||||||
|
|
||||||
|
if (signalledTask != publishResultTask)
|
||||||
|
throw new TimeoutException($"Timeout while waiting for basic.return for message with class {context.Message?.GetType().FullName ?? "null"} and Id {context.Properties.MessageId}");
|
||||||
|
|
||||||
|
delayCancellationTokenSource.Cancel();
|
||||||
|
|
||||||
|
if (publishResultTask.IsCanceled)
|
||||||
|
throw new NackException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} was nacked");
|
||||||
|
|
||||||
|
var replyCode = publishResultTask.Result;
|
||||||
|
|
||||||
|
// There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code
|
||||||
|
// at the time of writing...
|
||||||
|
if (replyCode == 312)
|
||||||
|
throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} does not have a route");
|
||||||
|
|
||||||
|
if (replyCode > 0)
|
||||||
|
throw new NoRouteException($"Mandatory message with class {context.Message?.GetType().FullName ?? "null"} could not be delivery, reply code {replyCode}");
|
||||||
|
}));
|
||||||
// ReSharper restore ImplicitlyCapturedClosure
|
// ReSharper restore ImplicitlyCapturedClosure
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <remarks>
|
/// <remarks>
|
||||||
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
||||||
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
private async Task<IModel> GetChannel(int? maxAttempts = null)
|
private void WithRetryableChannel(Action<IModel> operation)
|
||||||
{
|
{
|
||||||
if (channelInstance != null)
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
operation(GetChannel());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (AlreadyClosedException e)
|
||||||
|
{
|
||||||
|
// TODO log?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <remarks>
|
||||||
|
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
||||||
|
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
||||||
|
/// </remarks>
|
||||||
|
private IModel GetChannel()
|
||||||
|
{
|
||||||
|
if (channelInstance != null && channelInstance.IsOpen)
|
||||||
return channelInstance;
|
return channelInstance;
|
||||||
|
|
||||||
var attempts = 0;
|
// If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
|
||||||
|
// not related to the connection), wait for a bit to avoid spamming the connection
|
||||||
|
if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay)
|
||||||
|
Thread.Sleep(ReconnectDelay);
|
||||||
|
|
||||||
|
|
||||||
var connectionFactory = new ConnectionFactory
|
var connectionFactory = new ConnectionFactory
|
||||||
{
|
{
|
||||||
HostName = ConnectionParams.HostName,
|
HostName = ConnectionParams.HostName,
|
||||||
@ -195,7 +306,8 @@ namespace Tapeti.Connection
|
|||||||
VirtualHost = ConnectionParams.VirtualHost,
|
VirtualHost = ConnectionParams.VirtualHost,
|
||||||
UserName = ConnectionParams.Username,
|
UserName = ConnectionParams.Username,
|
||||||
Password = ConnectionParams.Password,
|
Password = ConnectionParams.Password,
|
||||||
AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable
|
AutomaticRecoveryEnabled = false,
|
||||||
|
TopologyRecoveryEnabled = false,
|
||||||
RequestedHeartbeat = 30
|
RequestedHeartbeat = 30
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -208,27 +320,50 @@ namespace Tapeti.Connection
|
|||||||
connection = connectionFactory.CreateConnection();
|
connection = connectionFactory.CreateConnection();
|
||||||
channelInstance = connection.CreateModel();
|
channelInstance = connection.CreateModel();
|
||||||
|
|
||||||
|
if (channelInstance == null)
|
||||||
|
throw new BrokerUnreachableException(null);
|
||||||
|
|
||||||
|
if (config.UsePublisherConfirms)
|
||||||
|
{
|
||||||
|
lastDeliveryTag = 0;
|
||||||
|
confirmMessages.Clear();
|
||||||
|
channelInstance.ConfirmSelect();
|
||||||
|
}
|
||||||
|
|
||||||
if (ConnectionParams.PrefetchCount > 0)
|
if (ConnectionParams.PrefetchCount > 0)
|
||||||
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
|
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
|
||||||
|
|
||||||
((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected();
|
channelInstance.ModelShutdown += (sender, e) =>
|
||||||
|
{
|
||||||
|
ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
|
||||||
|
{
|
||||||
|
ReplyCode = e.ReplyCode,
|
||||||
|
ReplyText = e.ReplyText
|
||||||
|
});
|
||||||
|
|
||||||
channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected();
|
channelInstance = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
channelInstance.BasicReturn += HandleBasicReturn;
|
||||||
|
channelInstance.BasicAcks += HandleBasicAck;
|
||||||
|
channelInstance.BasicNacks += HandleBasicNack;
|
||||||
|
|
||||||
|
connectedDateTime = DateTime.UtcNow;
|
||||||
|
|
||||||
|
if (isReconnect)
|
||||||
|
ConnectionEventListener?.Reconnected();
|
||||||
|
else
|
||||||
|
ConnectionEventListener?.Connected();
|
||||||
|
|
||||||
ConnectionEventListener?.Connected();
|
|
||||||
logger.ConnectSuccess(ConnectionParams);
|
logger.ConnectSuccess(ConnectionParams);
|
||||||
|
isReconnect = true;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
catch (BrokerUnreachableException e)
|
catch (BrokerUnreachableException e)
|
||||||
{
|
{
|
||||||
logger.ConnectFailed(ConnectionParams, e);
|
logger.ConnectFailed(ConnectionParams, e);
|
||||||
|
Thread.Sleep(ReconnectDelay);
|
||||||
attempts++;
|
|
||||||
if (maxAttempts.HasValue && attempts > maxAttempts.Value)
|
|
||||||
throw;
|
|
||||||
|
|
||||||
await Task.Delay(ReconnectDelay);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,6 +371,93 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void HandleBasicReturn(object sender, BasicReturnEventArgs e)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack."
|
||||||
|
* - https://www.rabbitmq.com/confirms.html
|
||||||
|
*
|
||||||
|
* Because there is no delivery tag included in the basic.return message. This solution is modeled after
|
||||||
|
* user OhJeez' answer on StackOverflow:
|
||||||
|
*
|
||||||
|
* "Since all messages with the same routing key are routed the same way. I assumed that once I get a
|
||||||
|
* basic.return about a specific routing key, all messages with this routing key can be considered undelivered"
|
||||||
|
* https://stackoverflow.com/questions/21336659/how-to-tell-which-amqp-message-was-not-routed-from-basic-return-response
|
||||||
|
*/
|
||||||
|
var key = GetReturnKey(e.Exchange, e.RoutingKey);
|
||||||
|
|
||||||
|
if (!returnRoutingKeys.TryGetValue(key, out var returnInfo))
|
||||||
|
{
|
||||||
|
returnInfo = new ReturnInfo
|
||||||
|
{
|
||||||
|
RefCount = 0,
|
||||||
|
FirstReplyCode = e.ReplyCode
|
||||||
|
};
|
||||||
|
|
||||||
|
returnRoutingKeys.Add(key, returnInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
returnInfo.RefCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void HandleBasicAck(object sender, BasicAckEventArgs e)
|
||||||
|
{
|
||||||
|
foreach (var deliveryTag in GetDeliveryTags(e))
|
||||||
|
{
|
||||||
|
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (returnRoutingKeys.TryGetValue(messageInfo.ReturnKey, out var returnInfo))
|
||||||
|
{
|
||||||
|
messageInfo.CompletionSource.SetResult(returnInfo.FirstReplyCode);
|
||||||
|
|
||||||
|
returnInfo.RefCount--;
|
||||||
|
if (returnInfo.RefCount == 0)
|
||||||
|
returnRoutingKeys.Remove(messageInfo.ReturnKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
messageInfo.CompletionSource.SetResult(0);
|
||||||
|
confirmMessages.Remove(deliveryTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void HandleBasicNack(object sender, BasicNackEventArgs e)
|
||||||
|
{
|
||||||
|
foreach (var deliveryTag in GetDeliveryTags(e))
|
||||||
|
{
|
||||||
|
if (!confirmMessages.TryGetValue(deliveryTag, out var messageInfo))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
messageInfo.CompletionSource.SetCanceled();
|
||||||
|
confirmMessages.Remove(e.DeliveryTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private IEnumerable<ulong> GetDeliveryTags(BasicAckEventArgs e)
|
||||||
|
{
|
||||||
|
return e.Multiple
|
||||||
|
? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
|
||||||
|
: new[] { e.DeliveryTag };
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private IEnumerable<ulong> GetDeliveryTags(BasicNackEventArgs e)
|
||||||
|
{
|
||||||
|
return e.Multiple
|
||||||
|
? confirmMessages.Keys.Where(tag => tag <= e.DeliveryTag).ToArray()
|
||||||
|
: new[] { e.DeliveryTag };
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static string GetReturnKey(string exchange, string routingKey)
|
||||||
|
{
|
||||||
|
return exchange + ':' + routingKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private class PublishContext : IPublishContext
|
private class PublishContext : IPublishContext
|
||||||
{
|
{
|
||||||
public IDependencyResolver DependencyResolver { get; set; }
|
public IDependencyResolver DependencyResolver { get; set; }
|
||||||
|
@ -16,7 +16,7 @@ namespace Tapeti.Default
|
|||||||
|
|
||||||
public void ConnectSuccess(TapetiConnectionParams connectionParams)
|
public void ConnectSuccess(TapetiConnectionParams connectionParams)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"[Tapeti] Connected");
|
Console.WriteLine("[Tapeti] Connected");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void HandlerException(Exception e)
|
public void HandlerException(Exception e)
|
||||||
|
90
Tapeti/Default/FallbackStringEnumConverter.cs
Normal file
90
Tapeti/Default/FallbackStringEnumConverter.cs
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
using System;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
|
||||||
|
namespace Tapeti.Default
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Converts an <see cref="Enum"/> to and from its name string value. If an unknown string value is encountered
|
||||||
|
/// it will translate to 0xDEADBEEF (-559038737) so it can be gracefully handled.
|
||||||
|
/// If you copy this value as-is to another message and try to send it, this converter will throw an exception.
|
||||||
|
///
|
||||||
|
/// This converter is far simpler than the default StringEnumConverter, it assumes both sides use the same
|
||||||
|
/// enum and therefore skips the naming strategy.
|
||||||
|
/// </summary>
|
||||||
|
public class FallbackStringEnumConverter : JsonConverter
|
||||||
|
{
|
||||||
|
private readonly int invalidEnumValue;
|
||||||
|
|
||||||
|
|
||||||
|
public FallbackStringEnumConverter()
|
||||||
|
{
|
||||||
|
unchecked { invalidEnumValue = (int)0xDEADBEEF; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
|
||||||
|
{
|
||||||
|
if (value == null)
|
||||||
|
{
|
||||||
|
writer.WriteNull();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((int) value == invalidEnumValue)
|
||||||
|
throw new ArgumentException("Enum value was an unknown string value in an incoming message and can not be published in an outgoing message as-is");
|
||||||
|
|
||||||
|
var outputValue = Enum.GetName(value.GetType(), value);
|
||||||
|
writer.WriteValue(outputValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
|
||||||
|
{
|
||||||
|
var isNullable = IsNullableType(objectType);
|
||||||
|
|
||||||
|
if (reader.TokenType == JsonToken.Null)
|
||||||
|
{
|
||||||
|
if (!isNullable)
|
||||||
|
throw new JsonSerializationException($"Cannot convert null value to {objectType}");
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
var actualType = isNullable ? Nullable.GetUnderlyingType(objectType) : objectType;
|
||||||
|
Debug.Assert(actualType != null, nameof(actualType) + " != null");
|
||||||
|
|
||||||
|
if (reader.TokenType != JsonToken.String)
|
||||||
|
throw new JsonSerializationException($"Unexpected token {reader.TokenType} when parsing enum");
|
||||||
|
|
||||||
|
var enumText = reader.Value.ToString();
|
||||||
|
if (enumText == string.Empty && isNullable)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return Enum.Parse(actualType, enumText);
|
||||||
|
}
|
||||||
|
catch (ArgumentException)
|
||||||
|
{
|
||||||
|
return Enum.ToObject(actualType, invalidEnumValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public override bool CanConvert(Type objectType)
|
||||||
|
{
|
||||||
|
var actualType = IsNullableType(objectType) ? Nullable.GetUnderlyingType(objectType) : objectType;
|
||||||
|
return actualType?.IsEnum ?? false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static bool IsNullableType(Type t)
|
||||||
|
{
|
||||||
|
if (t == null)
|
||||||
|
throw new ArgumentNullException(nameof(t));
|
||||||
|
|
||||||
|
return t.IsGenericType && t.GetGenericTypeDefinition() == typeof(Nullable<>);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,6 @@ using System.Collections.Concurrent;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using Newtonsoft.Json.Converters;
|
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
|
||||||
namespace Tapeti.Default
|
namespace Tapeti.Default
|
||||||
@ -25,7 +24,7 @@ namespace Tapeti.Default
|
|||||||
NullValueHandling = NullValueHandling.Ignore
|
NullValueHandling = NullValueHandling.Ignore
|
||||||
};
|
};
|
||||||
|
|
||||||
serializerSettings.Converters.Add(new StringEnumConverter());
|
serializerSettings.Converters.Add(new FallbackStringEnumConverter());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -52,7 +51,7 @@ namespace Tapeti.Default
|
|||||||
throw new ArgumentException($"{ClassTypeHeader} header not present");
|
throw new ArgumentException($"{ClassTypeHeader} header not present");
|
||||||
|
|
||||||
var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName);
|
var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName);
|
||||||
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType);
|
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), messageType, serializerSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ using System.Diagnostics;
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using RabbitMQ.Client.Framing;
|
using RabbitMQ.Client.Framing;
|
||||||
|
using Tapeti.Annotations;
|
||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
using Tapeti.Helpers;
|
using Tapeti.Helpers;
|
||||||
|
|
||||||
@ -18,23 +19,30 @@ namespace Tapeti.Default
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
||||||
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType))
|
var hasClassResult = context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType);
|
||||||
|
|
||||||
|
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();
|
||||||
|
var expectedClassResult = request?.Response;
|
||||||
|
|
||||||
|
// Verify the return type matches with the Request attribute of the message class. This is a backwards incompatible change in
|
||||||
|
// Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish.
|
||||||
|
if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType)
|
||||||
|
throw new ArgumentException($"Message handler must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}");
|
||||||
|
|
||||||
|
if (!hasClassResult)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (isTaskOf)
|
if (isTaskOf)
|
||||||
{
|
{
|
||||||
var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
|
var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
|
||||||
Debug.Assert(handler != null, nameof(handler) + " != null");
|
Debug.Assert(handler != null, nameof(handler) + " != null");
|
||||||
|
|
||||||
context.Result.SetHandler(async (messageContext, value) =>
|
context.Result.SetHandler(async (messageContext, value) => { await (Task) handler.Invoke(null, new[] {messageContext, value }); });
|
||||||
{
|
|
||||||
await (Task)handler.Invoke(null, new[] { messageContext, value });
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
context.Result.SetHandler((messageContext, value) =>
|
context.Result.SetHandler((messageContext, value) => Reply(value, messageContext));
|
||||||
value == null ? null : Reply(value, messageContext));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -43,13 +51,15 @@ namespace Tapeti.Default
|
|||||||
private static async Task PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
|
private static async Task PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
|
||||||
{
|
{
|
||||||
var message = await (Task<T>)value;
|
var message = await (Task<T>)value;
|
||||||
if (message != null)
|
await Reply(message, messageContext);
|
||||||
await Reply(message, messageContext);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static Task Reply(object message, IMessageContext messageContext)
|
private static Task Reply(object message, IMessageContext messageContext)
|
||||||
{
|
{
|
||||||
|
if (message == null)
|
||||||
|
throw new ArgumentException("Return value of a request message handler must not be null");
|
||||||
|
|
||||||
var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve<IPublisher>();
|
var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve<IPublisher>();
|
||||||
var properties = new BasicProperties();
|
var properties = new BasicProperties();
|
||||||
|
|
||||||
@ -59,9 +69,9 @@ namespace Tapeti.Default
|
|||||||
properties.CorrelationId = messageContext.Properties.CorrelationId;
|
properties.CorrelationId = messageContext.Properties.CorrelationId;
|
||||||
|
|
||||||
if (messageContext.Properties.IsReplyToPresent())
|
if (messageContext.Properties.IsReplyToPresent())
|
||||||
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties);
|
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true);
|
||||||
|
|
||||||
return publisher.Publish(message, properties);
|
return publisher.Publish(message, properties, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
9
Tapeti/Exceptions/NackException.cs
Normal file
9
Tapeti/Exceptions/NackException.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Exceptions
|
||||||
|
{
|
||||||
|
public class NackException : Exception
|
||||||
|
{
|
||||||
|
public NackException(string message) : base(message) { }
|
||||||
|
}
|
||||||
|
}
|
9
Tapeti/Exceptions/NoRouteException.cs
Normal file
9
Tapeti/Exceptions/NoRouteException.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Tapeti.Exceptions
|
||||||
|
{
|
||||||
|
public class NoRouteException : Exception
|
||||||
|
{
|
||||||
|
public NoRouteException(string message) : base(message) { }
|
||||||
|
}
|
||||||
|
}
|
@ -13,7 +13,7 @@ namespace Tapeti
|
|||||||
|
|
||||||
public interface IInternalPublisher : IPublisher
|
public interface IInternalPublisher : IPublisher
|
||||||
{
|
{
|
||||||
Task Publish(object message, IBasicProperties properties);
|
Task Publish(object message, IBasicProperties properties, bool mandatory);
|
||||||
Task PublishDirect(object message, string queueName, IBasicProperties properties);
|
Task PublishDirect(object message, string queueName, IBasicProperties properties, bool mandatory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ using Tapeti.Config;
|
|||||||
using Tapeti.Default;
|
using Tapeti.Default;
|
||||||
using Tapeti.Helpers;
|
using Tapeti.Helpers;
|
||||||
|
|
||||||
|
// ReSharper disable UnusedMember.Global
|
||||||
|
|
||||||
namespace Tapeti
|
namespace Tapeti
|
||||||
{
|
{
|
||||||
@ -31,6 +32,8 @@ namespace Tapeti
|
|||||||
|
|
||||||
private readonly IDependencyResolver dependencyResolver;
|
private readonly IDependencyResolver dependencyResolver;
|
||||||
|
|
||||||
|
private bool usePublisherConfirms = true;
|
||||||
|
|
||||||
|
|
||||||
public TapetiConfig(IDependencyResolver dependencyResolver)
|
public TapetiConfig(IDependencyResolver dependencyResolver)
|
||||||
{
|
{
|
||||||
@ -90,7 +93,16 @@ namespace Tapeti
|
|||||||
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
|
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
|
||||||
}
|
}
|
||||||
|
|
||||||
var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
|
var config = new Config(queues)
|
||||||
|
{
|
||||||
|
DependencyResolver = dependencyResolver,
|
||||||
|
MessageMiddleware = messageMiddleware,
|
||||||
|
CleanupMiddleware = cleanupMiddleware,
|
||||||
|
PublishMiddleware = publishMiddleware,
|
||||||
|
|
||||||
|
UsePublisherConfirms = usePublisherConfirms
|
||||||
|
};
|
||||||
|
|
||||||
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
|
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
@ -154,11 +166,34 @@ namespace Tapeti
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
|
||||||
|
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
|
||||||
|
/// result in never-ending flows. Only disable if you can accept those consequences.
|
||||||
|
/// </summary>
|
||||||
|
public TapetiConfig DisablePublisherConfirms()
|
||||||
|
{
|
||||||
|
usePublisherConfirms = false;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WARNING: disabling publisher confirms means there is no guarantee that a Publish succeeds,
|
||||||
|
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
|
||||||
|
/// result in never-ending flows. Only disable if you accept those consequences.
|
||||||
|
/// </summary>
|
||||||
|
public TapetiConfig SetPublisherConfirms(bool enabled)
|
||||||
|
{
|
||||||
|
usePublisherConfirms = enabled;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void RegisterDefaults()
|
public void RegisterDefaults()
|
||||||
{
|
{
|
||||||
var container = dependencyResolver as IDependencyContainer;
|
if (!(dependencyResolver is IDependencyContainer container))
|
||||||
if (container == null)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (ConsoleHelper.IsAvailable())
|
if (ConsoleHelper.IsAvailable())
|
||||||
@ -399,21 +434,19 @@ namespace Tapeti
|
|||||||
|
|
||||||
protected class Config : IConfig
|
protected class Config : IConfig
|
||||||
{
|
{
|
||||||
public IDependencyResolver DependencyResolver { get; }
|
public bool UsePublisherConfirms { get; set; }
|
||||||
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
|
||||||
public IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
|
public IDependencyResolver DependencyResolver { get; set; }
|
||||||
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
|
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
|
||||||
|
public IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; set; }
|
||||||
|
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; set; }
|
||||||
public IEnumerable<IQueue> Queues { get; }
|
public IEnumerable<IQueue> Queues { get; }
|
||||||
|
|
||||||
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
|
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
|
||||||
|
|
||||||
|
|
||||||
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<ICleanupMiddleware> cleanupMiddleware, IReadOnlyList<IPublishMiddleware> publishMiddleware, IEnumerable<IQueue> queues)
|
public Config(IEnumerable<IQueue> queues)
|
||||||
{
|
{
|
||||||
DependencyResolver = dependencyResolver;
|
|
||||||
MessageMiddleware = messageMiddleware;
|
|
||||||
CleanupMiddleware = cleanupMiddleware;
|
|
||||||
PublishMiddleware = publishMiddleware;
|
|
||||||
Queues = queues.ToList();
|
Queues = queues.ToList();
|
||||||
|
|
||||||
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);
|
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);
|
||||||
@ -429,6 +462,8 @@ namespace Tapeti
|
|||||||
|
|
||||||
protected class Queue : IDynamicQueue
|
protected class Queue : IDynamicQueue
|
||||||
{
|
{
|
||||||
|
private readonly string declareQueueName;
|
||||||
|
|
||||||
public bool Dynamic { get; }
|
public bool Dynamic { get; }
|
||||||
public string Name { get; set; }
|
public string Name { get; set; }
|
||||||
public IEnumerable<IBinding> Bindings { get; }
|
public IEnumerable<IBinding> Bindings { get; }
|
||||||
@ -436,12 +471,20 @@ namespace Tapeti
|
|||||||
|
|
||||||
public Queue(QueueInfo queue, IEnumerable<IBinding> bindings)
|
public Queue(QueueInfo queue, IEnumerable<IBinding> bindings)
|
||||||
{
|
{
|
||||||
|
declareQueueName = queue.Name;
|
||||||
|
|
||||||
Dynamic = queue.Dynamic.GetValueOrDefault();
|
Dynamic = queue.Dynamic.GetValueOrDefault();
|
||||||
Name = queue.Name;
|
Name = queue.Name;
|
||||||
Bindings = bindings;
|
Bindings = bindings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public string GetDeclareQueueName()
|
||||||
|
{
|
||||||
|
return declareQueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void SetName(string name)
|
public void SetName(string name)
|
||||||
{
|
{
|
||||||
Name = name;
|
Name = name;
|
||||||
|
@ -8,13 +8,15 @@ using Tapeti.Connection;
|
|||||||
|
|
||||||
namespace Tapeti
|
namespace Tapeti
|
||||||
{
|
{
|
||||||
|
public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e);
|
||||||
|
|
||||||
public class TapetiConnection : IDisposable
|
public class TapetiConnection : IDisposable
|
||||||
{
|
{
|
||||||
private readonly IConfig config;
|
private readonly IConfig config;
|
||||||
public TapetiConnectionParams Params { get; set; }
|
public TapetiConnectionParams Params { get; set; }
|
||||||
|
|
||||||
private readonly Lazy<TapetiWorker> worker;
|
private readonly Lazy<TapetiWorker> worker;
|
||||||
|
private TapetiSubscriber subscriber;
|
||||||
|
|
||||||
public TapetiConnection(IConfig config)
|
public TapetiConnection(IConfig config)
|
||||||
{
|
{
|
||||||
@ -29,15 +31,17 @@ namespace Tapeti
|
|||||||
}
|
}
|
||||||
|
|
||||||
public event EventHandler Connected;
|
public event EventHandler Connected;
|
||||||
|
public event DisconnectedEventHandler Disconnected;
|
||||||
public event EventHandler Disconnected;
|
|
||||||
|
|
||||||
public event EventHandler Reconnected;
|
public event EventHandler Reconnected;
|
||||||
|
|
||||||
|
|
||||||
public async Task<ISubscriber> Subscribe(bool startConsuming = true)
|
public async Task<ISubscriber> Subscribe(bool startConsuming = true)
|
||||||
{
|
{
|
||||||
var subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
|
if (subscriber == null)
|
||||||
await subscriber.BindQueues();
|
{
|
||||||
|
subscriber = new TapetiSubscriber(() => worker.Value, config.Queues.ToList());
|
||||||
|
await subscriber.BindQueues();
|
||||||
|
}
|
||||||
|
|
||||||
if (startConsuming)
|
if (startConsuming)
|
||||||
await subscriber.Resume();
|
await subscriber.Resume();
|
||||||
@ -46,9 +50,9 @@ namespace Tapeti
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public ISubscriber SubscribeSync()
|
public ISubscriber SubscribeSync(bool startConsuming = true)
|
||||||
{
|
{
|
||||||
return Subscribe().Result;
|
return Subscribe(startConsuming).Result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -84,9 +88,9 @@ namespace Tapeti
|
|||||||
owner.OnConnected(new EventArgs());
|
owner.OnConnected(new EventArgs());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Disconnected()
|
public void Disconnected(DisconnectedEventArgs e)
|
||||||
{
|
{
|
||||||
owner.OnDisconnected(new EventArgs());
|
owner.OnDisconnected(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Reconnected()
|
public void Reconnected()
|
||||||
@ -97,17 +101,23 @@ namespace Tapeti
|
|||||||
|
|
||||||
protected virtual void OnConnected(EventArgs e)
|
protected virtual void OnConnected(EventArgs e)
|
||||||
{
|
{
|
||||||
Connected?.Invoke(this, e);
|
Task.Run(() => Connected?.Invoke(this, e));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual void OnReconnected(EventArgs e)
|
protected virtual void OnReconnected(EventArgs e)
|
||||||
{
|
{
|
||||||
Reconnected?.Invoke(this, e);
|
Task.Run(() =>
|
||||||
|
{
|
||||||
|
subscriber?.RebindQueues().ContinueWith((t) =>
|
||||||
|
{
|
||||||
|
Reconnected?.Invoke(this, e);
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected virtual void OnDisconnected(EventArgs e)
|
protected virtual void OnDisconnected(DisconnectedEventArgs e)
|
||||||
{
|
{
|
||||||
Disconnected?.Invoke(this, e);
|
Task.Run(() => Disconnected?.Invoke(this, e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ namespace Tapeti.Tasks
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task<T> Add<T>(Func<T> func)
|
public Task Add(Func<Task> func)
|
||||||
{
|
{
|
||||||
lock (previousTaskLock)
|
lock (previousTaskLock)
|
||||||
{
|
{
|
||||||
@ -36,7 +36,11 @@ namespace Tapeti.Tasks
|
|||||||
, singleThreadScheduler.Value);
|
, singleThreadScheduler.Value);
|
||||||
|
|
||||||
previousTask = task;
|
previousTask = task;
|
||||||
return task;
|
|
||||||
|
// 'task' completes at the moment a Task is returned (for example, an await is encountered),
|
||||||
|
// this is used to chain the next. We return the unwrapped Task however, so that the caller
|
||||||
|
// awaits until the full task chain has completed.
|
||||||
|
return task.Unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,12 +80,16 @@ namespace Test
|
|||||||
return flowProvider.YieldWithParallelRequest()
|
return flowProvider.YieldWithParallelRequest()
|
||||||
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
|
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
|
||||||
{
|
{
|
||||||
StoredInState = StateTestGuid
|
StoredInState = StateTestGuid,
|
||||||
|
EnumValue = TestEnum.Value1,
|
||||||
|
|
||||||
}, HandlePoloConfirmationResponse1)
|
}, HandlePoloConfirmationResponse1)
|
||||||
|
|
||||||
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
|
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
|
||||||
{
|
{
|
||||||
StoredInState = StateTestGuid
|
StoredInState = StateTestGuid,
|
||||||
|
EnumValue = TestEnum.Value2,
|
||||||
|
OptionalEnumValue = TestEnum.Value1
|
||||||
}, HandlePoloConfirmationResponse2)
|
}, HandlePoloConfirmationResponse2)
|
||||||
|
|
||||||
.YieldSync(ContinuePoloConfirmation);
|
.YieldSync(ContinuePoloConfirmation);
|
||||||
@ -127,7 +131,9 @@ namespace Test
|
|||||||
|
|
||||||
return new PoloConfirmationResponseMessage
|
return new PoloConfirmationResponseMessage
|
||||||
{
|
{
|
||||||
ShouldMatchState = message.StoredInState
|
ShouldMatchState = message.StoredInState,
|
||||||
|
EnumValue = message.EnumValue,
|
||||||
|
OptionalEnumValue = message.OptionalEnumValue
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,6 +147,13 @@ namespace Test
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public enum TestEnum
|
||||||
|
{
|
||||||
|
Value1,
|
||||||
|
Value2
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
[Request(Response = typeof(PoloMessage))]
|
[Request(Response = typeof(PoloMessage))]
|
||||||
public class MarcoMessage
|
public class MarcoMessage
|
||||||
{
|
{
|
||||||
@ -157,6 +170,9 @@ namespace Test
|
|||||||
{
|
{
|
||||||
[Required]
|
[Required]
|
||||||
public Guid StoredInState { get; set; }
|
public Guid StoredInState { get; set; }
|
||||||
|
|
||||||
|
public TestEnum EnumValue;
|
||||||
|
public TestEnum? OptionalEnumValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -164,5 +180,8 @@ namespace Test
|
|||||||
{
|
{
|
||||||
[Required]
|
[Required]
|
||||||
public Guid ShouldMatchState { get; set; }
|
public Guid ShouldMatchState { get; set; }
|
||||||
|
|
||||||
|
public TestEnum EnumValue;
|
||||||
|
public TestEnum? OptionalEnumValue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,25 +0,0 @@
|
|||||||
using System;
|
|
||||||
using Tapeti;
|
|
||||||
|
|
||||||
namespace Test
|
|
||||||
{
|
|
||||||
public class MyLogger : ILogger
|
|
||||||
{
|
|
||||||
public void Connect(TapetiConnectionParams connectionParams)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public void ConnectSuccess(TapetiConnectionParams connectionParams)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public void HandlerException(Exception e)
|
|
||||||
{
|
|
||||||
Console.WriteLine("Mylogger: " + e.Message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
103
Test/Program.cs
103
Test/Program.cs
@ -5,7 +5,6 @@ using Tapeti.DataAnnotations;
|
|||||||
using Tapeti.Flow;
|
using Tapeti.Flow;
|
||||||
using Tapeti.SimpleInjector;
|
using Tapeti.SimpleInjector;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using Tapeti.Flow.SQL;
|
|
||||||
|
|
||||||
namespace Test
|
namespace Test
|
||||||
{
|
{
|
||||||
@ -14,58 +13,60 @@ namespace Test
|
|||||||
private static void Main()
|
private static void Main()
|
||||||
{
|
{
|
||||||
// TODO logging
|
// TODO logging
|
||||||
|
try
|
||||||
var container = new Container();
|
|
||||||
container.Register<MarcoEmitter>();
|
|
||||||
container.Register<Visualizer>();
|
|
||||||
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
|
|
||||||
|
|
||||||
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
|
|
||||||
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
|
|
||||||
.WithFlow()
|
|
||||||
.WithDataAnnotations()
|
|
||||||
.RegisterAllControllers()
|
|
||||||
.Build();
|
|
||||||
|
|
||||||
using (var connection = new TapetiConnection(config)
|
|
||||||
{
|
{
|
||||||
Params = new TapetiAppSettingsConnectionParams()
|
var container = new Container();
|
||||||
})
|
container.Register<MarcoEmitter>();
|
||||||
|
container.Register<Visualizer>();
|
||||||
|
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
|
||||||
|
|
||||||
|
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
|
||||||
|
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
|
||||||
|
.WithFlow()
|
||||||
|
.WithDataAnnotations()
|
||||||
|
.RegisterAllControllers()
|
||||||
|
//.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
using (var connection = new TapetiConnection(config)
|
||||||
|
{
|
||||||
|
Params = new TapetiAppSettingsConnectionParams()
|
||||||
|
})
|
||||||
|
{
|
||||||
|
var flowStore = container.GetInstance<IFlowStore>();
|
||||||
|
var flowStore2 = container.GetInstance<IFlowStore>();
|
||||||
|
|
||||||
|
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
|
||||||
|
|
||||||
|
connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); };
|
||||||
|
connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); };
|
||||||
|
connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); };
|
||||||
|
|
||||||
|
Console.WriteLine("Subscribing...");
|
||||||
|
var subscriber = connection.Subscribe(false).Result;
|
||||||
|
|
||||||
|
Console.WriteLine("Consuming...");
|
||||||
|
subscriber.Resume().Wait();
|
||||||
|
|
||||||
|
Console.WriteLine("Done!");
|
||||||
|
|
||||||
|
connection.GetPublisher().Publish(new FlowEndController.PingMessage());
|
||||||
|
|
||||||
|
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait();
|
||||||
|
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
|
||||||
|
|
||||||
|
Thread.Sleep(1000);
|
||||||
|
|
||||||
|
var emitter = container.GetInstance<MarcoEmitter>();
|
||||||
|
emitter.Run().Wait();
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
var flowStore = container.GetInstance<IFlowStore>();
|
Console.WriteLine(e.ToString());
|
||||||
var flowStore2 = container.GetInstance<IFlowStore>();
|
Console.ReadKey();
|
||||||
|
|
||||||
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
|
|
||||||
|
|
||||||
connection.Connected += (sender, e) => {
|
|
||||||
Console.WriteLine("Event Connected");
|
|
||||||
};
|
|
||||||
connection.Disconnected += (sender, e) => {
|
|
||||||
Console.WriteLine("Event Disconnected");
|
|
||||||
};
|
|
||||||
connection.Reconnected += (sender, e) => {
|
|
||||||
Console.WriteLine("Event Reconnected");
|
|
||||||
};
|
|
||||||
|
|
||||||
Console.WriteLine("Subscribing...");
|
|
||||||
var subscriber = connection.Subscribe(false).Result;
|
|
||||||
|
|
||||||
Console.WriteLine("Consuming...");
|
|
||||||
subscriber.Resume().Wait();
|
|
||||||
|
|
||||||
Console.WriteLine("Done!");
|
|
||||||
|
|
||||||
connection.GetPublisher().Publish(new FlowEndController.PingMessage());
|
|
||||||
|
|
||||||
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
|
|
||||||
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest);
|
|
||||||
|
|
||||||
Thread.Sleep(1000);
|
|
||||||
|
|
||||||
var emitter = container.GetInstance<MarcoEmitter>();
|
|
||||||
emitter.Run().Wait();
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user