Merge branch 'release/2.6'
This commit is contained in:
commit
8f158fdaa9
@ -32,6 +32,12 @@ namespace _03_FlowRequestResponse
|
|||||||
public DateTime RequestStartTime;
|
public DateTime RequestStartTime;
|
||||||
|
|
||||||
|
|
||||||
|
// Be sure not to accidentally use any public fields that aren't serializable, for example:
|
||||||
|
//public TaskCompletionSource<bool> SerializationFail = new TaskCompletionSource<bool>();
|
||||||
|
//
|
||||||
|
// In the Newtonsoft.Json version at the time of writing, this will not result in an exception but instead hang the flow!
|
||||||
|
|
||||||
|
|
||||||
public SimpleFlowController(IFlowProvider flowProvider, IExampleState exampleState)
|
public SimpleFlowController(IFlowProvider flowProvider, IExampleState exampleState)
|
||||||
{
|
{
|
||||||
this.flowProvider = flowProvider;
|
this.flowProvider = flowProvider;
|
||||||
|
@ -11,10 +11,10 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
|
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
|
||||||
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
|
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
|
||||||
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
|
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
|
||||||
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
|
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
@ -72,14 +72,16 @@ namespace _07_ParallelizationTest
|
|||||||
|
|
||||||
|
|
||||||
var publisher = dependencyResolver.Resolve<IPublisher>();
|
var publisher = dependencyResolver.Resolve<IPublisher>();
|
||||||
Console.WriteLine($"Publishing {MessageCount * RepeatBatch} messages...");
|
Console.WriteLine($"Publishing first {MessageCount} of {MessageCount * RepeatBatch} messages...");
|
||||||
|
|
||||||
await PublishMessages(publisher, MessageCount * RepeatBatch);
|
await PublishMessages(publisher, MessageCount);
|
||||||
|
|
||||||
|
|
||||||
|
Console.WriteLine("Consuming messages while publishing the rest...");
|
||||||
Console.WriteLine("Consuming messages...");
|
|
||||||
await subscriber.Resume();
|
await subscriber.Resume();
|
||||||
|
|
||||||
|
await PublishMessages(publisher, MessageCount * (RepeatBatch - 1));
|
||||||
|
|
||||||
await waitForDone();
|
await waitForDone();
|
||||||
}
|
}
|
||||||
|
|
@ -22,10 +22,14 @@ namespace Tapeti.Cmd.Commands
|
|||||||
// No more messages on the queue
|
// No more messages on the queue
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
// Since RabbitMQ client 6 we need to copy the body before calling another channel method
|
||||||
|
// like BasicPublish, or the published body will be corrupted
|
||||||
|
var bodyCopy = result.Body.ToArray();
|
||||||
|
|
||||||
|
|
||||||
rateLimiter.Execute(() =>
|
rateLimiter.Execute(() =>
|
||||||
{
|
{
|
||||||
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
|
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, bodyCopy);
|
||||||
messageCount++;
|
messageCount++;
|
||||||
|
|
||||||
if (RemoveMessages)
|
if (RemoveMessages)
|
||||||
|
@ -79,91 +79,88 @@ namespace Tapeti.Cmd.Mock
|
|||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool IsAppIdPresent()
|
public bool IsAppIdPresent() => appIdPresent;
|
||||||
{
|
public bool IsClusterIdPresent() => clusterIdPresent;
|
||||||
throw new NotImplementedException();
|
public bool IsContentEncodingPresent() => contentEncodingPresent;
|
||||||
}
|
public bool IsContentTypePresent() => contentTypePresent;
|
||||||
|
public bool IsCorrelationIdPresent() => correlationIdPresent;
|
||||||
|
public bool IsDeliveryModePresent() => deliveryModePresent;
|
||||||
|
public bool IsExpirationPresent() => expirationPresent;
|
||||||
|
public bool IsHeadersPresent() => headersPresent;
|
||||||
|
public bool IsMessageIdPresent() => messageIdPresent;
|
||||||
|
public bool IsPriorityPresent() => priorityPresent;
|
||||||
|
public bool IsReplyToPresent() => replyToPresent;
|
||||||
|
public bool IsTimestampPresent() => timestampPresent;
|
||||||
|
public bool IsTypePresent() => typePresent;
|
||||||
|
public bool IsUserIdPresent() => userIdPresent;
|
||||||
|
|
||||||
public bool IsClusterIdPresent()
|
|
||||||
{
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsContentEncodingPresent()
|
private bool appIdPresent;
|
||||||
{
|
private string appId;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsContentTypePresent()
|
private bool clusterIdPresent;
|
||||||
{
|
private string clusterId;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsCorrelationIdPresent()
|
private bool contentEncodingPresent;
|
||||||
{
|
private string contentEncoding;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsDeliveryModePresent()
|
private bool contentTypePresent;
|
||||||
{
|
private string contentType;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsExpirationPresent()
|
private bool correlationIdPresent;
|
||||||
{
|
private string correlationId;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsHeadersPresent()
|
private bool deliveryModePresent;
|
||||||
{
|
private byte deliveryMode;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsMessageIdPresent()
|
private bool expirationPresent;
|
||||||
{
|
private string expiration;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsPriorityPresent()
|
private bool headersPresent;
|
||||||
{
|
private IDictionary<string, object> headers;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsReplyToPresent()
|
private bool messageIdPresent;
|
||||||
{
|
private string messageId;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsTimestampPresent()
|
private bool priorityPresent;
|
||||||
{
|
private byte priority;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsTypePresent()
|
private bool replyToPresent;
|
||||||
{
|
private string replyTo;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool IsUserIdPresent()
|
private bool timestampPresent;
|
||||||
{
|
private AmqpTimestamp timestamp;
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
public string AppId { get; set; }
|
private bool typePresent;
|
||||||
public string ClusterId { get; set; }
|
private string type;
|
||||||
public string ContentEncoding { get; set; }
|
|
||||||
public string ContentType { get; set; }
|
private bool userIdPresent;
|
||||||
public string CorrelationId { get; set; }
|
private string userId;
|
||||||
public byte DeliveryMode { get; set; }
|
|
||||||
public string Expiration { get; set; }
|
|
||||||
public IDictionary<string, object> Headers { get; set; }
|
|
||||||
public string MessageId { get; set; }
|
public string AppId { get => appId; set => SetValue(out appId, out appIdPresent, value); }
|
||||||
|
public string ClusterId { get => clusterId; set => SetValue(out clusterId, out clusterIdPresent, value); }
|
||||||
|
public string ContentEncoding { get => contentEncoding; set => SetValue(out contentEncoding, out contentEncodingPresent, value); }
|
||||||
|
public string ContentType { get => contentType; set => SetValue(out contentType, out contentTypePresent, value); }
|
||||||
|
public string CorrelationId { get => correlationId; set => SetValue(out correlationId, out correlationIdPresent, value); }
|
||||||
|
public byte DeliveryMode { get => deliveryMode; set => SetValue(out deliveryMode, out deliveryModePresent, value); }
|
||||||
|
public string Expiration { get => expiration; set => SetValue(out expiration, out expirationPresent, value); }
|
||||||
|
public IDictionary<string, object> Headers { get => headers; set => SetValue(out headers, out headersPresent, value); }
|
||||||
|
public string MessageId { get => messageId; set => SetValue(out messageId, out messageIdPresent, value); }
|
||||||
public bool Persistent { get; set; }
|
public bool Persistent { get; set; }
|
||||||
public byte Priority { get; set; }
|
public byte Priority { get => priority; set => SetValue(out priority, out priorityPresent, value); }
|
||||||
public string ReplyTo { get; set; }
|
public string ReplyTo { get => replyTo; set => SetValue(out replyTo, out replyToPresent, value); }
|
||||||
public PublicationAddress ReplyToAddress { get; set; }
|
public PublicationAddress ReplyToAddress { get; set; }
|
||||||
public AmqpTimestamp Timestamp { get; set; }
|
public AmqpTimestamp Timestamp { get => timestamp; set => SetValue(out timestamp, out timestampPresent, value); }
|
||||||
public string Type { get; set; }
|
public string Type { get => type; set => SetValue(out type, out typePresent, value); }
|
||||||
public string UserId { get; set; }
|
public string UserId { get => userId; set => SetValue(out userId, out userIdPresent, value); }
|
||||||
|
|
||||||
|
|
||||||
|
private static void SetValue<T>(out T field, out bool present, T value)
|
||||||
|
{
|
||||||
|
field = value;
|
||||||
|
present = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,9 +2,11 @@
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.IO;
|
using System.IO;
|
||||||
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using CommandLine;
|
using CommandLine;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Exceptions;
|
||||||
using Tapeti.Cmd.Commands;
|
using Tapeti.Cmd.Commands;
|
||||||
using Tapeti.Cmd.Mock;
|
using Tapeti.Cmd.Mock;
|
||||||
using Tapeti.Cmd.RateLimiter;
|
using Tapeti.Cmd.RateLimiter;
|
||||||
@ -85,6 +87,12 @@ namespace Tapeti.Cmd
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
|
||||||
|
public class ExampleOptions
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
[Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
|
[Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
|
||||||
public class ShovelOptions : CommonOptions
|
public class ShovelOptions : CommonOptions
|
||||||
{
|
{
|
||||||
@ -131,22 +139,67 @@ namespace Tapeti.Cmd
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
|
[Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")]
|
||||||
public class ExampleOptions
|
public class DeclareQueueOptions : CommonOptions
|
||||||
{
|
{
|
||||||
|
[Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")]
|
||||||
|
public string QueueName { get; set; }
|
||||||
|
|
||||||
|
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
|
||||||
|
public IEnumerable<string> Bindings { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Verb("removequeue", HelpText = "Removes a durable queue.")]
|
||||||
|
public class RemoveQueueOptions : CommonOptions
|
||||||
|
{
|
||||||
|
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")]
|
||||||
|
public string QueueName { get; set; }
|
||||||
|
|
||||||
|
[Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
|
||||||
|
public bool Confirm { get; set; }
|
||||||
|
|
||||||
|
[Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
|
||||||
|
public bool ConfirmPurge { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Verb("bindqueue", HelpText = "Add a binding to a queue.")]
|
||||||
|
public class BindQueueOptions : CommonOptions
|
||||||
|
{
|
||||||
|
[Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")]
|
||||||
|
public string QueueName { get; set; }
|
||||||
|
|
||||||
|
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: <exchange>:<routingKey>")]
|
||||||
|
public IEnumerable<string> Bindings { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[Verb("unbindqueue", HelpText = "Remove a binding from a queue.")]
|
||||||
|
public class UnbindQueueOptions : CommonOptions
|
||||||
|
{
|
||||||
|
[Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")]
|
||||||
|
public string QueueName { get; set; }
|
||||||
|
|
||||||
|
[Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: <exchange>:<routingKey>")]
|
||||||
|
public IEnumerable<string> Bindings { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static int Main(string[] args)
|
public static int Main(string[] args)
|
||||||
{
|
{
|
||||||
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, PurgeOptions, ExampleOptions>(args)
|
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, PurgeOptions, ExampleOptions,
|
||||||
|
DeclareQueueOptions, RemoveQueueOptions, BindQueueOptions, UnbindQueueOptions>(args)
|
||||||
.MapResult(
|
.MapResult(
|
||||||
(ExportOptions o) => ExecuteVerb(o, RunExport),
|
(ExportOptions o) => ExecuteVerb(o, RunExport),
|
||||||
(ImportOptions o) => ExecuteVerb(o, RunImport),
|
(ImportOptions o) => ExecuteVerb(o, RunImport),
|
||||||
|
(ExampleOptions o) => ExecuteVerb(o, RunExample),
|
||||||
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
|
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
|
||||||
(PurgeOptions o) => ExecuteVerb(o, RunPurge),
|
(PurgeOptions o) => ExecuteVerb(o, RunPurge),
|
||||||
(ExampleOptions o) => ExecuteVerb(o, RunExample),
|
(DeclareQueueOptions o) => ExecuteVerb(o, RunDeclareQueue),
|
||||||
|
(RemoveQueueOptions o) => ExecuteVerb(o, RunRemoveQueue),
|
||||||
|
(BindQueueOptions o) => ExecuteVerb(o, RunBindQueue),
|
||||||
|
(UnbindQueueOptions o) => ExecuteVerb(o, RunUnbindQueue),
|
||||||
errs =>
|
errs =>
|
||||||
{
|
{
|
||||||
if (!Debugger.IsAttached)
|
if (!Debugger.IsAttached)
|
||||||
@ -305,6 +358,33 @@ namespace Tapeti.Cmd
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void RunExample(ExampleOptions options)
|
||||||
|
{
|
||||||
|
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
|
||||||
|
{
|
||||||
|
messageSerializer.Serialize(new Message
|
||||||
|
{
|
||||||
|
Exchange = "example",
|
||||||
|
Queue = "example.queue",
|
||||||
|
RoutingKey = "example.routing.key",
|
||||||
|
DeliveryTag = 42,
|
||||||
|
Properties = new MockBasicProperties
|
||||||
|
{
|
||||||
|
ContentType = "application/json",
|
||||||
|
DeliveryMode = 2,
|
||||||
|
Headers = new Dictionary<string, object>
|
||||||
|
{
|
||||||
|
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
|
||||||
|
},
|
||||||
|
ReplyTo = "reply.queue",
|
||||||
|
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
|
||||||
|
},
|
||||||
|
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void RunShovel(ShovelOptions options)
|
private static void RunShovel(ShovelOptions options)
|
||||||
{
|
{
|
||||||
int messageCount;
|
int messageCount;
|
||||||
@ -403,34 +483,120 @@ namespace Tapeti.Cmd
|
|||||||
}
|
}
|
||||||
|
|
||||||
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");
|
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void RunExample(ExampleOptions options)
|
private static void RunDeclareQueue(DeclareQueueOptions options)
|
||||||
{
|
{
|
||||||
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
|
// Parse early to fail early
|
||||||
|
var bindings = ParseBindings(options.Bindings);
|
||||||
|
|
||||||
|
using (var connection = GetConnection(options))
|
||||||
|
using (var channel = connection.CreateModel())
|
||||||
{
|
{
|
||||||
messageSerializer.Serialize(new Message
|
channel.QueueDeclare(options.QueueName, true, false, false);
|
||||||
{
|
|
||||||
Exchange = "example",
|
foreach (var (exchange, routingKey) in bindings)
|
||||||
Queue = "example.queue",
|
channel.QueueBind(options.QueueName, exchange, routingKey);
|
||||||
RoutingKey = "example.routing.key",
|
|
||||||
DeliveryTag = 42,
|
|
||||||
Properties = new MockBasicProperties
|
|
||||||
{
|
|
||||||
ContentType = "application/json",
|
|
||||||
DeliveryMode = 2,
|
|
||||||
Headers = new Dictionary<string, object>
|
|
||||||
{
|
|
||||||
{ "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") }
|
|
||||||
},
|
|
||||||
ReplyTo = "reply.queue",
|
|
||||||
Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds())
|
|
||||||
},
|
|
||||||
Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }")
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void RunRemoveQueue(RemoveQueueOptions options)
|
||||||
|
{
|
||||||
|
if (!options.Confirm)
|
||||||
|
{
|
||||||
|
Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) ");
|
||||||
|
var answer = Console.ReadLine();
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint messageCount;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using (var connection = GetConnection(options))
|
||||||
|
using (var channel = connection.CreateModel())
|
||||||
|
{
|
||||||
|
messageCount = channel.QueueDelete(options.QueueName, true, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationInterruptedException e)
|
||||||
|
{
|
||||||
|
if (e.ShutdownReason.ReplyCode == 406)
|
||||||
|
{
|
||||||
|
if (!options.ConfirmPurge)
|
||||||
|
{
|
||||||
|
Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
|
||||||
|
var answer = Console.ReadLine();
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
using (var connection = GetConnection(options))
|
||||||
|
using (var channel = connection.CreateModel())
|
||||||
|
{
|
||||||
|
messageCount = channel.QueueDelete(options.QueueName, true, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine(messageCount == 0
|
||||||
|
? $"Empty or non-existent queue '{options.QueueName}' removed."
|
||||||
|
: $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void RunBindQueue(BindQueueOptions options)
|
||||||
|
{
|
||||||
|
var bindings = ParseBindings(options.Bindings);
|
||||||
|
|
||||||
|
using (var connection = GetConnection(options))
|
||||||
|
using (var channel = connection.CreateModel())
|
||||||
|
{
|
||||||
|
foreach (var (exchange, routingKey) in bindings)
|
||||||
|
channel.QueueBind(options.QueueName, exchange, routingKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void RunUnbindQueue(UnbindQueueOptions options)
|
||||||
|
{
|
||||||
|
var bindings = ParseBindings(options.Bindings);
|
||||||
|
|
||||||
|
using (var connection = GetConnection(options))
|
||||||
|
using (var channel = connection.CreateModel())
|
||||||
|
{
|
||||||
|
foreach (var (exchange, routingKey) in bindings)
|
||||||
|
channel.QueueUnbind(options.QueueName, exchange, routingKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private static Tuple<string, string>[] ParseBindings(IEnumerable<string> bindings)
|
||||||
|
{
|
||||||
|
return bindings
|
||||||
|
.Select(b =>
|
||||||
|
{
|
||||||
|
var parts = b.Split(':');
|
||||||
|
if (parts.Length != 2)
|
||||||
|
throw new InvalidOperationException($"Invalid binding format: {b}");
|
||||||
|
|
||||||
|
return new Tuple<string, string>(parts[0], parts[1]);
|
||||||
|
})
|
||||||
|
.ToArray();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using Tapeti.Config;
|
||||||
using Tapeti.Flow.FlowHelpers;
|
using Tapeti.Flow.FlowHelpers;
|
||||||
|
|
||||||
namespace Tapeti.Flow.Default
|
namespace Tapeti.Flow.Default
|
||||||
@ -28,8 +30,10 @@ namespace Tapeti.Flow.Default
|
|||||||
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
|
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
|
||||||
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
|
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
|
||||||
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
|
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
|
||||||
|
private HashSet<string> validatedMethods = null;
|
||||||
|
|
||||||
private readonly IFlowRepository repository;
|
private readonly IFlowRepository repository;
|
||||||
|
private readonly ITapetiConfig config;
|
||||||
|
|
||||||
private volatile bool inUse;
|
private volatile bool inUse;
|
||||||
private volatile bool loaded;
|
private volatile bool loaded;
|
||||||
@ -37,9 +41,10 @@ namespace Tapeti.Flow.Default
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public FlowStore(IFlowRepository repository)
|
public FlowStore(IFlowRepository repository, ITapetiConfig config)
|
||||||
{
|
{
|
||||||
this.repository = repository;
|
this.repository = repository;
|
||||||
|
this.config = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -54,18 +59,55 @@ namespace Tapeti.Flow.Default
|
|||||||
flowStates.Clear();
|
flowStates.Clear();
|
||||||
continuationLookup.Clear();
|
continuationLookup.Clear();
|
||||||
|
|
||||||
|
validatedMethods = new HashSet<string>();
|
||||||
|
try
|
||||||
|
{
|
||||||
foreach (var flowStateRecord in await repository.GetStates<FlowState>())
|
foreach (var flowStateRecord in await repository.GetStates<FlowState>())
|
||||||
{
|
{
|
||||||
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
|
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
|
||||||
|
|
||||||
foreach (var continuation in flowStateRecord.Value.Continuations)
|
foreach (var continuation in flowStateRecord.Value.Continuations)
|
||||||
|
{
|
||||||
|
ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value);
|
||||||
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
|
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
validatedMethods = null;
|
||||||
|
}
|
||||||
|
|
||||||
loaded = true;
|
loaded = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata)
|
||||||
|
{
|
||||||
|
// We could check all the things that are required for a continuation or converge method, but this should suffice
|
||||||
|
// for the common scenario where you change code without realizing that it's signature has been persisted
|
||||||
|
if (validatedMethods.Add(metadata.MethodName))
|
||||||
|
{
|
||||||
|
var methodInfo = MethodSerializer.Deserialize(metadata.MethodName);
|
||||||
|
if (methodInfo == null)
|
||||||
|
throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer exists (continuation Id = {continuationId})");
|
||||||
|
|
||||||
|
var binding = config.Bindings.ForMethod(methodInfo);
|
||||||
|
if (binding == null)
|
||||||
|
throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(metadata.ConvergeMethodName) || !validatedMethods.Add(metadata.ConvergeMethodName))
|
||||||
|
return;
|
||||||
|
|
||||||
|
var convergeMethodInfo = MethodSerializer.Deserialize(metadata.ConvergeMethodName);
|
||||||
|
if (convergeMethodInfo == null)
|
||||||
|
throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer exists (continuation Id = {continuationId})");
|
||||||
|
|
||||||
|
// Converge methods are not message handlers themselves
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public Task<Guid?> FindFlowID(Guid continuationID)
|
public Task<Guid?> FindFlowID(Guid continuationID)
|
||||||
{
|
{
|
||||||
@ -119,7 +161,7 @@ namespace Tapeti.Flow.Default
|
|||||||
if (flowLock == null)
|
if (flowLock == null)
|
||||||
throw new ObjectDisposedException("FlowStateLock");
|
throw new ObjectDisposedException("FlowStateLock");
|
||||||
|
|
||||||
return Task.FromResult(cachedFlowState.FlowState?.Clone());
|
return Task.FromResult(cachedFlowState?.FlowState?.Clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task StoreFlowState(FlowState newFlowState, bool persistent)
|
public async Task StoreFlowState(FlowState newFlowState, bool persistent)
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using System.Reflection;
|
using System;
|
||||||
|
using System.Reflection;
|
||||||
|
using System.Text.RegularExpressions;
|
||||||
|
|
||||||
namespace Tapeti.Flow.FlowHelpers
|
namespace Tapeti.Flow.FlowHelpers
|
||||||
{
|
{
|
||||||
@ -15,5 +17,35 @@ namespace Tapeti.Flow.FlowHelpers
|
|||||||
{
|
{
|
||||||
return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName;
|
return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static readonly Regex DeserializeRegex = new Regex("^(?<method>.+?)@(?<assembly>.+?):(?<type>.+?)$");
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Deserializes the serialized method representation back into it's MethodInfo, or null if not found.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="serializedMethod"></param>
|
||||||
|
public static MethodInfo Deserialize(string serializedMethod)
|
||||||
|
{
|
||||||
|
var match = DeserializeRegex.Match(serializedMethod);
|
||||||
|
if (!match.Success)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
Assembly assembly;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
assembly = Assembly.Load(match.Groups["assembly"].Value);
|
||||||
|
if (assembly == null)
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
var declaringType = assembly.GetType(match.Groups["type"].Value);
|
||||||
|
return declaringType?.GetMethod(match.Groups["method"].Value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
using ISerilogLogger = Serilog.ILogger;
|
using ISerilogLogger = Serilog.ILogger;
|
||||||
|
|
||||||
@ -93,6 +94,14 @@ namespace Tapeti.Serilog
|
|||||||
seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName);
|
seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void QueueExistsWarning(string queueName, Dictionary<string, string> arguments)
|
||||||
|
{
|
||||||
|
seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({arguments}) and will not be redeclared, queue will be consumed as-is",
|
||||||
|
queueName,
|
||||||
|
arguments);
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
||||||
{
|
{
|
||||||
|
@ -59,7 +59,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Ta
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
|
||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "07-ParallelizationTest", "07-ParallelizationTest\07-ParallelizationTest.csproj", "{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "07-ParallelizationTest", "Examples\07-ParallelizationTest\07-ParallelizationTest.csproj", "{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}"
|
||||||
EndProject
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
|
@ -54,9 +54,7 @@ namespace Tapeti.Config
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
|
/// Determines if durable queues are verified at startup if DeclareDurableQueues is disabled.
|
||||||
/// Defaults to true. Disable if you have queues with additional properties like a deadletter
|
/// Defaults to true.
|
||||||
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
|
|
||||||
/// while verifying.
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
bool VerifyDurableQueues { get; }
|
bool VerifyDurableQueues { get; }
|
||||||
}
|
}
|
||||||
|
125
Tapeti/Connection/TapetiChannel.cs
Normal file
125
Tapeti/Connection/TapetiChannel.cs
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Exceptions;
|
||||||
|
using Tapeti.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Connection
|
||||||
|
{
|
||||||
|
internal interface ITapetiChannelModelProvider
|
||||||
|
{
|
||||||
|
void WithChannel(Action<IModel> operation);
|
||||||
|
void WithRetryableChannel(Action<IModel> operation);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents both a RabbitMQ Client Channel (IModel) as well as it's associated single-thread task queue.
|
||||||
|
/// Access to the IModel is limited by design to enforce this relationship.
|
||||||
|
/// </summary>
|
||||||
|
internal class TapetiChannel
|
||||||
|
{
|
||||||
|
private readonly Func<IModel> modelFactory;
|
||||||
|
private readonly object taskQueueLock = new();
|
||||||
|
private SingleThreadTaskQueue taskQueue;
|
||||||
|
private readonly ModelProvider modelProvider;
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiChannel(Func<IModel> modelFactory)
|
||||||
|
{
|
||||||
|
this.modelFactory = modelFactory;
|
||||||
|
modelProvider = new ModelProvider(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public async Task Reset()
|
||||||
|
{
|
||||||
|
SingleThreadTaskQueue capturedTaskQueue;
|
||||||
|
|
||||||
|
lock (taskQueueLock)
|
||||||
|
{
|
||||||
|
capturedTaskQueue = taskQueue;
|
||||||
|
taskQueue = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (capturedTaskQueue == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
await capturedTaskQueue.Add(() => { });
|
||||||
|
capturedTaskQueue.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Task Queue(Action<IModel> operation)
|
||||||
|
{
|
||||||
|
return GetTaskQueue().Add(() =>
|
||||||
|
{
|
||||||
|
modelProvider.WithChannel(operation);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public Task QueueRetryable(Action<IModel> operation)
|
||||||
|
{
|
||||||
|
return GetTaskQueue().Add(() =>
|
||||||
|
{
|
||||||
|
modelProvider.WithRetryableChannel(operation);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public Task QueueWithProvider(Func<ITapetiChannelModelProvider, Task> operation)
|
||||||
|
{
|
||||||
|
return GetTaskQueue().Add(async () =>
|
||||||
|
{
|
||||||
|
await operation(modelProvider);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private SingleThreadTaskQueue GetTaskQueue()
|
||||||
|
{
|
||||||
|
lock (taskQueueLock)
|
||||||
|
{
|
||||||
|
return taskQueue ??= new SingleThreadTaskQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class ModelProvider : ITapetiChannelModelProvider
|
||||||
|
{
|
||||||
|
private readonly TapetiChannel owner;
|
||||||
|
|
||||||
|
|
||||||
|
public ModelProvider(TapetiChannel owner)
|
||||||
|
{
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void WithChannel(Action<IModel> operation)
|
||||||
|
{
|
||||||
|
operation(owner.modelFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void WithRetryableChannel(Action<IModel> operation)
|
||||||
|
{
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
operation(owner.modelFactory());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (AlreadyClosedException)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -13,10 +13,16 @@ using RabbitMQ.Client.Exceptions;
|
|||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
using Tapeti.Default;
|
using Tapeti.Default;
|
||||||
using Tapeti.Exceptions;
|
using Tapeti.Exceptions;
|
||||||
using Tapeti.Tasks;
|
|
||||||
|
|
||||||
namespace Tapeti.Connection
|
namespace Tapeti.Connection
|
||||||
{
|
{
|
||||||
|
internal enum TapetiChannelType
|
||||||
|
{
|
||||||
|
Consume,
|
||||||
|
Publish
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Implementation of ITapetiClient for the RabbitMQ Client library
|
/// Implementation of ITapetiClient for the RabbitMQ Client library
|
||||||
@ -39,23 +45,27 @@ namespace Tapeti.Connection
|
|||||||
public IConnectionEventListener ConnectionEventListener { get; set; }
|
public IConnectionEventListener ConnectionEventListener { get; set; }
|
||||||
|
|
||||||
|
|
||||||
private readonly Lazy<SingleThreadTaskQueue> taskQueue = new Lazy<SingleThreadTaskQueue>();
|
private readonly TapetiChannel consumeChannel;
|
||||||
|
private readonly TapetiChannel publishChannel;
|
||||||
|
private readonly HttpClient managementClient;
|
||||||
|
|
||||||
|
// These fields must be locked using connectionLock
|
||||||
// These fields are for use in the taskQueue only!
|
private readonly object connectionLock = new();
|
||||||
private RabbitMQ.Client.IConnection connection;
|
private RabbitMQ.Client.IConnection connection;
|
||||||
|
private IModel consumeChannelModel;
|
||||||
|
private IModel publishChannelModel;
|
||||||
private bool isClosing;
|
private bool isClosing;
|
||||||
private bool isReconnect;
|
private bool isReconnect;
|
||||||
private IModel channelInstance;
|
|
||||||
private ulong lastDeliveryTag;
|
|
||||||
private DateTime connectedDateTime;
|
private DateTime connectedDateTime;
|
||||||
private readonly HttpClient managementClient;
|
|
||||||
private readonly HashSet<string> deletedQueues = new HashSet<string>();
|
|
||||||
|
|
||||||
// These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread
|
// These fields are for use in a single TapetiChannel's queue only!
|
||||||
private readonly object confirmLock = new object();
|
private ulong lastDeliveryTag;
|
||||||
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>();
|
private readonly HashSet<string> deletedQueues = new();
|
||||||
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>();
|
|
||||||
|
// These fields must be locked using confirmLock, since the callbacks for BasicAck/BasicReturn can run in a different thread
|
||||||
|
private readonly object confirmLock = new();
|
||||||
|
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new();
|
||||||
|
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new();
|
||||||
|
|
||||||
|
|
||||||
private class ConfirmMessageInfo
|
private class ConfirmMessageInfo
|
||||||
@ -79,6 +89,9 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
logger = config.DependencyResolver.Resolve<ILogger>();
|
logger = config.DependencyResolver.Resolve<ILogger>();
|
||||||
|
|
||||||
|
consumeChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Consume));
|
||||||
|
publishChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Publish));
|
||||||
|
|
||||||
|
|
||||||
var handler = new HttpClientHandler
|
var handler = new HttpClientHandler
|
||||||
{
|
{
|
||||||
@ -100,7 +113,8 @@ namespace Tapeti.Connection
|
|||||||
if (string.IsNullOrEmpty(routingKey))
|
if (string.IsNullOrEmpty(routingKey))
|
||||||
throw new ArgumentNullException(nameof(routingKey));
|
throw new ArgumentNullException(nameof(routingKey));
|
||||||
|
|
||||||
await taskQueue.Value.Add(async () =>
|
|
||||||
|
await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channelProvider =>
|
||||||
{
|
{
|
||||||
Task<int> publishResultTask = null;
|
Task<int> publishResultTask = null;
|
||||||
var messageInfo = new ConfirmMessageInfo
|
var messageInfo = new ConfirmMessageInfo
|
||||||
@ -110,7 +124,7 @@ namespace Tapeti.Connection
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
WithRetryableChannel(channel =>
|
channelProvider.WithRetryableChannel(channel =>
|
||||||
{
|
{
|
||||||
DeclareExchange(channel, exchange);
|
DeclareExchange(channel, exchange);
|
||||||
|
|
||||||
@ -169,15 +183,18 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
var replyCode = publishResultTask.Result;
|
var replyCode = publishResultTask.Result;
|
||||||
|
|
||||||
|
switch (replyCode)
|
||||||
|
{
|
||||||
// There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code
|
// There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code
|
||||||
// at the time of writing...
|
// at the time of writing...
|
||||||
if (replyCode == 312)
|
case 312:
|
||||||
throw new NoRouteException(
|
throw new NoRouteException(
|
||||||
$"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' does not have a route");
|
$"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' does not have a route");
|
||||||
|
|
||||||
if (replyCode > 0)
|
case > 0:
|
||||||
throw new NoRouteException(
|
throw new NoRouteException(
|
||||||
$"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}");
|
$"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}");
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +211,7 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
string consumerTag = null;
|
string consumerTag = null;
|
||||||
|
|
||||||
await QueueWithRetryableChannel(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
@ -215,7 +232,7 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
// No need for a retryable channel here, if the connection is lost
|
// No need for a retryable channel here, if the connection is lost
|
||||||
// so is the consumer.
|
// so is the consumer.
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
channel.BasicCancel(consumerTag);
|
channel.BasicCancel(consumerTag);
|
||||||
});
|
});
|
||||||
@ -224,7 +241,7 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private async Task Respond(ulong deliveryTag, ConsumeResult result)
|
private async Task Respond(ulong deliveryTag, ConsumeResult result)
|
||||||
{
|
{
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
// No need for a retryable channel here, if the connection is lost we can't
|
// No need for a retryable channel here, if the connection is lost we can't
|
||||||
// use the deliveryTag anymore.
|
// use the deliveryTag anymore.
|
||||||
@ -250,21 +267,42 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private async Task<bool> GetDurableQueueDeclareRequired(string queueName)
|
||||||
|
{
|
||||||
|
var existingQueue = await GetQueueInfo(queueName);
|
||||||
|
if (existingQueue == null)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive)
|
||||||
|
throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)");
|
||||||
|
|
||||||
|
if (existingQueue.Arguments.Count <= 0)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
(logger as IBindingLogger)?.QueueExistsWarning(queueName, existingQueue.Arguments);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable<QueueBinding> bindings)
|
public async Task DurableQueueDeclare(CancellationToken cancellationToken, string queueName, IEnumerable<QueueBinding> bindings)
|
||||||
{
|
{
|
||||||
|
var declareRequired = await GetDurableQueueDeclareRequired(queueName);
|
||||||
|
|
||||||
var existingBindings = (await GetQueueBindings(queueName)).ToList();
|
var existingBindings = (await GetQueueBindings(queueName)).ToList();
|
||||||
var currentBindings = bindings.ToList();
|
var currentBindings = bindings.ToList();
|
||||||
var bindingLogger = logger as IBindingLogger;
|
var bindingLogger = logger as IBindingLogger;
|
||||||
|
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (declareRequired)
|
||||||
|
{
|
||||||
bindingLogger?.QueueDeclare(queueName, true, false);
|
bindingLogger?.QueueDeclare(queueName, true, false);
|
||||||
channel.QueueDeclare(queueName, true, false, false);
|
channel.QueueDeclare(queueName, true, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
foreach (var binding in currentBindings.Except(existingBindings))
|
foreach (var binding in currentBindings.Except(existingBindings))
|
||||||
{
|
{
|
||||||
@ -284,7 +322,10 @@ namespace Tapeti.Connection
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName)
|
public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName)
|
||||||
{
|
{
|
||||||
await Queue(channel =>
|
if (!await GetDurableQueueDeclareRequired(queueName))
|
||||||
|
return;
|
||||||
|
|
||||||
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
@ -302,7 +343,7 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
uint deletedMessages = 0;
|
uint deletedMessages = 0;
|
||||||
|
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
@ -316,7 +357,7 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
await taskQueue.Value.Add(async () =>
|
await GetTapetiChannel(TapetiChannelType.Consume).QueueWithProvider(async channelProvider =>
|
||||||
{
|
{
|
||||||
bool retry;
|
bool retry;
|
||||||
do
|
do
|
||||||
@ -343,8 +384,10 @@ namespace Tapeti.Connection
|
|||||||
// includes the GetQueueInfo, the next time around it should have Messages > 0
|
// includes the GetQueueInfo, the next time around it should have Messages > 0
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var channel = GetChannel();
|
channelProvider.WithChannel(channel =>
|
||||||
|
{
|
||||||
channel.QueueDelete(queueName, false, true);
|
channel.QueueDelete(queueName, false, true);
|
||||||
|
});
|
||||||
|
|
||||||
deletedQueues.Add(queueName);
|
deletedQueues.Add(queueName);
|
||||||
(logger as IBindingLogger)?.QueueObsolete(queueName, true, 0);
|
(logger as IBindingLogger)?.QueueObsolete(queueName, true, 0);
|
||||||
@ -364,10 +407,11 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
if (existingBindings.Count > 0)
|
if (existingBindings.Count > 0)
|
||||||
{
|
{
|
||||||
var channel = GetChannel();
|
channelProvider.WithChannel(channel =>
|
||||||
|
{
|
||||||
foreach (var binding in existingBindings)
|
foreach (var binding in existingBindings)
|
||||||
channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
|
channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
(logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages);
|
(logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages);
|
||||||
@ -383,7 +427,7 @@ namespace Tapeti.Connection
|
|||||||
string queueName = null;
|
string queueName = null;
|
||||||
var bindingLogger = logger as IBindingLogger;
|
var bindingLogger = logger as IBindingLogger;
|
||||||
|
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
@ -407,7 +451,7 @@ namespace Tapeti.Connection
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding)
|
public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding)
|
||||||
{
|
{
|
||||||
await Queue(channel =>
|
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
@ -422,32 +466,46 @@ namespace Tapeti.Connection
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task Close()
|
public async Task Close()
|
||||||
{
|
{
|
||||||
if (!taskQueue.IsValueCreated)
|
IModel capturedConsumeModel;
|
||||||
return;
|
IModel capturedPublishModel;
|
||||||
|
RabbitMQ.Client.IConnection capturedConnection;
|
||||||
|
|
||||||
await taskQueue.Value.Add(() =>
|
lock (connectionLock)
|
||||||
{
|
{
|
||||||
isClosing = true;
|
isClosing = true;
|
||||||
|
capturedConsumeModel = consumeChannelModel;
|
||||||
|
capturedPublishModel = publishChannelModel;
|
||||||
|
capturedConnection = connection;
|
||||||
|
|
||||||
if (channelInstance != null)
|
consumeChannelModel = null;
|
||||||
{
|
publishChannelModel = null;
|
||||||
channelInstance.Dispose();
|
|
||||||
channelInstance = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReSharper disable once InvertIf
|
|
||||||
if (connection != null)
|
|
||||||
{
|
|
||||||
connection.Dispose();
|
|
||||||
connection = null;
|
connection = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
taskQueue.Value.Dispose();
|
// Empty the queue
|
||||||
});
|
await consumeChannel.Reset();
|
||||||
|
await publishChannel.Reset();
|
||||||
|
|
||||||
|
// No need to close the channels as the connection will be closed
|
||||||
|
capturedConsumeModel.Dispose();
|
||||||
|
capturedPublishModel.Dispose();
|
||||||
|
|
||||||
|
// ReSharper disable once InvertIf
|
||||||
|
if (capturedConnection != null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
capturedConnection.Close();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
capturedConnection.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static readonly List<HttpStatusCode> TransientStatusCodes = new List<HttpStatusCode>
|
private static readonly List<HttpStatusCode> TransientStatusCodes = new()
|
||||||
{
|
{
|
||||||
HttpStatusCode.GatewayTimeout,
|
HttpStatusCode.GatewayTimeout,
|
||||||
HttpStatusCode.RequestTimeout,
|
HttpStatusCode.RequestTimeout,
|
||||||
@ -457,6 +515,24 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private class ManagementQueueInfo
|
private class ManagementQueueInfo
|
||||||
{
|
{
|
||||||
|
[JsonProperty("name")]
|
||||||
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("vhost")]
|
||||||
|
public string VHost { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("durable")]
|
||||||
|
public bool Durable { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("auto_delete")]
|
||||||
|
public bool AutoDelete { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("exclusive")]
|
||||||
|
public bool Exclusive { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("arguments")]
|
||||||
|
public Dictionary<string, string> Arguments { get; set; }
|
||||||
|
|
||||||
[JsonProperty("messages")]
|
[JsonProperty("messages")]
|
||||||
public uint Messages { get; set; }
|
public uint Messages { get; set; }
|
||||||
}
|
}
|
||||||
@ -543,10 +619,15 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private async Task<T> WithRetryableManagementAPI<T>(string path, Func<HttpResponseMessage, Task<T>> handleResponse)
|
private async Task<T> WithRetryableManagementAPI<T>(string path, Func<HttpResponseMessage, Task<T>> handleResponse)
|
||||||
{
|
{
|
||||||
var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/{path}");
|
// Workaround for: https://github.com/dotnet/runtime/issues/23581#issuecomment-354391321
|
||||||
|
// "localhost" can cause a 1 second delay *per call*. Not an issue in production scenarios, but annoying while debugging.
|
||||||
|
var hostName = connectionParams.HostName;
|
||||||
|
if (hostName.Equals("localhost", StringComparison.InvariantCultureIgnoreCase))
|
||||||
|
hostName = "127.0.0.1";
|
||||||
|
|
||||||
using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri))
|
var requestUri = new Uri($"http://{hostName}:{connectionParams.ManagementPort}/api/{path}");
|
||||||
{
|
|
||||||
|
using var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
|
||||||
var retryDelayIndex = 0;
|
var retryDelayIndex = 0;
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
@ -574,10 +655,9 @@ namespace Tapeti.Connection
|
|||||||
retryDelayIndex++;
|
retryDelayIndex++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private readonly HashSet<string> declaredExchanges = new HashSet<string>();
|
private readonly HashSet<string> declaredExchanges = new();
|
||||||
|
|
||||||
private void DeclareExchange(IModel channel, string exchange)
|
private void DeclareExchange(IModel channel, string exchange)
|
||||||
{
|
{
|
||||||
@ -593,22 +673,11 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private async Task Queue(Action<IModel> operation)
|
private TapetiChannel GetTapetiChannel(TapetiChannelType channelType)
|
||||||
{
|
{
|
||||||
await taskQueue.Value.Add(() =>
|
return channelType == TapetiChannelType.Publish
|
||||||
{
|
? publishChannel
|
||||||
var channel = GetChannel();
|
: consumeChannel;
|
||||||
operation(channel);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private async Task QueueWithRetryableChannel(Action<IModel> operation)
|
|
||||||
{
|
|
||||||
await taskQueue.Value.Add(() =>
|
|
||||||
{
|
|
||||||
WithRetryableChannel(operation);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -616,30 +685,16 @@ namespace Tapeti.Connection
|
|||||||
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
||||||
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
private void WithRetryableChannel(Action<IModel> operation)
|
private IModel GetModel(TapetiChannelType channelType)
|
||||||
{
|
{
|
||||||
while (true)
|
lock (connectionLock)
|
||||||
{
|
{
|
||||||
try
|
var channel = channelType == TapetiChannelType.Publish
|
||||||
{
|
? publishChannelModel
|
||||||
operation(GetChannel());
|
: consumeChannelModel;
|
||||||
break;
|
|
||||||
}
|
|
||||||
catch (AlreadyClosedException)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (channel != null && channel.IsOpen)
|
||||||
/// <remarks>
|
return channel;
|
||||||
/// Only call this from a task in the taskQueue to ensure IModel is only used
|
|
||||||
/// by a single thread, as is recommended in the RabbitMQ .NET Client documentation.
|
|
||||||
/// </remarks>
|
|
||||||
private IModel GetChannel()
|
|
||||||
{
|
|
||||||
if (channelInstance != null && channelInstance.IsOpen)
|
|
||||||
return channelInstance;
|
|
||||||
|
|
||||||
// If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
|
// If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ
|
||||||
// not related to the connection), wait for a bit to avoid spamming the connection
|
// not related to the connection), wait for a bit to avoid spamming the connection
|
||||||
@ -673,14 +728,32 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
if (connection != null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
connection.Close();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
connection.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
connection = null;
|
||||||
|
}
|
||||||
|
|
||||||
logger.Connect(new ConnectContext(connectionParams, isReconnect));
|
logger.Connect(new ConnectContext(connectionParams, isReconnect));
|
||||||
|
|
||||||
connection = connectionFactory.CreateConnection();
|
connection = connectionFactory.CreateConnection();
|
||||||
channelInstance = connection.CreateModel();
|
consumeChannelModel = connection.CreateModel();
|
||||||
|
if (consumeChannel == null)
|
||||||
if (channelInstance == null)
|
|
||||||
throw new BrokerUnreachableException(null);
|
throw new BrokerUnreachableException(null);
|
||||||
|
|
||||||
|
publishChannelModel = connection.CreateModel();
|
||||||
|
if (publishChannel == null)
|
||||||
|
throw new BrokerUnreachableException(null);
|
||||||
|
|
||||||
|
|
||||||
if (config.Features.PublisherConfirms)
|
if (config.Features.PublisherConfirms)
|
||||||
{
|
{
|
||||||
lastDeliveryTag = 0;
|
lastDeliveryTag = 0;
|
||||||
@ -698,14 +771,23 @@ namespace Tapeti.Connection
|
|||||||
Monitor.Exit(confirmLock);
|
Monitor.Exit(confirmLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
channelInstance.ConfirmSelect();
|
publishChannelModel.ConfirmSelect();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connectionParams.PrefetchCount > 0)
|
if (connectionParams.PrefetchCount > 0)
|
||||||
channelInstance.BasicQos(0, connectionParams.PrefetchCount, false);
|
consumeChannelModel.BasicQos(0, connectionParams.PrefetchCount, false);
|
||||||
|
|
||||||
channelInstance.ModelShutdown += (sender, e) =>
|
var capturedConsumeChannelModel = consumeChannelModel;
|
||||||
|
consumeChannelModel.ModelShutdown += (_, e) =>
|
||||||
{
|
{
|
||||||
|
lock (connectionLock)
|
||||||
|
{
|
||||||
|
if (consumeChannelModel == null || consumeChannelModel != capturedConsumeChannelModel)
|
||||||
|
return;
|
||||||
|
|
||||||
|
consumeChannelModel = null;
|
||||||
|
}
|
||||||
|
|
||||||
ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
|
ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
|
||||||
{
|
{
|
||||||
ReplyCode = e.ReplyCode,
|
ReplyCode = e.ReplyCode,
|
||||||
@ -714,15 +796,29 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
|
logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
|
||||||
|
|
||||||
channelInstance = null;
|
// Reconnect if the disconnect was unexpected
|
||||||
|
|
||||||
if (!isClosing)
|
if (!isClosing)
|
||||||
taskQueue.Value.Add(() => WithRetryableChannel(channel => { }));
|
GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { });
|
||||||
};
|
};
|
||||||
|
|
||||||
channelInstance.BasicReturn += HandleBasicReturn;
|
var capturedPublishChannelModel = publishChannelModel;
|
||||||
channelInstance.BasicAcks += HandleBasicAck;
|
publishChannelModel.ModelShutdown += (_, _) =>
|
||||||
channelInstance.BasicNacks += HandleBasicNack;
|
{
|
||||||
|
lock (connectionLock)
|
||||||
|
{
|
||||||
|
if (publishChannelModel == null || publishChannelModel != capturedPublishChannelModel)
|
||||||
|
return;
|
||||||
|
|
||||||
|
publishChannelModel = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// No need to reconnect, the next Publish will
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
publishChannelModel.BasicReturn += HandleBasicReturn;
|
||||||
|
publishChannelModel.BasicAcks += HandleBasicAck;
|
||||||
|
publishChannelModel.BasicNacks += HandleBasicNack;
|
||||||
|
|
||||||
connectedDateTime = DateTime.UtcNow;
|
connectedDateTime = DateTime.UtcNow;
|
||||||
|
|
||||||
@ -749,7 +845,10 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return channelInstance;
|
return channelType == TapetiChannelType.Publish
|
||||||
|
? publishChannelModel
|
||||||
|
: consumeChannelModel;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,11 +42,18 @@ namespace Tapeti.Connection
|
|||||||
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
|
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
|
||||||
{
|
{
|
||||||
object message = null;
|
object message = null;
|
||||||
|
try
|
||||||
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
message = messageSerializer.Deserialize(body, properties);
|
message = messageSerializer.Deserialize(body, properties);
|
||||||
if (message == null)
|
if (message == null)
|
||||||
throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body));
|
throw new ArgumentException($"Message body for routing key '{routingKey}' could not be deserialized into a message object", nameof(body));
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
throw new ArgumentException($"Message body for routing key '{routingKey}' could not be deserialized into a message object: {e.Message}", nameof(body), e);
|
||||||
|
}
|
||||||
|
|
||||||
return await DispatchMessage(message, new MessageContextData
|
return await DispatchMessage(message, new MessageContextData
|
||||||
{
|
{
|
||||||
@ -57,7 +64,7 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
catch (Exception dispatchException)
|
catch (Exception dispatchException)
|
||||||
{
|
{
|
||||||
using (var emptyContext = new MessageContext
|
await using var emptyContext = new MessageContext
|
||||||
{
|
{
|
||||||
Config = config,
|
Config = config,
|
||||||
Queue = queueName,
|
Queue = queueName,
|
||||||
@ -66,14 +73,13 @@ namespace Tapeti.Connection
|
|||||||
Message = message,
|
Message = message,
|
||||||
Properties = properties,
|
Properties = properties,
|
||||||
Binding = null
|
Binding = null
|
||||||
})
|
};
|
||||||
{
|
|
||||||
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
|
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
|
||||||
HandleException(exceptionContext);
|
HandleException(exceptionContext);
|
||||||
return exceptionContext.ConsumeResult;
|
return exceptionContext.ConsumeResult;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private async Task<ConsumeResult> DispatchMessage(object message, MessageContextData messageContextData)
|
private async Task<ConsumeResult> DispatchMessage(object message, MessageContextData messageContextData)
|
||||||
@ -100,7 +106,7 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private async Task<ConsumeResult> InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding)
|
private async Task<ConsumeResult> InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding)
|
||||||
{
|
{
|
||||||
using (var context = new MessageContext
|
await using var context = new MessageContext
|
||||||
{
|
{
|
||||||
Config = config,
|
Config = config,
|
||||||
Queue = queueName,
|
Queue = queueName,
|
||||||
@ -109,8 +115,8 @@ namespace Tapeti.Connection
|
|||||||
Message = message,
|
Message = message,
|
||||||
Properties = messageContextData.Properties,
|
Properties = messageContextData.Properties,
|
||||||
Binding = binding
|
Binding = binding
|
||||||
})
|
};
|
||||||
{
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await MiddlewareHelper.GoAsync(config.Middleware.Message,
|
await MiddlewareHelper.GoAsync(config.Middleware.Message,
|
||||||
@ -129,7 +135,6 @@ namespace Tapeti.Connection
|
|||||||
return exceptionContext.ConsumeResult;
|
return exceptionContext.ConsumeResult;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void HandleException(ExceptionStrategyContext exceptionContext)
|
private void HandleException(ExceptionStrategyContext exceptionContext)
|
||||||
@ -158,18 +163,12 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private static bool IgnoreExceptionDuringShutdown(Exception e)
|
private static bool IgnoreExceptionDuringShutdown(Exception e)
|
||||||
{
|
{
|
||||||
switch (e)
|
return e switch
|
||||||
{
|
{
|
||||||
case AggregateException aggregateException:
|
AggregateException aggregateException => aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown),
|
||||||
return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown);
|
TaskCanceledException or OperationCanceledException => true,
|
||||||
|
_ => e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException)
|
||||||
case TaskCanceledException _:
|
};
|
||||||
case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested
|
|
||||||
return true;
|
|
||||||
|
|
||||||
default:
|
|
||||||
return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -118,9 +118,7 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
var writableProperties = new MessageProperties(properties);
|
var writableProperties = new MessageProperties(properties);
|
||||||
|
|
||||||
if (!writableProperties.Timestamp.HasValue)
|
writableProperties.Timestamp ??= DateTime.UtcNow;
|
||||||
writableProperties.Timestamp = DateTime.UtcNow;
|
|
||||||
|
|
||||||
writableProperties.Persistent = true;
|
writableProperties.Persistent = true;
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ namespace Tapeti.Connection
|
|||||||
private readonly Func<ITapetiClient> clientFactory;
|
private readonly Func<ITapetiClient> clientFactory;
|
||||||
private readonly ITapetiConfig config;
|
private readonly ITapetiConfig config;
|
||||||
private bool consuming;
|
private bool consuming;
|
||||||
private readonly List<string> consumerTags = new List<string>();
|
private readonly List<string> consumerTags = new();
|
||||||
|
|
||||||
private CancellationTokenSource initializeCancellationTokenSource;
|
private CancellationTokenSource initializeCancellationTokenSource;
|
||||||
|
|
||||||
@ -166,7 +166,7 @@ namespace Tapeti.Connection
|
|||||||
public List<Type> MessageClasses;
|
public List<Type> MessageClasses;
|
||||||
}
|
}
|
||||||
|
|
||||||
private readonly Dictionary<string, List<DynamicQueueInfo>> dynamicQueues = new Dictionary<string, List<DynamicQueueInfo>>();
|
private readonly Dictionary<string, List<DynamicQueueInfo>> dynamicQueues = new();
|
||||||
|
|
||||||
|
|
||||||
protected CustomBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken)
|
protected CustomBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken)
|
||||||
@ -277,8 +277,8 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private class DeclareDurableQueuesBindingTarget : CustomBindingTarget
|
private class DeclareDurableQueuesBindingTarget : CustomBindingTarget
|
||||||
{
|
{
|
||||||
private readonly Dictionary<string, List<Type>> durableQueues = new Dictionary<string, List<Type>>();
|
private readonly Dictionary<string, List<Type>> durableQueues = new();
|
||||||
private readonly HashSet<string> obsoleteDurableQueues = new HashSet<string>();
|
private readonly HashSet<string> obsoleteDurableQueues = new();
|
||||||
|
|
||||||
|
|
||||||
public DeclareDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
public DeclareDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
||||||
@ -358,7 +358,7 @@ namespace Tapeti.Connection
|
|||||||
|
|
||||||
private class PassiveDurableQueuesBindingTarget : CustomBindingTarget
|
private class PassiveDurableQueuesBindingTarget : CustomBindingTarget
|
||||||
{
|
{
|
||||||
private readonly List<string> durableQueues = new List<string>();
|
private readonly List<string> durableQueues = new();
|
||||||
|
|
||||||
|
|
||||||
public PassiveDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
public PassiveDurableQueuesBindingTarget(Func<ITapetiClient> clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken)
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
|
|
||||||
namespace Tapeti.Default
|
namespace Tapeti.Default
|
||||||
@ -60,6 +62,21 @@ namespace Tapeti.Default
|
|||||||
: $"[Tapeti] Verifying durable queue {queueName}");
|
: $"[Tapeti] Verifying durable queue {queueName}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void QueueExistsWarning(string queueName, Dictionary<string, string> arguments)
|
||||||
|
{
|
||||||
|
var argumentsText = new StringBuilder();
|
||||||
|
foreach (var pair in arguments)
|
||||||
|
{
|
||||||
|
if (argumentsText.Length > 0)
|
||||||
|
argumentsText.Append(", ");
|
||||||
|
|
||||||
|
argumentsText.Append($"{pair.Key} = {pair.Value}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is");
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
|
||||||
{
|
{
|
||||||
|
@ -9,7 +9,7 @@ namespace Tapeti.Default
|
|||||||
internal class ControllerBindingContext : IControllerBindingContext
|
internal class ControllerBindingContext : IControllerBindingContext
|
||||||
{
|
{
|
||||||
private BindingTargetMode? bindingTargetMode;
|
private BindingTargetMode? bindingTargetMode;
|
||||||
private readonly List<IControllerMiddlewareBase> middleware = new List<IControllerMiddlewareBase>();
|
private readonly List<IControllerMiddlewareBase> middleware = new();
|
||||||
private readonly List<ControllerBindingParameter> parameters;
|
private readonly List<ControllerBindingParameter> parameters;
|
||||||
private readonly ControllerBindingResult result;
|
private readonly ControllerBindingResult result;
|
||||||
|
|
||||||
|
@ -161,11 +161,11 @@ namespace Tapeti.Default
|
|||||||
{
|
{
|
||||||
var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
|
var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
|
||||||
|
|
||||||
using (var controllerContext = new ControllerMessageContext(context)
|
await using var controllerContext = new ControllerMessageContext(context)
|
||||||
{
|
{
|
||||||
Controller = controller
|
Controller = controller
|
||||||
})
|
};
|
||||||
{
|
|
||||||
if (!await FilterAllowed(controllerContext))
|
if (!await FilterAllowed(controllerContext))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -175,23 +175,21 @@ namespace Tapeti.Default
|
|||||||
async (handler, next) => await handler.Handle(controllerContext, next),
|
async (handler, next) => await handler.Handle(controllerContext, next),
|
||||||
async () => await messageHandler(controllerContext));
|
async () => await messageHandler(controllerContext));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
|
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
|
||||||
{
|
{
|
||||||
using (var controllerContext = new ControllerMessageContext(context)
|
await using var controllerContext = new ControllerMessageContext(context)
|
||||||
{
|
{
|
||||||
Controller = null
|
Controller = null
|
||||||
})
|
};
|
||||||
{
|
|
||||||
await MiddlewareHelper.GoAsync(
|
await MiddlewareHelper.GoAsync(
|
||||||
bindingInfo.CleanupMiddleware,
|
bindingInfo.CleanupMiddleware,
|
||||||
async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
|
async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
|
||||||
() => Task.CompletedTask);
|
() => Task.CompletedTask);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private async Task<bool> FilterAllowed(IControllerMessageContext context)
|
private async Task<bool> FilterAllowed(IControllerMessageContext context)
|
||||||
@ -213,7 +211,7 @@ namespace Tapeti.Default
|
|||||||
private delegate Task MessageHandlerFunc(IControllerMessageContext context);
|
private delegate Task MessageHandlerFunc(IControllerMessageContext context);
|
||||||
|
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
|
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
|
||||||
{
|
{
|
||||||
if (resultHandler != null)
|
if (resultHandler != null)
|
||||||
return WrapResultHandlerMethod(method, parameterFactories, resultHandler);
|
return WrapResultHandlerMethod(method, parameterFactories, resultHandler);
|
||||||
@ -231,49 +229,98 @@ namespace Tapeti.Default
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapResultHandlerMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
|
private MessageHandlerFunc WrapResultHandlerMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
|
||||||
{
|
{
|
||||||
return context =>
|
return context =>
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
||||||
return resultHandler(context, result);
|
return resultHandler(context, result);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
AddExceptionData(e);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapNullMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
private MessageHandlerFunc WrapNullMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
||||||
{
|
{
|
||||||
return context =>
|
return context =>
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
AddExceptionData(e);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
private MessageHandlerFunc WrapTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
||||||
{
|
{
|
||||||
return context => (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
return context =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return (Task) method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
AddExceptionData(e);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
private MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
||||||
{
|
{
|
||||||
return context =>
|
return context =>
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
return (Task<object>)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
return (Task<object>)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
AddExceptionData(e);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
private MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
|
||||||
{
|
{
|
||||||
return context =>
|
return context =>
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
|
return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
AddExceptionData(e);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void AddExceptionData(Exception exception)
|
||||||
|
{
|
||||||
|
exception.Data["Tapeti.Controller.Name"] = bindingInfo.ControllerType?.FullName;
|
||||||
|
exception.Data["Tapeti.Controller.Method"] = bindingInfo.Method?.Name;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Contains information about the queue linked to the controller method.
|
/// Contains information about the queue linked to the controller method.
|
||||||
|
@ -16,8 +16,8 @@ namespace Tapeti.Default
|
|||||||
private const string ClassTypeHeader = "classType";
|
private const string ClassTypeHeader = "classType";
|
||||||
|
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<string, Type> deserializedTypeNames = new ConcurrentDictionary<string, Type>();
|
private readonly ConcurrentDictionary<string, Type> deserializedTypeNames = new();
|
||||||
private readonly ConcurrentDictionary<Type, string> serializedTypeNames = new ConcurrentDictionary<Type, string>();
|
private readonly ConcurrentDictionary<Type, string> serializedTypeNames = new();
|
||||||
private readonly JsonSerializerSettings serializerSettings;
|
private readonly JsonSerializerSettings serializerSettings;
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ namespace Tapeti.Default
|
|||||||
{
|
{
|
||||||
internal class MessageContext : IMessageContext
|
internal class MessageContext : IMessageContext
|
||||||
{
|
{
|
||||||
private readonly Dictionary<string, object> items = new Dictionary<string, object>();
|
private readonly Dictionary<string, object> items = new();
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
@ -10,7 +10,7 @@ namespace Tapeti.Default
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class MessageProperties : IMessageProperties
|
public class MessageProperties : IMessageProperties
|
||||||
{
|
{
|
||||||
private readonly Dictionary<string, string> headers = new Dictionary<string, string>();
|
private readonly Dictionary<string, string> headers = new();
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
@ -13,7 +13,7 @@ namespace Tapeti.Default
|
|||||||
/// </example>
|
/// </example>
|
||||||
public class NamespaceMatchExchangeStrategy : IExchangeStrategy
|
public class NamespaceMatchExchangeStrategy : IExchangeStrategy
|
||||||
{
|
{
|
||||||
private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?<exchange>[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline);
|
private static readonly Regex NamespaceRegex = new("^(Messaging\\.)?(?<exchange>[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline);
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
@ -87,8 +87,7 @@ namespace Tapeti.Default
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public void SetHeader(string name, string value)
|
public void SetHeader(string name, string value)
|
||||||
{
|
{
|
||||||
if (BasicProperties.Headers == null)
|
BasicProperties.Headers ??= new Dictionary<string, object>();
|
||||||
BasicProperties.Headers = new Dictionary<string, object>();
|
|
||||||
|
|
||||||
if (BasicProperties.Headers.ContainsKey(name))
|
if (BasicProperties.Headers.ContainsKey(name))
|
||||||
BasicProperties.Headers[name] = Encoding.UTF8.GetBytes(value);
|
BasicProperties.Headers[name] = Encoding.UTF8.GetBytes(value);
|
||||||
|
@ -28,9 +28,9 @@ namespace Tapeti.Default
|
|||||||
(?(?<=[A-Z])[A-Z](?=[a-z])|[A-Z])
|
(?(?<=[A-Z])[A-Z](?=[a-z])|[A-Z])
|
||||||
)";
|
)";
|
||||||
|
|
||||||
private static readonly Regex SeparatorRegex = new Regex(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled);
|
private static readonly Regex SeparatorRegex = new(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled);
|
||||||
|
|
||||||
private static readonly ConcurrentDictionary<Type, string> RoutingKeyCache = new ConcurrentDictionary<Type, string>();
|
private static readonly ConcurrentDictionary<Type, string> RoutingKeyCache = new();
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
@ -8,7 +8,7 @@ namespace Tapeti.Helpers
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class ConnectionStringParser
|
public class ConnectionStringParser
|
||||||
{
|
{
|
||||||
private readonly TapetiConnectionParams result = new TapetiConnectionParams();
|
private readonly TapetiConnectionParams result = new();
|
||||||
|
|
||||||
private readonly string connectionstring;
|
private readonly string connectionstring;
|
||||||
private int pos = -1;
|
private int pos = -1;
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using Tapeti.Config;
|
using Tapeti.Config;
|
||||||
|
|
||||||
// ReSharper disable UnusedMember.Global
|
// ReSharper disable UnusedMember.Global
|
||||||
@ -130,6 +131,16 @@ namespace Tapeti
|
|||||||
/// <param name="passive">Indicates whether the queue was declared as passive (to verify durable queues)</param>
|
/// <param name="passive">Indicates whether the queue was declared as passive (to verify durable queues)</param>
|
||||||
void QueueDeclare(string queueName, bool durable, bool passive);
|
void QueueDeclare(string queueName, bool durable, bool passive);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Called when a durable queue would be declared but already exists with incompatible x-arguments. The existing
|
||||||
|
/// queue will be consumed without declaring to prevent errors during startup. This is used for compatibility with existing queues
|
||||||
|
/// not declared by Tapeti.
|
||||||
|
/// If the queue already exists but should be compatible QueueDeclare will be called instead.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="queueName">The name of the queue that is declared</param>
|
||||||
|
/// <param name="arguments">The x-arguments of the existing queue</param>
|
||||||
|
void QueueExistsWarning(string queueName, Dictionary<string, string> arguments);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Called before a binding is added to a queue.
|
/// Called before a binding is added to a queue.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
// ReSharper disable UnusedMember.Global
|
// ReSharper disable UnusedMember.Global
|
||||||
|
// ReSharper disable UnusedMemberInSuper.Global
|
||||||
|
|
||||||
namespace Tapeti
|
namespace Tapeti
|
||||||
{
|
{
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
<PackageLicenseExpression>Unlicense</PackageLicenseExpression>
|
<PackageLicenseExpression>Unlicense</PackageLicenseExpression>
|
||||||
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
|
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
|
||||||
<PackageIcon>Tapeti.png</PackageIcon>
|
<PackageIcon>Tapeti.png</PackageIcon>
|
||||||
|
<LangVersion>latest</LangVersion>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
|
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
|
||||||
|
@ -18,7 +18,7 @@ namespace Tapeti
|
|||||||
public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess
|
public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess
|
||||||
{
|
{
|
||||||
private Config config;
|
private Config config;
|
||||||
private readonly List<IControllerBindingMiddleware> bindingMiddleware = new List<IControllerBindingMiddleware>();
|
private readonly List<IControllerBindingMiddleware> bindingMiddleware = new();
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
@ -112,7 +112,7 @@ namespace Tapeti
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
throw new ArgumentException(
|
throw new ArgumentException(
|
||||||
$"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}");
|
$"Unsupported middleware implementation: {middleware?.GetType().Name ?? "null"}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -224,9 +224,9 @@ namespace Tapeti
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
internal class Config : ITapetiConfig
|
internal class Config : ITapetiConfig
|
||||||
{
|
{
|
||||||
private readonly ConfigFeatures features = new ConfigFeatures();
|
private readonly ConfigFeatures features = new();
|
||||||
private readonly ConfigMiddleware middleware = new ConfigMiddleware();
|
private readonly ConfigMiddleware middleware = new();
|
||||||
private readonly ConfigBindings bindings = new ConfigBindings();
|
private readonly ConfigBindings bindings = new();
|
||||||
|
|
||||||
public IDependencyResolver DependencyResolver { get; }
|
public IDependencyResolver DependencyResolver { get; }
|
||||||
public ITapetiConfigFeatues Features => features;
|
public ITapetiConfigFeatues Features => features;
|
||||||
@ -290,8 +290,8 @@ namespace Tapeti
|
|||||||
|
|
||||||
internal class ConfigMiddleware : ITapetiConfigMiddleware
|
internal class ConfigMiddleware : ITapetiConfigMiddleware
|
||||||
{
|
{
|
||||||
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
|
private readonly List<IMessageMiddleware> messageMiddleware = new();
|
||||||
private readonly List<IPublishMiddleware> publishMiddleware = new List<IPublishMiddleware>();
|
private readonly List<IPublishMiddleware> publishMiddleware = new();
|
||||||
|
|
||||||
|
|
||||||
public IReadOnlyList<IMessageMiddleware> Message => messageMiddleware;
|
public IReadOnlyList<IMessageMiddleware> Message => messageMiddleware;
|
||||||
|
@ -60,7 +60,7 @@ namespace Tapeti
|
|||||||
/// will be overwritten. See DefaultClientProperties in Connection.cs in the RabbitMQ .NET client source for the default values.
|
/// will be overwritten. See DefaultClientProperties in Connection.cs in the RabbitMQ .NET client source for the default values.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public IDictionary<string, string> ClientProperties {
|
public IDictionary<string, string> ClientProperties {
|
||||||
get => clientProperties ?? (clientProperties = new Dictionary<string, string>());
|
get => clientProperties ??= new Dictionary<string, string>();
|
||||||
set => clientProperties = value;
|
set => clientProperties = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,10 +12,10 @@ namespace Tapeti.Tasks
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class SingleThreadTaskQueue : IDisposable
|
public class SingleThreadTaskQueue : IDisposable
|
||||||
{
|
{
|
||||||
private readonly object previousTaskLock = new object();
|
private readonly object previousTaskLock = new();
|
||||||
private Task previousTask = Task.CompletedTask;
|
private Task previousTask = Task.CompletedTask;
|
||||||
|
|
||||||
private readonly Lazy<SingleThreadTaskScheduler> singleThreadScheduler = new Lazy<SingleThreadTaskScheduler>();
|
private readonly Lazy<SingleThreadTaskScheduler> singleThreadScheduler = new();
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -26,7 +26,7 @@ namespace Tapeti.Tasks
|
|||||||
{
|
{
|
||||||
lock (previousTaskLock)
|
lock (previousTaskLock)
|
||||||
{
|
{
|
||||||
previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None
|
previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None
|
||||||
, TaskContinuationOptions.None
|
, TaskContinuationOptions.None
|
||||||
, singleThreadScheduler.Value);
|
, singleThreadScheduler.Value);
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ namespace Tapeti.Tasks
|
|||||||
{
|
{
|
||||||
lock (previousTaskLock)
|
lock (previousTaskLock)
|
||||||
{
|
{
|
||||||
var task = previousTask.ContinueWith(t => func(), CancellationToken.None
|
var task = previousTask.ContinueWith(_ => func(), CancellationToken.None
|
||||||
, TaskContinuationOptions.None
|
, TaskContinuationOptions.None
|
||||||
, singleThreadScheduler.Value);
|
, singleThreadScheduler.Value);
|
||||||
|
|
||||||
@ -70,7 +70,7 @@ namespace Tapeti.Tasks
|
|||||||
public override int MaximumConcurrencyLevel => 1;
|
public override int MaximumConcurrencyLevel => 1;
|
||||||
|
|
||||||
|
|
||||||
private readonly Queue<Task> scheduledTasks = new Queue<Task>();
|
private readonly Queue<Task> scheduledTasks = new();
|
||||||
private bool disposed;
|
private bool disposed;
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user