1
0
mirror of synced 2024-06-26 14:27:38 +00:00

Merge branch 'develop'

This commit is contained in:
Mark van Renswoude 2021-07-05 10:29:01 +02:00
commit 6a6f6e5e67
9 changed files with 237 additions and 20 deletions

View File

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>_07_ParallelizationTest</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Examples\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Examples\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,9 @@
using System.Threading.Tasks;
namespace _07_ParallelizationTest
{
public interface IMessageParallelization
{
Task WaitForBatch();
}
}

View File

@ -0,0 +1,24 @@
using System.Threading.Tasks;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _07_ParallelizationTest
{
[MessageController]
[DynamicQueue("tapeti.example.07")]
public class ParallelizationMessageController
{
private readonly IMessageParallelization messageParallelization;
public ParallelizationMessageController(IMessageParallelization messageParallelization)
{
this.messageParallelization = messageParallelization;
}
public async Task HandleSpeedTestMessage(SpeedTestMessage message)
{
await messageParallelization.WaitForBatch();
}
}
}

View File

@ -0,0 +1,148 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.Default;
using Tapeti.SimpleInjector;
namespace _07_ParallelizationTest
{
public class Program
{
private const int MessageCount = 3000;
private const int RepeatBatch = 4;
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var doneCount = 0;
var container = (IDependencyContainer) dependencyResolver;
container.RegisterDefaultSingleton<IMessageParallelization>(new MessageParallelization(MessageCount, () =>
{
doneCount++;
Console.WriteLine($"Processed batch #{doneCount}");
if (doneCount != RepeatBatch)
return false;
var exampleState = dependencyResolver.Resolve<IExampleState>();
exampleState.Done();
return true;
}, count =>
{
Console.WriteLine($"Timeout while processing batch after processing {count} messages");
var exampleState = dependencyResolver.Resolve<IExampleState>();
exampleState.Done();
}));
var config = new TapetiConfig(dependencyResolver)
.RegisterAllControllers()
.Build();
await using var connection = new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
// Default is 50, which means we'll get a timeout after 50 messages
PrefetchCount = MessageCount
}
};
var subscriber = await connection.Subscribe(false);
var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount * RepeatBatch} messages...");
await PublishMessages(publisher, MessageCount * RepeatBatch);
Console.WriteLine("Consuming messages...");
await subscriber.Resume();
await waitForDone();
}
internal static async Task PublishMessages(IPublisher publisher, int messageCount)
{
for (var i = 0; i < messageCount; i++)
{
await publisher.Publish(new SpeedTestMessage
{
PublishCount = i
});
}
}
}
internal class MessageParallelization : IMessageParallelization
{
private readonly int max;
private readonly Func<bool> done;
private readonly Action<int> timeout;
private int count;
private readonly object waitLock = new object();
private TaskCompletionSource<bool> batchReachedTask = new TaskCompletionSource<bool>();
private Timer messageExpectedTimer;
private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000);
public MessageParallelization(int max, Func<bool> done, Action<int> timeout)
{
this.max = max;
this.done = done;
this.timeout = timeout;
}
public Task WaitForBatch()
{
lock (waitLock)
{
if (messageExpectedTimer == null)
messageExpectedTimer = new Timer(state =>
{
timeout(count);
}, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan);
else
messageExpectedTimer.Change(messageExpectedTimeout, Timeout.InfiniteTimeSpan);
count++;
if (count != max)
return batchReachedTask.Task;
if (done())
messageExpectedTimer.Dispose();
count = 0;
batchReachedTask.SetResult(true);
batchReachedTask = new TaskCompletionSource<bool>();
return Task.CompletedTask;
}
}
}
}

View File

@ -1,7 +1,7 @@
 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio Version 16
VisualStudioVersion = 15.0.27703.2026 VisualStudioVersion = 16.0.31005.135
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Annotations", "Tapeti.Annotations\Tapeti.Annotations.csproj", "{4B742AB2-59DD-4792-8E0F-D80B5366B844}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Annotations", "Tapeti.Annotations\Tapeti.Annotations.csproj", "{4B742AB2-59DD-4792-8E0F-D80B5366B844}"
EndProject EndProject
@ -57,7 +57,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tools", "Tools", "{62002327
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Cmd", "Tapeti.Cmd\Tapeti.Cmd.csproj", "{C8728BFC-7F97-41BC-956B-690A57B634EC}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "07-ParallelizationTest", "07-ParallelizationTest\07-ParallelizationTest.csproj", "{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -157,6 +159,10 @@ Global
{152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU {152227AA-3165-4550-8997-6EA80C84516E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU {152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU {152227AA-3165-4550-8997-6EA80C84516E}.Release|Any CPU.Build.0 = Release|Any CPU
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -184,6 +190,7 @@ Global
{29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F} {29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D} {C8728BFC-7F97-41BC-956B-690A57B634EC} = {62002327-46B0-4B72-B95A-594CE7F8C80D}
{152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} {152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA} SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}

View File

@ -9,7 +9,7 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary> /// </summary>
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer internal class TapetiBasicConsumer : DefaultBasicConsumer
{ {
private readonly IConsumer consumer; private readonly IConsumer consumer;
private readonly Func<ulong, ConsumeResult, Task> onRespond; private readonly Func<ulong, ConsumeResult, Task> onRespond;
@ -24,23 +24,34 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public override async Task HandleBasicDeliver(string consumerTag, public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag, ulong deliveryTag,
bool redelivered, bool redelivered,
string exchange, string exchange,
string routingKey, string routingKey,
IBasicProperties properties, IBasicProperties properties,
ReadOnlyMemory<byte> body) ReadOnlyMemory<byte> body)
{
// RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
// from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
// is implemented we need to rethink the way the body is passed around and maybe deserialize it sooner
// (which changes exception handling, which is now done in TapetiConsumer exclusively).
//
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray();
Task.Run(async () =>
{ {
try try
{ {
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body); var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
await onRespond(deliveryTag, response); await onRespond(deliveryTag, response);
} }
catch catch
{ {
await onRespond(deliveryTag, ConsumeResult.Error); await onRespond(deliveryTag, ConsumeResult.Error);
} }
});
} }
} }
} }

View File

@ -656,8 +656,7 @@ namespace Tapeti.Connection
Password = connectionParams.Password, Password = connectionParams.Password,
AutomaticRecoveryEnabled = false, AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30), RequestedHeartbeat = TimeSpan.FromSeconds(30)
DispatchConsumersAsync = true
}; };
if (connectionParams.ClientProperties != null) if (connectionParams.ClientProperties != null)

View File

@ -39,12 +39,12 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory<byte> body) public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
{ {
object message = null; object message = null;
try try
{ {
message = messageSerializer.Deserialize(body.ToArray(), properties); message = messageSerializer.Deserialize(body, properties);
if (message == null) if (message == null)
throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body)); throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body));

View File

@ -1,5 +1,4 @@
using System; using System.Threading.Tasks;
using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
namespace Tapeti namespace Tapeti
@ -17,6 +16,6 @@ namespace Tapeti
/// <param name="properties">Metadata included in the message</param> /// <param name="properties">Metadata included in the message</param>
/// <param name="body">The raw body of the message</param> /// <param name="body">The raw body of the message</param>
/// <returns></returns> /// <returns></returns>
Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory<byte> body); Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body);
} }
} }