1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Finally removed the old mess of a test project, all functionality should be covered by the examples

Added SpeedTest example
This commit is contained in:
Mark van Renswoude 2019-08-18 11:41:29 +02:00
parent 84ee6f090d
commit 23f86b3597
13 changed files with 206 additions and 463 deletions

View File

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_05_SpeedTest</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,7 @@
namespace _05_SpeedTest
{
public interface IMessageCounter
{
void Add();
}
}

View File

@ -0,0 +1,140 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.Default;
using Tapeti.SimpleInjector;
namespace _05_SpeedTest
{
public class Program
{
private const int MessageCount = 20000;
// This does not make a massive difference, since internally Tapeti uses a single thread
// to perform all channel operations as recommended by the RabbitMQ .NET client library.
private const int ConcurrentTasks = 20;
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var container = (IDependencyContainer)dependencyResolver;
container.RegisterDefaultSingleton<IMessageCounter>(new MessageCounter(MessageCount, () =>
{
var exampleState = dependencyResolver.Resolve<IExampleState>();
exampleState.Done();
}));
var config = new TapetiConfig(dependencyResolver)
// On a developer test machine, this makes the difference between 2200 messages/sec and 3000 messages/sec published.
// Interesting, but only if speed is more important than guaranteed delivery.
//.DisablePublisherConfirms()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config))
{
var subscriber = await connection.Subscribe(false);
var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount} messages...");
var stopwatch = new Stopwatch();
stopwatch.Start();
await PublishMessages(publisher);
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
Console.WriteLine("Consuming messages...");
await subscriber.Resume();
stopwatch.Restart();
await waitForDone();
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
}
}
internal static async Task PublishMessages(IPublisher publisher)
{
var semaphore = new SemaphoreSlim(ConcurrentTasks);
var tasks = new List<Task>();
for (var i = 0; i < MessageCount; i++)
{
var item = i;
var task = Task.Run(async () =>
{
try
{
await semaphore.WaitAsync();
await publisher.Publish(new SpeedTestMessage
{
PublishCount = item
});
}
finally
{
semaphore.Release();
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
}
internal class MessageCounter : IMessageCounter
{
private readonly int max;
private readonly Action done;
private int count;
public MessageCounter(int max, Action done)
{
this.max = max;
this.done = done;
}
public void Add()
{
// With a prefetchcount > 1 the consumers are running in multiple threads,
// beware of this when using singletons.
if (Interlocked.Increment(ref count) == max)
done();
}
}
}

View File

@ -0,0 +1,23 @@
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _05_SpeedTest
{
[MessageController]
[DynamicQueue("tapeti.example.05")]
public class SpeedMessageController
{
private readonly IMessageCounter messageCounter;
public SpeedMessageController(IMessageCounter messageCounter)
{
this.messageCounter = messageCounter;
}
public void HandleSpeedTestMessage(SpeedTestMessage message)
{
messageCounter.Add();
}
}
}

View File

@ -0,0 +1,9 @@
using System.ComponentModel.DataAnnotations;
namespace Messaging.TapetiExample
{
public class SpeedTestMessage
{
public int PublishCount { get; set; }
}
}

View File

@ -15,8 +15,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Flow.SQL", "Tapeti.F
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.SimpleInjector", "Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj", "{A190C736-E95A-4BDA-AA80-6211226DFCAD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj", "{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}"
@ -37,7 +35,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "02-DeclareDurableQueues", "
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "03-FlowRequestResponse", "Examples\03-FlowRequestResponse\03-FlowRequestResponse.csproj", "{463A12CE-E221-450D-ADEA-91A599612DFA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "04-Transient", "Examples\04-Transient\04-Transient.csproj", "{46DFC131-A398-435F-A7DF-3C41B656BF11}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "04-Transient", "Examples\04-Transient\04-Transient.csproj", "{46DFC131-A398-435F-A7DF-3C41B656BF11}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "05-SpeedTest", "Examples\05-SpeedTest\05-SpeedTest.csproj", "{330D05CE-5321-4C7D-8017-2070B891289E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -69,10 +69,6 @@ Global
{A190C736-E95A-4BDA-AA80-6211226DFCAD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A190C736-E95A-4BDA-AA80-6211226DFCAD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A190C736-E95A-4BDA-AA80-6211226DFCAD}.Release|Any CPU.Build.0 = Release|Any CPU
{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Release|Any CPU.Build.0 = Release|Any CPU
{334F3715-63CF-4D13-B09A-38E2A616D4F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{334F3715-63CF-4D13-B09A-38E2A616D4F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{334F3715-63CF-4D13-B09A-38E2A616D4F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -113,6 +109,10 @@ Global
{46DFC131-A398-435F-A7DF-3C41B656BF11}.Debug|Any CPU.Build.0 = Debug|Any CPU
{46DFC131-A398-435F-A7DF-3C41B656BF11}.Release|Any CPU.ActiveCfg = Release|Any CPU
{46DFC131-A398-435F-A7DF-3C41B656BF11}.Release|Any CPU.Build.0 = Release|Any CPU
{330D05CE-5321-4C7D-8017-2070B891289E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{330D05CE-5321-4C7D-8017-2070B891289E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{330D05CE-5321-4C7D-8017-2070B891289E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{330D05CE-5321-4C7D-8017-2070B891289E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -124,6 +124,7 @@ Global
{85511282-EF91-4B56-B7DC-9E8706556D6E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{463A12CE-E221-450D-ADEA-91A599612DFA} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{46DFC131-A398-435F-A7DF-3C41B656BF11} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{330D05CE-5321-4C7D-8017-2070B891289E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}

View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="rabbitmq:hostname" value="localhost" />
</appSettings>
</configuration>

View File

@ -1,81 +0,0 @@
using System;
using Tapeti.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
// ReSharper disable UnusedMember.Global
namespace Test
{
[MessageController]
[DynamicQueue]
public class FlowEndController
{
private readonly IFlowProvider flowProvider;
public FlowEndController(IFlowProvider flowProvider)
{
this.flowProvider = flowProvider;
}
[Start]
public IYieldPoint StartFlow(PingMessage message)
{
Console.WriteLine("PingMessage received, calling flowProvider.End() directly");
if (DateTime.Now < new DateTime(2000, 1, 1))
{
//never true
return flowProvider
.YieldWithRequestSync<PingConfirmationRequestMessage, PingConfirmationResponseMessage>
(new PingConfirmationRequestMessage { StoredInState = "Ping:" },
HandlePingConfirmationResponse);
}
return Finish();
}
[Continuation]
public IYieldPoint HandlePingConfirmationResponse(PingConfirmationResponseMessage msg)
{
Console.WriteLine("Ending ping flow: " + msg.Answer);
return Finish();
}
private IYieldPoint Finish()
{
return flowProvider.End();
}
public class PingMessage
{
}
[Request(Response = typeof(PingConfirmationResponseMessage))]
public class PingConfirmationRequestMessage
{
public string StoredInState { get; set; }
}
public class PingConfirmationResponseMessage
{
public string Answer { get; set; }
}
public PingConfirmationResponseMessage PingConfirmation(PingConfirmationRequestMessage message)
{
Console.WriteLine(">> receive Ping (returning pong)");
return new PingConfirmationResponseMessage
{
Answer = message.StoredInState + " Pong!"
};
}
}
}

View File

@ -1,188 +0,0 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Tapeti.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
// ReSharper disable UnusedMember.Global
namespace Test
{
[MessageController]
[DynamicQueue]
public class MarcoController
{
//private readonly IPublisher publisher;
private readonly IFlowProvider flowProvider;
//private readonly Visualizer visualizer;
// Public properties are automatically stored and retrieved while in a flow
public Guid StateTestGuid { get; set; }
public int Phase;
public MarcoController(/*IPublisher publisher, */IFlowProvider flowProvider/*, Visualizer visualizer*/)
{
//this.publisher = publisher;
this.flowProvider = flowProvider;
//this.visualizer = visualizer;
}
[Start]
public async Task<IYieldPoint> StartFlow(bool go)
{
Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow");
await Task.Delay(10);
Phase = 1;
if (go)
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponse);
Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely");
return flowProvider.End();
}
[Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg)
{
Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second...");
Phase = 2;
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponseEnd);
}
[Continuation]
public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg)
{
Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow");
return flowProvider.End();
}
[Start]
public IYieldPoint TestParallelRequest()
{
Console.WriteLine(">> Marco (yielding with request)");
StateTestGuid = Guid.NewGuid();
Console.WriteLine($"Starting parallel request with StateTestGuid {StateTestGuid}");
return flowProvider.YieldWithParallelRequest()
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{
StoredInState = StateTestGuid,
EnumValue = TestEnum.Value1
}, HandlePoloConfirmationResponse1)
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{
StoredInState = StateTestGuid,
EnumValue = TestEnum.Value2,
OptionalEnumValue = TestEnum.Value1
}, HandlePoloConfirmationResponse2)
.YieldSync(ContinuePoloConfirmation);
}
[Continuation]
public void HandlePoloConfirmationResponse1(PoloConfirmationResponseMessage message)
{
Console.WriteLine(">> HandlePoloConfirmationResponse1");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
[Continuation]
public void HandlePoloConfirmationResponse2(PoloConfirmationResponseMessage message)
{
Console.WriteLine(">> HandlePoloConfirmationResponse2");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
private IYieldPoint ContinuePoloConfirmation()
{
Console.WriteLine("> ConvergePoloConfirmation (ending flow)");
return flowProvider.End();
}
/**
* For simple request response patterns, the return type can be used.
* This will automatically include the correlationId in the response and
* use the replyTo header of the request if provided.
*/
[DurableQueue("tapeti.test.durable")]
public async Task<PoloConfirmationResponseMessage> PoloConfirmation(PoloConfirmationRequestMessage message)
{
Console.WriteLine(">> PoloConfirmation (returning confirmation)");
await Task.Delay(100);
return new PoloConfirmationResponseMessage
{
ShouldMatchState = message.StoredInState,
EnumValue = message.EnumValue,
OptionalEnumValue = message.OptionalEnumValue
};
}
[DynamicQueue("custom.prefix")]
public void Polo(PoloMessage message)
{
Console.WriteLine(">> Polo");
}
}
public enum TestEnum
{
Value1,
Value2
}
[Request(Response = typeof(PoloMessage))]
public class MarcoMessage
{
}
public class PoloMessage
{
}
[Request(Response = typeof(PoloConfirmationResponseMessage))]
public class PoloConfirmationRequestMessage
{
[Required]
public Guid StoredInState { get; set; }
public TestEnum EnumValue;
public TestEnum? OptionalEnumValue;
}
public class PoloConfirmationResponseMessage
{
[Required]
public Guid ShouldMatchState { get; set; }
public TestEnum EnumValue;
public TestEnum? OptionalEnumValue;
}
}

View File

@ -1,52 +0,0 @@
using System.Threading.Tasks;
// ReSharper disable UnusedMember.Global
namespace Test
{
public class MarcoEmitter
{
//private readonly IPublisher publisher;
/*public MarcoEmitter(IPublisher publisher)
{
this.publisher = publisher;
}
*/
public async Task Run()
{
//await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20);
while (true)
{
for (var x = 0; x < 200; x++)
{
await concurrent.WaitAsync();
try
{
await publisher.Publish(new MarcoMessage());
}
finally
{
concurrent.Release();
}
}
await Task.Delay(200);
}
*/
while (true)
{
await Task.Delay(1000);
}
// ReSharper disable once FunctionNeverReturns
}
}
}

View File

@ -1,89 +0,0 @@
using System;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
using Tapeti.Transient;
namespace Test
{
internal class Program
{
private static void Main()
{
// TODO logging
//try
{
var container = new Container();
container.Register<MarcoEmitter>();
container.Register<Visualizer>();
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.WithDataAnnotations()
.WithTransient(TimeSpan.FromSeconds(30))
.EnableDeclareDurableQueues()
.RegisterAllControllers()
//.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
.Build();
using (var connection = new TapetiConnection(config)
{
Params = new TapetiAppSettingsConnectionParams()
})
{
var flowStore = container.GetInstance<IFlowStore>();
var flowStore2 = container.GetInstance<IFlowStore>();
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
flowStore.Load().Wait();
connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); };
connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); };
connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); };
Console.WriteLine("Subscribing...");
var subscriber = connection.Subscribe(false).Result;
Console.WriteLine("Consuming...");
subscriber.Resume().Wait();
Console.WriteLine("Done!");
/*
var response = container.GetInstance<ITransientPublisher>()
.RequestResponse<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage
{
StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab")
}).Result;
Console.WriteLine(response.ShouldMatchState);
*/
//connection.GetPublisher().Publish(new FlowEndController.PingMessage());
container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait();
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
Thread.Sleep(1000);
//var emitter = container.GetInstance<MarcoEmitter>();
//emitter.Run().Wait();
Console.WriteLine("Press any Enter to continue");
Console.ReadLine();
}
}
//catch (Exception e)
{
// Console.WriteLine(e.ToString());
// Console.ReadKey();
}
}
}
}

View File

@ -1,18 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.Annotations\Tapeti.Annotations.csproj" />
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj" />
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\Tapeti.Transient\Tapeti.Transient.csproj" />
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -1,22 +0,0 @@
using System;
using System.Threading.Tasks;
// ReSharper disable UnusedMember.Global
namespace Test
{
public class Visualizer
{
public Task VisualizeMarco()
{
Console.WriteLine("Marco!");
return Task.CompletedTask;
}
public Task VisualizePolo()
{
Console.WriteLine("Polo!");
return Task.CompletedTask;
}
}
}