Added TaskQueue

This commit is contained in:
Mark van Renswoude 2016-11-17 17:33:27 +01:00
parent 9d101f5d0d
commit 77de28d8b8
6 changed files with 66 additions and 6 deletions

View File

@ -1,4 +1,6 @@
namespace Tapeti.Connection
using System.Threading.Tasks;
namespace Tapeti.Connection
{
public class TapetiPublisher : IPublisher
{
@ -9,5 +11,11 @@
{
this.worker = worker;
}
public Task Publish(object message)
{
return worker.Publish(message);
}
}
}

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Tapeti.Connection
{
@ -12,14 +13,13 @@ namespace Tapeti.Connection
this.worker = worker;
ApplyTopology(registrations);
}
private void ApplyTopology(IEnumerable<IMessageHandlerRegistration> registrations)
{
foreach (var registration in registrations)
registration.ApplyTopology(worker.GetChannel());
worker.ApplyTopology(registration);
}
}
}

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Tapeti.Connection
@ -14,6 +15,22 @@ namespace Tapeti.Connection
private IConnection connection;
private IModel channel;
private readonly Lazy<TaskQueue> taskQueue = new Lazy<TaskQueue>();
public Task Publish(object message)
{
return taskQueue.Value.Add(() =>
{
//GetChannel().BasicPublish();
});
}
public void ApplyTopology(IMessageHandlerRegistration registration)
{
registration.ApplyTopology(GetChannel());
}
public Task Close()
@ -24,6 +41,7 @@ namespace Tapeti.Connection
channel = null;
}
// ReSharper disable once InvertIf
if (connection != null)
{
connection.Dispose();
@ -34,7 +52,7 @@ namespace Tapeti.Connection
}
public IModel GetChannel()
private IModel GetChannel()
{
if (channel != null)
return channel;
@ -54,5 +72,11 @@ namespace Tapeti.Connection
return channel;
}
private class ScheduledWorkItem
{
}
}
}

24
Connection/TaskQueue.cs Normal file
View File

@ -0,0 +1,24 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Tapeti.Connection
{
public class TaskQueue
{
private readonly object previousTaskLock = new object();
private Task previousTask = Task.CompletedTask;
public Task Add(Action action)
{
lock (previousTaskLock)
{
previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None
, TaskContinuationOptions.None
, TaskScheduler.Default);
return previousTask;
}
}
}
}

View File

@ -1,6 +1,9 @@
namespace Tapeti
using System.Threading.Tasks;
namespace Tapeti
{
public interface IPublisher
{
Task Publish(object message);
}
}

View File

@ -50,6 +50,7 @@
<Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" />
<Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Connection\TaskQueue.cs" />
<Compile Include="Default\DefaultControllerFactory.cs" />
<Compile Include="Default\DefaultRoutingKeyStrategy.cs" />
<Compile Include="IControllerFactory.cs" />