diff --git a/07-ParallelizationTest/07-ParallelizationTest.csproj b/07-ParallelizationTest/07-ParallelizationTest.csproj
new file mode 100644
index 0000000..90bd0e1
--- /dev/null
+++ b/07-ParallelizationTest/07-ParallelizationTest.csproj
@@ -0,0 +1,20 @@
+
+
+
+ Exe
+ netcoreapp3.1
+ _07_ParallelizationTest
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/07-ParallelizationTest/IMessageParallelization.cs b/07-ParallelizationTest/IMessageParallelization.cs
new file mode 100644
index 0000000..fff522a
--- /dev/null
+++ b/07-ParallelizationTest/IMessageParallelization.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace _07_ParallelizationTest
+{
+ public interface IMessageParallelization
+ {
+ Task WaitForBatch();
+ }
+}
diff --git a/07-ParallelizationTest/ParallelizationMessageController.cs b/07-ParallelizationTest/ParallelizationMessageController.cs
new file mode 100644
index 0000000..66377a7
--- /dev/null
+++ b/07-ParallelizationTest/ParallelizationMessageController.cs
@@ -0,0 +1,24 @@
+using System.Threading.Tasks;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _07_ParallelizationTest
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.07")]
+ public class ParallelizationMessageController
+ {
+ private readonly IMessageParallelization messageParallelization;
+
+ public ParallelizationMessageController(IMessageParallelization messageParallelization)
+ {
+ this.messageParallelization = messageParallelization;
+ }
+
+
+ public async Task HandleSpeedTestMessage(SpeedTestMessage message)
+ {
+ await messageParallelization.WaitForBatch();
+ }
+ }
+}
diff --git a/07-ParallelizationTest/Program.cs b/07-ParallelizationTest/Program.cs
new file mode 100644
index 0000000..437c737
--- /dev/null
+++ b/07-ParallelizationTest/Program.cs
@@ -0,0 +1,148 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using ExampleLib;
+using Messaging.TapetiExample;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.Default;
+using Tapeti.SimpleInjector;
+
+namespace _07_ParallelizationTest
+{
+ public class Program
+ {
+ private const int MessageCount = 3000;
+ private const int RepeatBatch = 4;
+
+
+ public static void Main()
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var doneCount = 0;
+
+ var container = (IDependencyContainer) dependencyResolver;
+ container.RegisterDefaultSingleton(new MessageParallelization(MessageCount, () =>
+ {
+ doneCount++;
+ Console.WriteLine($"Processed batch #{doneCount}");
+
+ if (doneCount != RepeatBatch)
+ return false;
+
+ var exampleState = dependencyResolver.Resolve();
+ exampleState.Done();
+ return true;
+ }, count =>
+ {
+ Console.WriteLine($"Timeout while processing batch after processing {count} messages");
+
+ var exampleState = dependencyResolver.Resolve();
+ exampleState.Done();
+ }));
+
+
+
+ var config = new TapetiConfig(dependencyResolver)
+ .RegisterAllControllers()
+ .Build();
+
+
+ await using var connection = new TapetiConnection(config)
+ {
+ Params = new TapetiConnectionParams
+ {
+ // Default is 50, which means we'll get a timeout after 50 messages
+ PrefetchCount = MessageCount
+ }
+ };
+
+ var subscriber = await connection.Subscribe(false);
+
+
+ var publisher = dependencyResolver.Resolve();
+ Console.WriteLine($"Publishing {MessageCount * RepeatBatch} messages...");
+
+ await PublishMessages(publisher, MessageCount * RepeatBatch);
+
+
+
+ Console.WriteLine("Consuming messages...");
+ await subscriber.Resume();
+ await waitForDone();
+ }
+
+
+ internal static async Task PublishMessages(IPublisher publisher, int messageCount)
+ {
+ for (var i = 0; i < messageCount; i++)
+ {
+ await publisher.Publish(new SpeedTestMessage
+ {
+ PublishCount = i
+ });
+ }
+ }
+ }
+
+
+ internal class MessageParallelization : IMessageParallelization
+ {
+ private readonly int max;
+ private readonly Func done;
+ private readonly Action timeout;
+ private int count;
+ private readonly object waitLock = new object();
+ private TaskCompletionSource batchReachedTask = new TaskCompletionSource();
+ private Timer messageExpectedTimer;
+ private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000);
+
+
+ public MessageParallelization(int max, Func done, Action timeout)
+ {
+ this.max = max;
+ this.done = done;
+ this.timeout = timeout;
+ }
+
+
+ public Task WaitForBatch()
+ {
+ lock (waitLock)
+ {
+ if (messageExpectedTimer == null)
+ messageExpectedTimer = new Timer(state =>
+ {
+ timeout(count);
+ }, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan);
+ else
+ messageExpectedTimer.Change(messageExpectedTimeout, Timeout.InfiniteTimeSpan);
+
+ count++;
+ if (count != max)
+ return batchReachedTask.Task;
+
+ if (done())
+ messageExpectedTimer.Dispose();
+
+ count = 0;
+
+ batchReachedTask.SetResult(true);
+ batchReachedTask = new TaskCompletionSource();
+
+ return Task.CompletedTask;
+ }
+ }
+ }
+}
diff --git a/Tapeti.sln b/Tapeti.sln
index 63a14c7..3a1a45c 100644
--- a/Tapeti.sln
+++ b/Tapeti.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.27703.2026
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.31005.135
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Annotations", "Tapeti.Annotations\Tapeti.Annotations.csproj", "{4B742AB2-59DD-4792-8E0F-D80B5366B844}"
EndProject
@@ -57,7 +57,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "07-ParallelizationTest", "07-ParallelizationTest\07-ParallelizationTest.csproj", "{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -157,6 +159,10 @@ Global
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -184,6 +190,7 @@ Global
{29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D}
{152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
+ {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}
diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs
index 7dd969a..3e9934f 100644
--- a/Tapeti/Connection/TapetiBasicConsumer.cs
+++ b/Tapeti/Connection/TapetiBasicConsumer.cs
@@ -32,11 +32,19 @@ namespace Tapeti.Connection
IBasicProperties properties,
ReadOnlyMemory body)
{
+ // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
+ // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
+ // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner
+ // (which changes exception handling, which is now done in TapetiConsumer exclusively).
+ //
+ // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
+ var bodyArray = body.ToArray();
+
Task.Run(async () =>
{
try
{
- var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body.ToArray());
+ var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(deliveryTag, response);
}
catch