From 52965b23728a29327055c8beb50fb2f3c3d75ab4 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 5 Jul 2021 10:28:49 +0200 Subject: [PATCH] Fixed #30 Test if new Tapeti async consumer runs parallel Alternative fix for #29 Deserialization errors due to not buffering RabbitMQ Client's body --- .../07-ParallelizationTest.csproj | 20 +++ .../IMessageParallelization.cs | 9 ++ .../ParallelizationMessageController.cs | 24 +++ 07-ParallelizationTest/Program.cs | 148 ++++++++++++++++++ Tapeti.sln | 13 +- Tapeti/Connection/TapetiBasicConsumer.cs | 10 +- 6 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 07-ParallelizationTest/07-ParallelizationTest.csproj create mode 100644 07-ParallelizationTest/IMessageParallelization.cs create mode 100644 07-ParallelizationTest/ParallelizationMessageController.cs create mode 100644 07-ParallelizationTest/Program.cs diff --git a/07-ParallelizationTest/07-ParallelizationTest.csproj b/07-ParallelizationTest/07-ParallelizationTest.csproj new file mode 100644 index 0000000..90bd0e1 --- /dev/null +++ b/07-ParallelizationTest/07-ParallelizationTest.csproj @@ -0,0 +1,20 @@ + + + + Exe + netcoreapp3.1 + _07_ParallelizationTest + + + + + + + + + + + + + + diff --git a/07-ParallelizationTest/IMessageParallelization.cs b/07-ParallelizationTest/IMessageParallelization.cs new file mode 100644 index 0000000..fff522a --- /dev/null +++ b/07-ParallelizationTest/IMessageParallelization.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace _07_ParallelizationTest +{ + public interface IMessageParallelization + { + Task WaitForBatch(); + } +} diff --git a/07-ParallelizationTest/ParallelizationMessageController.cs b/07-ParallelizationTest/ParallelizationMessageController.cs new file mode 100644 index 0000000..66377a7 --- /dev/null +++ b/07-ParallelizationTest/ParallelizationMessageController.cs @@ -0,0 +1,24 @@ +using System.Threading.Tasks; +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _07_ParallelizationTest +{ + [MessageController] + [DynamicQueue("tapeti.example.07")] + public class ParallelizationMessageController + { + private readonly IMessageParallelization messageParallelization; + + public ParallelizationMessageController(IMessageParallelization messageParallelization) + { + this.messageParallelization = messageParallelization; + } + + + public async Task HandleSpeedTestMessage(SpeedTestMessage message) + { + await messageParallelization.WaitForBatch(); + } + } +} diff --git a/07-ParallelizationTest/Program.cs b/07-ParallelizationTest/Program.cs new file mode 100644 index 0000000..437c737 --- /dev/null +++ b/07-ParallelizationTest/Program.cs @@ -0,0 +1,148 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using ExampleLib; +using Messaging.TapetiExample; +using SimpleInjector; +using Tapeti; +using Tapeti.Default; +using Tapeti.SimpleInjector; + +namespace _07_ParallelizationTest +{ + public class Program + { + private const int MessageCount = 3000; + private const int RepeatBatch = 4; + + + public static void Main() + { + var container = new Container(); + var dependencyResolver = new SimpleInjectorDependencyResolver(container); + + container.Register(); + + var helper = new ExampleConsoleApp(dependencyResolver); + helper.Run(MainAsync); + } + + + internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) + { + var doneCount = 0; + + var container = (IDependencyContainer) dependencyResolver; + container.RegisterDefaultSingleton(new MessageParallelization(MessageCount, () => + { + doneCount++; + Console.WriteLine($"Processed batch #{doneCount}"); + + if (doneCount != RepeatBatch) + return false; + + var exampleState = dependencyResolver.Resolve(); + exampleState.Done(); + return true; + }, count => + { + Console.WriteLine($"Timeout while processing batch after processing {count} messages"); + + var exampleState = dependencyResolver.Resolve(); + exampleState.Done(); + })); + + + + var config = new TapetiConfig(dependencyResolver) + .RegisterAllControllers() + .Build(); + + + await using var connection = new TapetiConnection(config) + { + Params = new TapetiConnectionParams + { + // Default is 50, which means we'll get a timeout after 50 messages + PrefetchCount = MessageCount + } + }; + + var subscriber = await connection.Subscribe(false); + + + var publisher = dependencyResolver.Resolve(); + Console.WriteLine($"Publishing {MessageCount * RepeatBatch} messages..."); + + await PublishMessages(publisher, MessageCount * RepeatBatch); + + + + Console.WriteLine("Consuming messages..."); + await subscriber.Resume(); + await waitForDone(); + } + + + internal static async Task PublishMessages(IPublisher publisher, int messageCount) + { + for (var i = 0; i < messageCount; i++) + { + await publisher.Publish(new SpeedTestMessage + { + PublishCount = i + }); + } + } + } + + + internal class MessageParallelization : IMessageParallelization + { + private readonly int max; + private readonly Func done; + private readonly Action timeout; + private int count; + private readonly object waitLock = new object(); + private TaskCompletionSource batchReachedTask = new TaskCompletionSource(); + private Timer messageExpectedTimer; + private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000); + + + public MessageParallelization(int max, Func done, Action timeout) + { + this.max = max; + this.done = done; + this.timeout = timeout; + } + + + public Task WaitForBatch() + { + lock (waitLock) + { + if (messageExpectedTimer == null) + messageExpectedTimer = new Timer(state => + { + timeout(count); + }, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan); + else + messageExpectedTimer.Change(messageExpectedTimeout, Timeout.InfiniteTimeSpan); + + count++; + if (count != max) + return batchReachedTask.Task; + + if (done()) + messageExpectedTimer.Dispose(); + + count = 0; + + batchReachedTask.SetResult(true); + batchReachedTask = new TaskCompletionSource(); + + return Task.CompletedTask; + } + } + } +} diff --git a/Tapeti.sln b/Tapeti.sln index 63a14c7..3a1a45c 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.27703.2026 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.31005.135 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Annotations", "Tapeti.Annotations\Tapeti.Annotations.csproj", "{4B742AB2-59DD-4792-8E0F-D80B5366B844}" EndProject @@ -57,7 +57,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "07-ParallelizationTest", "07-ParallelizationTest\07-ParallelizationTest.csproj", "{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -157,6 +159,10 @@ Global {152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU {152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU {152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU + {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -184,6 +190,7 @@ Global {29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F} {C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D} {152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} + {E69E6BA5-68E7-4A4D-A38C-B2526AA66E96} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA} diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 7dd969a..3e9934f 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -32,11 +32,19 @@ namespace Tapeti.Connection IBasicProperties properties, ReadOnlyMemory body) { + // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing + // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support + // is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner + // (which changes exception handling, which is now done in TapetiConsumer exclusively). + // + // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 + var bodyArray = body.ToArray(); + Task.Run(async () => { try { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body.ToArray()); + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); await onRespond(deliveryTag, response); } catch