diff --git a/Examples/05-SpeedTest/05-SpeedTest.csproj b/Examples/05-SpeedTest/05-SpeedTest.csproj new file mode 100644 index 0000000..f85333c --- /dev/null +++ b/Examples/05-SpeedTest/05-SpeedTest.csproj @@ -0,0 +1,19 @@ + + + + Exe + netcoreapp2.1 + _05_SpeedTest + + + + + + + + + + + + + diff --git a/Examples/05-SpeedTest/IMessageCounter.cs b/Examples/05-SpeedTest/IMessageCounter.cs new file mode 100644 index 0000000..e5e9aaf --- /dev/null +++ b/Examples/05-SpeedTest/IMessageCounter.cs @@ -0,0 +1,7 @@ +namespace _05_SpeedTest +{ + public interface IMessageCounter + { + void Add(); + } +} diff --git a/Examples/05-SpeedTest/Program.cs b/Examples/05-SpeedTest/Program.cs new file mode 100644 index 0000000..6c399d7 --- /dev/null +++ b/Examples/05-SpeedTest/Program.cs @@ -0,0 +1,140 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using ExampleLib; +using Messaging.TapetiExample; +using SimpleInjector; +using Tapeti; +using Tapeti.Default; +using Tapeti.SimpleInjector; + +namespace _05_SpeedTest +{ + public class Program + { + private const int MessageCount = 20000; + + // This does not make a massive difference, since internally Tapeti uses a single thread + // to perform all channel operations as recommended by the RabbitMQ .NET client library. + private const int ConcurrentTasks = 20; + + + public static void Main(string[] args) + { + var container = new Container(); + var dependencyResolver = new SimpleInjectorDependencyResolver(container); + + container.Register(); + + var helper = new ExampleConsoleApp(dependencyResolver); + helper.Run(MainAsync); + } + + + internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone) + { + var container = (IDependencyContainer)dependencyResolver; + container.RegisterDefaultSingleton(new MessageCounter(MessageCount, () => + { + var exampleState = dependencyResolver.Resolve(); + exampleState.Done(); + })); + + + + var config = new TapetiConfig(dependencyResolver) + // On a developer test machine, this makes the difference between 2200 messages/sec and 3000 messages/sec published. + // Interesting, but only if speed is more important than guaranteed delivery. + //.DisablePublisherConfirms() + .RegisterAllControllers() + .Build(); + + + using (var connection = new TapetiConnection(config)) + { + var subscriber = await connection.Subscribe(false); + + + var publisher = dependencyResolver.Resolve(); + Console.WriteLine($"Publishing {MessageCount} messages..."); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + await PublishMessages(publisher); + + stopwatch.Stop(); + Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec"); + + + + Console.WriteLine("Consuming messages..."); + await subscriber.Resume(); + + stopwatch.Restart(); + + await waitForDone(); + + stopwatch.Stop(); + Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec"); + } + } + + + internal static async Task PublishMessages(IPublisher publisher) + { + var semaphore = new SemaphoreSlim(ConcurrentTasks); + var tasks = new List(); + + for (var i = 0; i < MessageCount; i++) + { + var item = i; + var task = Task.Run(async () => + { + try + { + await semaphore.WaitAsync(); + await publisher.Publish(new SpeedTestMessage + { + PublishCount = item + }); + } + finally + { + semaphore.Release(); + } + }); + + tasks.Add(task); + } + + await Task.WhenAll(tasks); + } + } + + + internal class MessageCounter : IMessageCounter + { + private readonly int max; + private readonly Action done; + private int count; + + + public MessageCounter(int max, Action done) + { + this.max = max; + this.done = done; + } + + + public void Add() + { + // With a prefetchcount > 1 the consumers are running in multiple threads, + // beware of this when using singletons. + if (Interlocked.Increment(ref count) == max) + done(); + } + } +} diff --git a/Examples/05-SpeedTest/SpeedMessageController.cs b/Examples/05-SpeedTest/SpeedMessageController.cs new file mode 100644 index 0000000..83af176 --- /dev/null +++ b/Examples/05-SpeedTest/SpeedMessageController.cs @@ -0,0 +1,23 @@ +using Messaging.TapetiExample; +using Tapeti.Annotations; + +namespace _05_SpeedTest +{ + [MessageController] + [DynamicQueue("tapeti.example.05")] + public class SpeedMessageController + { + private readonly IMessageCounter messageCounter; + + public SpeedMessageController(IMessageCounter messageCounter) + { + this.messageCounter = messageCounter; + } + + + public void HandleSpeedTestMessage(SpeedTestMessage message) + { + messageCounter.Add(); + } + } +} diff --git a/Examples/Messaging.TapetiExample/SpeedTestMessage.cs b/Examples/Messaging.TapetiExample/SpeedTestMessage.cs new file mode 100644 index 0000000..08e03de --- /dev/null +++ b/Examples/Messaging.TapetiExample/SpeedTestMessage.cs @@ -0,0 +1,9 @@ +using System.ComponentModel.DataAnnotations; + +namespace Messaging.TapetiExample +{ + public class SpeedTestMessage + { + public int PublishCount { get; set; } + } +} diff --git a/Tapeti.sln b/Tapeti.sln index de43216..801f5ae 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -15,8 +15,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Flow.SQL", "Tapeti.F EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.SimpleInjector", "Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj", "{A190C736-E95A-4BDA-AA80-6211226DFCAD}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj", "{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" @@ -37,7 +35,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "02-DeclareDurableQueues", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "03-FlowRequestResponse", "Examples\03-FlowRequestResponse\03-FlowRequestResponse.csproj", "{463A12CE-E221-450D-ADEA-91A599612DFA}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "04-Transient", "Examples\04-Transient\04-Transient.csproj", "{46DFC131-A398-435F-A7DF-3C41B656BF11}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "04-Transient", "Examples\04-Transient\04-Transient.csproj", "{46DFC131-A398-435F-A7DF-3C41B656BF11}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "05-SpeedTest", "Examples\05-SpeedTest\05-SpeedTest.csproj", "{330D05CE-5321-4C7D-8017-2070B891289E}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -69,10 +69,6 @@ Global {A190C736-E95A-4BDA-AA80-6211226DFCAD}.Debug|Any CPU.Build.0 = Debug|Any CPU {A190C736-E95A-4BDA-AA80-6211226DFCAD}.Release|Any CPU.ActiveCfg = Release|Any CPU {A190C736-E95A-4BDA-AA80-6211226DFCAD}.Release|Any CPU.Build.0 = Release|Any CPU - {1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Debug|Any CPU.Build.0 = Debug|Any CPU - {1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1A4B7136-B7DF-41EA-BEA2-E87B4607D420}.Release|Any CPU.Build.0 = Release|Any CPU {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Debug|Any CPU.Build.0 = Debug|Any CPU {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -113,6 +109,10 @@ Global {46DFC131-A398-435F-A7DF-3C41B656BF11}.Debug|Any CPU.Build.0 = Debug|Any CPU {46DFC131-A398-435F-A7DF-3C41B656BF11}.Release|Any CPU.ActiveCfg = Release|Any CPU {46DFC131-A398-435F-A7DF-3C41B656BF11}.Release|Any CPU.Build.0 = Release|Any CPU + {330D05CE-5321-4C7D-8017-2070B891289E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {330D05CE-5321-4C7D-8017-2070B891289E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {330D05CE-5321-4C7D-8017-2070B891289E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {330D05CE-5321-4C7D-8017-2070B891289E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -124,6 +124,7 @@ Global {85511282-EF91-4B56-B7DC-9E8706556D6E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} {463A12CE-E221-450D-ADEA-91A599612DFA} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} {46DFC131-A398-435F-A7DF-3C41B656BF11} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} + {330D05CE-5321-4C7D-8017-2070B891289E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA} diff --git a/Test/App.config b/Test/App.config deleted file mode 100644 index b55cd45..0000000 --- a/Test/App.config +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/Test/FlowEndController.cs b/Test/FlowEndController.cs deleted file mode 100644 index 0184339..0000000 --- a/Test/FlowEndController.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using Tapeti.Annotations; -using Tapeti.Flow; -using Tapeti.Flow.Annotations; - -// ReSharper disable UnusedMember.Global - -namespace Test -{ - [MessageController] - [DynamicQueue] - public class FlowEndController - { - private readonly IFlowProvider flowProvider; - - public FlowEndController(IFlowProvider flowProvider) - { - this.flowProvider = flowProvider; - } - - - [Start] - public IYieldPoint StartFlow(PingMessage message) - { - Console.WriteLine("PingMessage received, calling flowProvider.End() directly"); - - if (DateTime.Now < new DateTime(2000, 1, 1)) - { - //never true - return flowProvider - .YieldWithRequestSync - (new PingConfirmationRequestMessage { StoredInState = "Ping:" }, - HandlePingConfirmationResponse); - } - - return Finish(); - } - - - [Continuation] - public IYieldPoint HandlePingConfirmationResponse(PingConfirmationResponseMessage msg) - { - Console.WriteLine("Ending ping flow: " + msg.Answer); - return Finish(); - } - - - private IYieldPoint Finish() - { - return flowProvider.End(); - } - - - public class PingMessage - { - - } - - [Request(Response = typeof(PingConfirmationResponseMessage))] - public class PingConfirmationRequestMessage - { - public string StoredInState { get; set; } - } - - - public class PingConfirmationResponseMessage - { - public string Answer { get; set; } - } - - public PingConfirmationResponseMessage PingConfirmation(PingConfirmationRequestMessage message) - { - Console.WriteLine(">> receive Ping (returning pong)"); - - return new PingConfirmationResponseMessage - { - Answer = message.StoredInState + " Pong!" - }; - } - } -} diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs deleted file mode 100644 index e16a1eb..0000000 --- a/Test/MarcoController.cs +++ /dev/null @@ -1,188 +0,0 @@ -using System; -using System.ComponentModel.DataAnnotations; -using System.Threading.Tasks; -using Tapeti.Annotations; -using Tapeti.Flow; -using Tapeti.Flow.Annotations; - -// ReSharper disable UnusedMember.Global - -namespace Test -{ - [MessageController] - [DynamicQueue] - public class MarcoController - { - //private readonly IPublisher publisher; - private readonly IFlowProvider flowProvider; - //private readonly Visualizer visualizer; - - // Public properties are automatically stored and retrieved while in a flow - public Guid StateTestGuid { get; set; } - - public int Phase; - - public MarcoController(/*IPublisher publisher, */IFlowProvider flowProvider/*, Visualizer visualizer*/) - { - //this.publisher = publisher; - this.flowProvider = flowProvider; - //this.visualizer = visualizer; - } - - - [Start] - public async Task StartFlow(bool go) - { - Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow"); - await Task.Delay(10); - - Phase = 1; - - if (go) - return flowProvider.YieldWithRequestSync - (new PoloConfirmationRequestMessage(), - HandlePoloConfirmationResponse); - - Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely"); - return flowProvider.End(); - } - - - [Continuation] - public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg) - { - Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second..."); - - Phase = 2; - - return flowProvider.YieldWithRequestSync - (new PoloConfirmationRequestMessage(), - HandlePoloConfirmationResponseEnd); - } - - - [Continuation] - public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg) - { - Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow"); - return flowProvider.End(); - } - - - [Start] - public IYieldPoint TestParallelRequest() - { - Console.WriteLine(">> Marco (yielding with request)"); - - StateTestGuid = Guid.NewGuid(); - Console.WriteLine($"Starting parallel request with StateTestGuid {StateTestGuid}"); - - return flowProvider.YieldWithParallelRequest() - .AddRequestSync(new PoloConfirmationRequestMessage - { - StoredInState = StateTestGuid, - EnumValue = TestEnum.Value1 - - }, HandlePoloConfirmationResponse1) - - .AddRequestSync(new PoloConfirmationRequestMessage - { - StoredInState = StateTestGuid, - EnumValue = TestEnum.Value2, - OptionalEnumValue = TestEnum.Value1 - }, HandlePoloConfirmationResponse2) - - .YieldSync(ContinuePoloConfirmation); - } - - - [Continuation] - public void HandlePoloConfirmationResponse1(PoloConfirmationResponseMessage message) - { - Console.WriteLine(">> HandlePoloConfirmationResponse1"); - Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!"); - } - - - [Continuation] - public void HandlePoloConfirmationResponse2(PoloConfirmationResponseMessage message) - { - Console.WriteLine(">> HandlePoloConfirmationResponse2"); - Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!"); - } - - - private IYieldPoint ContinuePoloConfirmation() - { - Console.WriteLine("> ConvergePoloConfirmation (ending flow)"); - return flowProvider.End(); - } - - - /** - * For simple request response patterns, the return type can be used. - * This will automatically include the correlationId in the response and - * use the replyTo header of the request if provided. - */ - [DurableQueue("tapeti.test.durable")] - public async Task PoloConfirmation(PoloConfirmationRequestMessage message) - { - Console.WriteLine(">> PoloConfirmation (returning confirmation)"); - await Task.Delay(100); - - return new PoloConfirmationResponseMessage - { - ShouldMatchState = message.StoredInState, - EnumValue = message.EnumValue, - OptionalEnumValue = message.OptionalEnumValue - }; - } - - - - [DynamicQueue("custom.prefix")] - public void Polo(PoloMessage message) - { - Console.WriteLine(">> Polo"); - } - } - - - public enum TestEnum - { - Value1, - Value2 - } - - - [Request(Response = typeof(PoloMessage))] - public class MarcoMessage - { - } - - - public class PoloMessage - { - } - - - [Request(Response = typeof(PoloConfirmationResponseMessage))] - public class PoloConfirmationRequestMessage - { - [Required] - public Guid StoredInState { get; set; } - - public TestEnum EnumValue; - public TestEnum? OptionalEnumValue; - } - - - public class PoloConfirmationResponseMessage - { - [Required] - public Guid ShouldMatchState { get; set; } - - public TestEnum EnumValue; - public TestEnum? OptionalEnumValue; - } -} diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs deleted file mode 100644 index e584739..0000000 --- a/Test/MarcoEmitter.cs +++ /dev/null @@ -1,52 +0,0 @@ -using System.Threading.Tasks; - -// ReSharper disable UnusedMember.Global - -namespace Test -{ - public class MarcoEmitter - { - //private readonly IPublisher publisher; - - - /*public MarcoEmitter(IPublisher publisher) - { - this.publisher = publisher; - } - */ - - - public async Task Run() - { - //await publisher.Publish(new MarcoMessage()); - - /* - var concurrent = new SemaphoreSlim(20); - - while (true) - { - for (var x = 0; x < 200; x++) - { - await concurrent.WaitAsync(); - try - { - await publisher.Publish(new MarcoMessage()); - } - finally - { - concurrent.Release(); - } - } - - await Task.Delay(200); - } - */ - - while (true) - { - await Task.Delay(1000); - } - // ReSharper disable once FunctionNeverReturns - } - } -} diff --git a/Test/Program.cs b/Test/Program.cs deleted file mode 100644 index b1291c1..0000000 --- a/Test/Program.cs +++ /dev/null @@ -1,89 +0,0 @@ -using System; -using SimpleInjector; -using Tapeti; -using Tapeti.DataAnnotations; -using Tapeti.Flow; -using Tapeti.SimpleInjector; -using System.Threading; -using Tapeti.Transient; - -namespace Test -{ - internal class Program - { - private static void Main() - { - // TODO logging - //try - { - var container = new Container(); - container.Register(); - container.Register(); - container.Register(); - - var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) - //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") - .WithFlow() - .WithDataAnnotations() - .WithTransient(TimeSpan.FromSeconds(30)) - .EnableDeclareDurableQueues() - .RegisterAllControllers() - //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails - .Build(); - - using (var connection = new TapetiConnection(config) - { - Params = new TapetiAppSettingsConnectionParams() - }) - { - var flowStore = container.GetInstance(); - var flowStore2 = container.GetInstance(); - Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); - - flowStore.Load().Wait(); - - connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); }; - connection.Disconnected += (sender, e) => { Console.WriteLine("Event Disconnected"); }; - connection.Reconnected += (sender, e) => { Console.WriteLine("Event Reconnected"); }; - - Console.WriteLine("Subscribing..."); - var subscriber = connection.Subscribe(false).Result; - - Console.WriteLine("Consuming..."); - subscriber.Resume().Wait(); - - Console.WriteLine("Done!"); - - /* - var response = container.GetInstance() - .RequestResponse( - new PoloConfirmationRequestMessage - { - StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab") - }).Result; - - Console.WriteLine(response.ShouldMatchState); - */ - - //connection.GetPublisher().Publish(new FlowEndController.PingMessage()); - - container.GetInstance().Start(c => c.StartFlow, true).Wait(); - container.GetInstance().Start(c => c.TestParallelRequest).Wait(); - - Thread.Sleep(1000); - - //var emitter = container.GetInstance(); - //emitter.Run().Wait(); - - Console.WriteLine("Press any Enter to continue"); - Console.ReadLine(); - } - } - //catch (Exception e) - { - // Console.WriteLine(e.ToString()); - // Console.ReadKey(); - } - } - } -} diff --git a/Test/Test.csproj b/Test/Test.csproj deleted file mode 100644 index 00f9c1e..0000000 --- a/Test/Test.csproj +++ /dev/null @@ -1,18 +0,0 @@ - - - - Exe - netcoreapp2.1 - - - - - - - - - - - - - diff --git a/Test/Visualizer.cs b/Test/Visualizer.cs deleted file mode 100644 index c99af85..0000000 --- a/Test/Visualizer.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using System.Threading.Tasks; - -// ReSharper disable UnusedMember.Global - -namespace Test -{ - public class Visualizer - { - public Task VisualizeMarco() - { - Console.WriteLine("Marco!"); - return Task.CompletedTask; - } - - public Task VisualizePolo() - { - Console.WriteLine("Polo!"); - return Task.CompletedTask; - } - } -}