From 77de28d8b88ccd1a9f9204fa7a63d8eadedaee36 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 17 Nov 2016 17:33:27 +0100 Subject: [PATCH] Added TaskQueue --- Connection/TapetiPublisher.cs | 10 +++++++++- Connection/TapetiSubscriber.cs | 4 ++-- Connection/TapetiWorker.cs | 28 ++++++++++++++++++++++++++-- Connection/TaskQueue.cs | 24 ++++++++++++++++++++++++ IPublisher.cs | 5 ++++- Tapeti.csproj | 1 + 6 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 Connection/TaskQueue.cs diff --git a/Connection/TapetiPublisher.cs b/Connection/TapetiPublisher.cs index 60af7c6..ca021f9 100644 --- a/Connection/TapetiPublisher.cs +++ b/Connection/TapetiPublisher.cs @@ -1,4 +1,6 @@ -namespace Tapeti.Connection +using System.Threading.Tasks; + +namespace Tapeti.Connection { public class TapetiPublisher : IPublisher { @@ -9,5 +11,11 @@ { this.worker = worker; } + + + public Task Publish(object message) + { + return worker.Publish(message); + } } } diff --git a/Connection/TapetiSubscriber.cs b/Connection/TapetiSubscriber.cs index c83fc70..7adb01e 100644 --- a/Connection/TapetiSubscriber.cs +++ b/Connection/TapetiSubscriber.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading.Tasks; namespace Tapeti.Connection { @@ -12,14 +13,13 @@ namespace Tapeti.Connection this.worker = worker; ApplyTopology(registrations); - } private void ApplyTopology(IEnumerable registrations) { foreach (var registration in registrations) - registration.ApplyTopology(worker.GetChannel()); + worker.ApplyTopology(registration); } } } diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index b1719ff..7cbe533 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using RabbitMQ.Client; namespace Tapeti.Connection @@ -14,6 +15,22 @@ namespace Tapeti.Connection private IConnection connection; private IModel channel; + private readonly Lazy taskQueue = new Lazy(); + + + public Task Publish(object message) + { + return taskQueue.Value.Add(() => + { + //GetChannel().BasicPublish(); + }); + } + + + public void ApplyTopology(IMessageHandlerRegistration registration) + { + registration.ApplyTopology(GetChannel()); + } public Task Close() @@ -24,6 +41,7 @@ namespace Tapeti.Connection channel = null; } + // ReSharper disable once InvertIf if (connection != null) { connection.Dispose(); @@ -34,7 +52,7 @@ namespace Tapeti.Connection } - public IModel GetChannel() + private IModel GetChannel() { if (channel != null) return channel; @@ -54,5 +72,11 @@ namespace Tapeti.Connection return channel; } + + + private class ScheduledWorkItem + { + + } } } diff --git a/Connection/TaskQueue.cs b/Connection/TaskQueue.cs new file mode 100644 index 0000000..0dae6b5 --- /dev/null +++ b/Connection/TaskQueue.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Tapeti.Connection +{ + public class TaskQueue + { + private readonly object previousTaskLock = new object(); + private Task previousTask = Task.CompletedTask; + + + public Task Add(Action action) + { + lock (previousTaskLock) + { + previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None + , TaskContinuationOptions.None + , TaskScheduler.Default); + return previousTask; + } + } + } +} diff --git a/IPublisher.cs b/IPublisher.cs index c92496a..f54128f 100644 --- a/IPublisher.cs +++ b/IPublisher.cs @@ -1,6 +1,9 @@ -namespace Tapeti +using System.Threading.Tasks; + +namespace Tapeti { public interface IPublisher { + Task Publish(object message); } } diff --git a/Tapeti.csproj b/Tapeti.csproj index aa5aedc..dae1b5d 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -50,6 +50,7 @@ +