1
0
mirror of synced 2024-11-21 17:03:50 +00:00

Merge branch 'release/2.0' into master

This commit is contained in:
Mark van Renswoude 2019-10-21 14:05:58 +02:00
commit 9351861fcc
218 changed files with 9090 additions and 3070 deletions

2
.gitignore vendored
View File

@ -8,3 +8,5 @@ packages/
publish/
*.sublime-workspace
docs/_build/
Tapeti.Cmd/Properties/launchSettings.json

View File

@ -0,0 +1,29 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_01_PublishSubscribe</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="Castle.Windsor" Version="5.0.0" />
<PackageReference Include="Ninject" Version="3.3.4" />
<PackageReference Include="SimpleInjector" Version="4.6.2" />
<PackageReference Include="Unity" Version="5.11.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.Autofac\Tapeti.Autofac.csproj" />
<ProjectReference Include="..\..\Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj" />
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.Ninject\Tapeti.Ninject.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\..\Tapeti.UnityContainer\Tapeti.UnityContainer.csproj" />
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,27 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _01_PublishSubscribe
{
[MessageController]
[DynamicQueue("tapeti.example.01")]
public class ExampleMessageController
{
private readonly IExampleState exampleState;
public ExampleMessageController(IExampleState exampleState)
{
this.exampleState = exampleState;
}
public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
{
Console.WriteLine("Received message: " + message.Greeting);
exampleState.Done();
}
}
}

View File

@ -0,0 +1,45 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Messaging.TapetiExample;
using Tapeti;
namespace _01_PublishSubscribe
{
public class ExamplePublisher
{
private readonly IPublisher publisher;
/// <summary>
/// Shows that the IPublisher is registered in the container by Tapeti
/// </summary>
/// <param name="publisher"></param>
public ExamplePublisher(IPublisher publisher)
{
this.publisher = publisher;
}
public async Task SendTestMessage()
{
await publisher.Publish(new PublishSubscribeMessage
{
Greeting = "Hello world of messaging!"
});
// Demonstrates what happens when DataAnnotations is enabled
// and the message is invalid
try
{
await publisher.Publish(new PublishSubscribeMessage());
Console.WriteLine("This is not supposed to show. Did you disable the DataAnnotations extension?");
}
catch (ValidationException e)
{
Console.WriteLine("As expected, the DataAnnotations check failed: " + e.Message);
}
}
}
}

View File

@ -0,0 +1,154 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Autofac;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using ExampleLib;
using Ninject;
using Tapeti;
using Tapeti.Autofac;
using Tapeti.CastleWindsor;
using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.Ninject;
using Tapeti.SimpleInjector;
using Tapeti.UnityContainer;
using Unity;
using Container = SimpleInjector.Container;
namespace _01_PublishSubscribe
{
public class Program
{
public static void Main(string[] args)
{
var dependencyResolver = GetSimpleInjectorDependencyResolver();
// or use your IoC container of choice:
//var dependencyResolver = GetAutofacDependencyResolver();
//var dependencyResolver = GetCastleWindsorDependencyResolver();
//var dependencyResolver = GetUnityDependencyResolver();
//var dependencyResolver = GetNinjectDependencyResolver();
// This helper is used because this example is not run as a service. You do not
// need it in your own applications.
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
.WithDataAnnotations()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config)
{
// Params is optional if you want to use the defaults, but we'll set it
// explicitly for this example
Params = new TapetiConnectionParams
{
HostName = "localhost",
Username = "guest",
Password = "guest",
// These properties allow you to identify the connection in the RabbitMQ Management interface
ClientProperties = new Dictionary<string, string>
{
{ "example", "01 - Publish Subscribe" }
}
}
})
{
// IoC containers that separate the builder from the resolver (Autofac) must be built after
// creating a TapetConnection, as it modifies the container by injecting IPublisher.
(dependencyResolver as AutofacDependencyResolver)?.Build();
// Create the queues and start consuming immediately.
// If you need to do some processing before processing messages, but after the
// queues have initialized, pass false as the startConsuming parameter and store
// the returned ISubscriber. Then call Resume on it later.
await connection.Subscribe();
// We could get an IPublisher from the container directly, but since you'll usually use
// it as an injected constructor parameter this shows
await dependencyResolver.Resolve<ExamplePublisher>().SendTestMessage();
// Wait for the controller to signal that the message has been received
await waitForDone();
}
}
internal static IDependencyContainer GetSimpleInjectorDependencyResolver()
{
var container = new Container();
container.Register<ILogger, ConsoleLogger>();
container.Register<ExamplePublisher>();
return new SimpleInjectorDependencyResolver(container);
}
internal static IDependencyContainer GetAutofacDependencyResolver()
{
var containerBuilder = new ContainerBuilder();
containerBuilder
.RegisterType<ConsoleLogger>()
.As<ILogger>();
containerBuilder
.RegisterType<ExamplePublisher>()
.AsSelf();
return new AutofacDependencyResolver(containerBuilder);
}
internal static IDependencyContainer GetCastleWindsorDependencyResolver()
{
var container = new WindsorContainer();
// This exact combination is registered by TapetiConfig when running in a console,
// and Windsor will throw an exception for that. This is specific to the WindsorDependencyResolver as it
// relies on the "first one wins" behaviour of Windsor and does not check the registrations.
//
// You can of course register another ILogger instead, like DevNullLogger.
//container.Register(Component.For<ILogger>().ImplementedBy<ConsoleLogger>());
container.Register(Component.For<ExamplePublisher>());
return new WindsorDependencyResolver(container);
}
internal static IDependencyContainer GetUnityDependencyResolver()
{
var container = new UnityContainer();
container.RegisterType<ILogger, ConsoleLogger>();
container.RegisterType<ExamplePublisher>();
return new UnityDependencyResolver(container);
}
internal static IDependencyContainer GetNinjectDependencyResolver()
{
var kernel = new StandardKernel();
kernel.Bind<ILogger>().To<ConsoleLogger>();
kernel.Bind<ExamplePublisher>().ToSelf();
return new NinjectDependencyResolver(kernel);
}
}
}

View File

@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_02_DeclareDurableQueues</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,28 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _02_DeclareDurableQueues
{
[MessageController]
[DurableQueue("tapeti.example.02")]
public class ExampleMessageController
{
private readonly IExampleState exampleState;
public ExampleMessageController(IExampleState exampleState)
{
this.exampleState = exampleState;
}
public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
{
// Note that if you run example 01 after 02, it's message will also be in this durable queue
Console.WriteLine("Received message: " + message.Greeting);
exampleState.Done();
}
}
}

View File

@ -0,0 +1,48 @@
using System;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.Default;
using Tapeti.SimpleInjector;
namespace _02_DeclareDurableQueues
{
public class Program
{
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
.RegisterAllControllers()
.EnableDeclareDurableQueues()
.Build();
using (var connection = new TapetiConnection(config))
{
// This creates or updates the durable queue
await connection.Subscribe();
await dependencyResolver.Resolve<IPublisher>().Publish(new PublishSubscribeMessage
{
Greeting = "Hello durable queue!"
});
// Wait for the controller to signal that the message has been received
await waitForDone();
}
}
}
}

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_03_FlowRequestResponse</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.Flow\Tapeti.Flow.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,73 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
using Tapeti.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
namespace _03_FlowRequestResponse
{
[MessageController]
[DynamicQueue("tapeti.example.03")]
public class ParallelFlowController
{
private readonly IFlowProvider flowProvider;
private readonly IExampleState exampleState;
public string FirstQuote;
public string SecondQuote;
public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState)
{
this.flowProvider = flowProvider;
this.exampleState = exampleState;
}
[Start]
public IYieldPoint StartFlow()
{
return flowProvider.YieldWithParallelRequest()
.AddRequestSync<QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 1
},
HandleFirstQuoteResponse)
.AddRequestSync<QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 2
},
HandleSecondQuoteResponse)
.YieldSync(AllQuotesReceived);
}
[Continuation]
public void HandleFirstQuoteResponse(QuoteResponseMessage message)
{
Console.WriteLine("[ParallelFlowController] First quote response received");
FirstQuote = message.Quote;
}
[Continuation]
public void HandleSecondQuoteResponse(QuoteResponseMessage message)
{
Console.WriteLine("[ParallelFlowController] Second quote response received");
SecondQuote = message.Quote;
}
private IYieldPoint AllQuotesReceived()
{
Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote);
Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote);
exampleState.Done();
return flowProvider.End();
}
}
}

View File

@ -0,0 +1,66 @@
using System;
using System.Threading.Tasks;
using ExampleLib;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
namespace _03_FlowRequestResponse
{
public class Program
{
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
.WithDataAnnotations()
.WithFlow()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config))
{
// Must be called before using any flow. When using a persistent repository like the
// SQL server implementation, you can run any required update scripts (for example, using DbUp)
// before calling this Load method.
// Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
await dependencyResolver.Resolve<IFlowStore>().Load();
await connection.Subscribe();
var flowStarter = dependencyResolver.Resolve<IFlowStarter>();
var startData = new SimpleFlowController.StartData
{
RequestStartTime = DateTime.Now,
Amount = 1
};
await flowStarter.Start<SimpleFlowController, SimpleFlowController.StartData>(c => c.StartFlow, startData);
await flowStarter.Start<ParallelFlowController>(c => c.StartFlow);
// Wait for the controller to signal that the message has been received
await waitForDone();
}
}
}
}

View File

@ -0,0 +1,42 @@
using System.Threading.Tasks;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _03_FlowRequestResponse
{
[MessageController]
[DynamicQueue("tapeti.example.03")]
public class ReceivingMessageController
{
// No publisher required, responses can simply be returned
public async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
{
string quote;
switch (message.Amount)
{
case 1:
// Well, they asked for it... :-)
quote = "'";
break;
case 2:
quote = "\"";
break;
default:
// We have to return a response.
quote = null;
break;
}
// Just gonna let them wait for a bit, to demonstrate async message handlers
await Task.Delay(1000);
return new QuoteResponseMessage
{
Quote = quote
};
}
}
}

View File

@ -0,0 +1,73 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
using Tapeti.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
namespace _03_FlowRequestResponse
{
[MessageController]
[DynamicQueue("tapeti.example.03")]
public class SimpleFlowController
{
private readonly IFlowProvider flowProvider;
private readonly IExampleState exampleState;
// Shows how multiple values can be passed to a start method
public struct StartData
{
public DateTime RequestStartTime;
public int Amount;
}
// Private and protected fields are lost between method calls because the controller is
// recreated when a response arrives. When using a persistent flow repository this may
// even be after a restart of the application.
private bool nonPersistentState;
// Public fields will be stored.
public DateTime RequestStartTime;
public SimpleFlowController(IFlowProvider flowProvider, IExampleState exampleState)
{
this.flowProvider = flowProvider;
this.exampleState = exampleState;
}
[Start]
public IYieldPoint StartFlow(StartData startData)
{
nonPersistentState = true;
RequestStartTime = startData.RequestStartTime;
return flowProvider.YieldWithRequestSync<QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = startData.Amount
},
HandleQuoteResponse);
}
[Continuation]
public IYieldPoint HandleQuoteResponse(QuoteResponseMessage message)
{
if (nonPersistentState)
Console.WriteLine("[SimpleFlowController] This is not supposed to show. NonPersistentState should not be retained. Someone please check http://www.hasthelargehadroncolliderdestroyedtheworldyet.com.");
Console.WriteLine("[SimpleFlowController] Request start: " + RequestStartTime.ToLongTimeString());
Console.WriteLine("[SimpleFlowController] Response time: " + DateTime.Now.ToLongTimeString());
Console.WriteLine("[SimpleFlowController] Quote: " + message.Quote);
exampleState.Done();
return flowProvider.End();
}
}
}

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_04_Transient</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\..\Tapeti.Transient\Tapeti.Transient.csproj" />
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,55 @@
using System;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.SimpleInjector;
using Tapeti.Transient;
namespace _04_Transient
{
public class Program
{
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
.WithDataAnnotations()
.WithTransient(TimeSpan.FromSeconds(5), "tapeti.example.04.transient")
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config))
{
await connection.Subscribe();
Console.WriteLine("Sending request...");
var transientPublisher = dependencyResolver.Resolve<ITransientPublisher>();
var response = await transientPublisher.RequestResponse<LoggedInUsersRequestMessage, LoggedInUsersResponseMessage>(
new LoggedInUsersRequestMessage());
Console.WriteLine("Response: " + response.Count);
// Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled.
}
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _04_Transient
{
[MessageController]
[DynamicQueue("tapeti.example.04")]
public class UsersMessageController
{
// No publisher required, responses can simply be returned
public async Task<LoggedInUsersResponseMessage> HandleQuoteRequest(LoggedInUsersRequestMessage message)
{
// Simulate the response taking some time
await Task.Delay(1000);
return new LoggedInUsersResponseMessage
{
Count = new Random().Next(0, 100)
};
}
}
}

View File

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<RootNamespace>_05_SpeedTest</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,7 @@
namespace _05_SpeedTest
{
public interface IMessageCounter
{
void Add();
}
}

View File

@ -0,0 +1,140 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using SimpleInjector;
using Tapeti;
using Tapeti.Default;
using Tapeti.SimpleInjector;
namespace _05_SpeedTest
{
public class Program
{
private const int MessageCount = 20000;
// This does not make a massive difference, since internally Tapeti uses a single thread
// to perform all channel operations as recommended by the RabbitMQ .NET client library.
private const int ConcurrentTasks = 20;
public static void Main(string[] args)
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
container.Register<ILogger, ConsoleLogger>();
var helper = new ExampleConsoleApp(dependencyResolver);
helper.Run(MainAsync);
}
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func<Task> waitForDone)
{
var container = (IDependencyContainer)dependencyResolver;
container.RegisterDefaultSingleton<IMessageCounter>(new MessageCounter(MessageCount, () =>
{
var exampleState = dependencyResolver.Resolve<IExampleState>();
exampleState.Done();
}));
var config = new TapetiConfig(dependencyResolver)
// On a developer test machine, this makes the difference between 2200 messages/sec and 3000 messages/sec published.
// Interesting, but only if speed is more important than guaranteed delivery.
//.DisablePublisherConfirms()
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config))
{
var subscriber = await connection.Subscribe(false);
var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount} messages...");
var stopwatch = new Stopwatch();
stopwatch.Start();
await PublishMessages(publisher);
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
Console.WriteLine("Consuming messages...");
await subscriber.Resume();
stopwatch.Restart();
await waitForDone();
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
}
}
internal static async Task PublishMessages(IPublisher publisher)
{
var semaphore = new SemaphoreSlim(ConcurrentTasks);
var tasks = new List<Task>();
for (var i = 0; i < MessageCount; i++)
{
var item = i;
var task = Task.Run(async () =>
{
try
{
await semaphore.WaitAsync();
await publisher.Publish(new SpeedTestMessage
{
PublishCount = item
});
}
finally
{
semaphore.Release();
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
}
internal class MessageCounter : IMessageCounter
{
private readonly int max;
private readonly Action done;
private int count;
public MessageCounter(int max, Action done)
{
this.max = max;
this.done = done;
}
public void Add()
{
// With a prefetchcount > 1 the consumers are running in multiple threads,
// beware of this when using singletons.
if (Interlocked.Increment(ref count) == max)
done();
}
}
}

View File

@ -0,0 +1,23 @@
using Messaging.TapetiExample;
using Tapeti.Annotations;
namespace _05_SpeedTest
{
[MessageController]
[DynamicQueue("tapeti.example.05")]
public class SpeedMessageController
{
private readonly IMessageCounter messageCounter;
public SpeedMessageController(IMessageCounter messageCounter)
{
this.messageCounter = messageCounter;
}
public void HandleSpeedTestMessage(SpeedTestMessage message)
{
messageCounter.Add();
}
}
}

View File

@ -0,0 +1,109 @@
using System;
using System.Threading.Tasks;
using Tapeti;
namespace ExampleLib
{
/// <summary>
/// Callback method for ExampleConsoleApp.Run
/// </summary>
/// <param name="dependencyResolver">A reference to the dependency resolver passed to the ExampleConsoleApp</param>
/// <param name="waitForDone">Await this function to wait for the Done signal</param>
public delegate Task AsyncFunc(IDependencyResolver dependencyResolver, Func<Task> waitForDone);
/// <summary>
/// Since the examples do not run as a service, we need to know when the example has run
/// to completion. This helper injects IExampleState into the container which
/// can be used to signal that it has finished. It also provides the Wait
/// method to wait for this signal.
/// </summary>
public class ExampleConsoleApp
{
private readonly IDependencyContainer dependencyResolver;
private readonly TaskCompletionSource<bool> doneSignal = new TaskCompletionSource<bool>();
/// <param name="dependencyResolver">Uses Tapeti's IDependencyContainer interface so you can easily switch an example to your favourite IoC container</param>
public ExampleConsoleApp(IDependencyContainer dependencyResolver)
{
this.dependencyResolver = dependencyResolver;
dependencyResolver.RegisterDefault<IExampleState>(() => new ExampleState(this));
}
/// <summary>
/// Runs the specified async method and waits for completion. Handles exceptions and waiting
/// for user input when the example application finishes.
/// </summary>
/// <param name="asyncFunc"></param>
public void Run(AsyncFunc asyncFunc)
{
try
{
asyncFunc(dependencyResolver, WaitAsync).Wait();
}
catch (Exception e)
{
Console.WriteLine(UnwrapException(e));
}
finally
{
Console.WriteLine("Press any Enter key to continue...");
Console.ReadLine();
}
}
/// <summary>
/// Returns a Task which completed when IExampleState.Done is called
/// </summary>
public async Task WaitAsync()
{
await doneSignal.Task;
// This is a hack, because the signal is often given in a message handler before the message can be
// acknowledged, causing it to be put back on the queue because the connection is closed.
// This short delay allows consumers to finish. This is not an issue in a proper service application.
await Task.Delay(500);
}
internal Exception UnwrapException(Exception e)
{
while (true)
{
if (!(e is AggregateException aggregateException))
return e;
if (aggregateException.InnerExceptions.Count != 1)
return e;
e = aggregateException.InnerExceptions[0];
}
}
internal void Done()
{
doneSignal.TrySetResult(true);
}
private class ExampleState : IExampleState
{
private readonly ExampleConsoleApp owner;
public ExampleState(ExampleConsoleApp owner)
{
this.owner = owner;
}
public void Done()
{
owner.Done();
}
}
}
}

View File

@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,14 @@
namespace ExampleLib
{
/// <summary>
/// Since the examples do not run as a service, this interface provides a way
/// for the implementation to signal that it has finished and the example can be closed.
/// </summary>
public interface IExampleState
{
/// <summary>
/// Signals the Program that the example has finished and the application can be closed.
/// </summary>
void Done();
}
}

View File

@ -0,0 +1,15 @@
using Tapeti.Annotations;
namespace Messaging.TapetiExample
{
[Request(Response = typeof(LoggedInUsersResponseMessage))]
public class LoggedInUsersRequestMessage
{
}
public class LoggedInUsersResponseMessage
{
public int Count { get; set; }
}
}

View File

@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="System.ComponentModel.Annotations" Version="4.5.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Tapeti.Annotations\Tapeti.Annotations.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,13 @@
using System.ComponentModel.DataAnnotations;
namespace Messaging.TapetiExample
{
/// <summary>
/// Example of a simple broadcast message used in the standard publish - subscribe pattern
/// </summary>
public class PublishSubscribeMessage
{
[Required(ErrorMessage = "Don't be impolite, supply a {0}")]
public string Greeting { get; set; }
}
}

View File

@ -0,0 +1,16 @@
using Tapeti.Annotations;
namespace Messaging.TapetiExample
{
[Request(Response = typeof(QuoteResponseMessage))]
public class QuoteRequestMessage
{
public int Amount { get; set; }
}
public class QuoteResponseMessage
{
public string Quote { get; set; }
}
}

View File

@ -0,0 +1,9 @@
using System.ComponentModel.DataAnnotations;
namespace Messaging.TapetiExample
{
public class SpeedTestMessage
{
public int PublishCount { get; set; }
}
}

View File

@ -1,19 +1,58 @@
## Introduction
Tapeti is a wrapper for the RabbitMQ .NET Client designed for long-running microservices. Its main goal is to minimize the amount of messaging code required, and instead focus on the higher-level flow.
## Key features
* Consumers are declared using MVC-style controllers and are registered automatically based on annotations
* Publishing requires only the message class, no transport details such as exchange and routing key
* Flow extension (stateful request - response handling with support for parallel requests)
* No inheritance required
* Graceful recovery in case of connection issues, and in contrast to most libraries not designed for services, during startup as well
* Extensible using middleware
## Show me the code!
Below is a bare minimum message controller from the first example project to get a feel for how messages are handled using Tapeti.
```csharp
/// <summary>
/// Example of a simple broadcast message used in the standard publish - subscribe pattern
/// </summary>
public class PublishSubscribeMessage
{
[Required(ErrorMessage = "Don't be impolite, supply a {0}")]
public string Greeting { get; set; }
}
[MessageController]
[DynamicQueue("tapeti.example.01")]
public class ExampleMessageController
{
public ExampleMessageController() { }
public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
{
Console.WriteLine("Received message: " + message.Greeting);
}
}
```
More details and examples can be found in the documentation as well as the example projects included with the source.
## Documentation
The documentation for Tapeti is available on Read the Docs:
[Develop branch](http://tapeti.readthedocs.io/en/latest/)<br />
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=latest)](http://tapeti.readthedocs.io/en/latest/?badge=latest)
[Master branch (stable release)](http://tapeti.readthedocs.io/en/stable/introduction.html)<br />
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/introduction.html?badge=stable)
[Develop branch](http://tapeti.readthedocs.io/en/latest/introduction.html)<br />
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=latest)](http://tapeti.readthedocs.io/en/latest/introduction.html?badge=latest)
[Master branch](http://tapeti.readthedocs.io/en/stable/)<br />
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/?badge=stable)
## Builds
Builds are automatically run using AppVeyor, with the resulting packages being pushed to NuGet.
Master build (stable release)
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master)
Latest build
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti)
Master build
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master)

View File

@ -8,11 +8,6 @@ namespace Tapeti.Annotations
/// Binds to an existing durable queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
/// </summary>
/// <remarks>
/// At the moment there is no support for creating a durable queue and managing the
/// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
/// for deploy-time management of durable queues (shameless plug intended).
/// </remarks>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class DurableQueueAttribute : Attribute

View File

@ -30,6 +30,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. */
using System;
// ReSharper disable InheritdocConsiderUsage
#pragma warning disable 1591
// ReSharper disable UnusedMember.Global
@ -96,9 +97,9 @@ namespace JetBrains.Annotations
TargetFlags = targetFlags;
}
public ImplicitUseKindFlags UseKindFlags { get; private set; }
public ImplicitUseKindFlags UseKindFlags { get; }
public ImplicitUseTargetFlags TargetFlags { get; private set; }
public ImplicitUseTargetFlags TargetFlags { get; }
}
/// <summary>
@ -142,7 +143,7 @@ namespace JetBrains.Annotations
/// </summary>
InstantiatedWithFixedConstructorSignature = 4,
/// <summary>Indicates implicit instantiation of a type.</summary>
InstantiatedNoFixedConstructorSignature = 8,
InstantiatedNoFixedConstructorSignature = 8
}
/// <summary>
@ -174,6 +175,6 @@ namespace JetBrains.Annotations
Comment = comment;
}
[CanBeNull] public string Comment { get; private set; }
[CanBeNull] public string Comment { get; }
}
}

View File

@ -7,7 +7,7 @@ namespace Tapeti.Annotations
/// Can be attached to a message class to specify that the receiver of the message must
/// provide a response message of the type specified in the Response attribute. This response
/// must be sent by either returning it from the message handler method or using
/// YieldWithResponse when using Tapeti Flow. These methods will respond directly
/// EndWithResponse when using Tapeti Flow. These methods will respond directly
/// to the queue specified in the reply-to header automatically added by Tapeti.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]

View File

@ -3,6 +3,11 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
</Project>

View File

@ -6,7 +6,7 @@
<title>Tapeti Annotations</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Annotations.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -0,0 +1,148 @@
using System;
using Autofac;
using Autofac.Builder;
namespace Tapeti.Autofac
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Autofac.
/// Since this class needs access to both the ContainerBuilder and the built IContainer,
/// either let AutofacDependencyResolver build the container by calling it's Build method,
/// or set the Container property manually.
/// </summary>
public class AutofacDependencyResolver : IDependencyContainer
{
private ContainerBuilder containerBuilder;
private IContainer container;
/// <summary>
/// The built container. Either set directly, or use the Build method to built the
/// update this reference.
/// </summary>
public IContainer Container
{
get => container;
set
{
container = value;
if (value != null)
containerBuilder = null;
}
}
/// <inheritdoc />
public AutofacDependencyResolver(ContainerBuilder containerBuilder)
{
this.containerBuilder = containerBuilder;
}
/// <summary>
/// Builds the container, updates the Container property and returns the newly built IContainer.
/// </summary>
public IContainer Build(ContainerBuildOptions options = ContainerBuildOptions.None)
{
CheckContainerBuilder();
Container = containerBuilder.Build(options);
return container;
}
/// <inheritdoc />
public T Resolve<T>() where T : class
{
CheckContainer();
return Container.Resolve<T>();
}
/// <inheritdoc />
public object Resolve(Type type)
{
CheckContainer();
return Container.Resolve(type);
}
/// <inheritdoc />
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
CheckContainerBuilder();
containerBuilder
.RegisterType<TImplementation>()
.As<TService>()
.PreserveExistingDefaults();
}
/// <inheritdoc />
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
{
CheckContainerBuilder();
containerBuilder
.Register(context => factory())
.As<TService>()
.PreserveExistingDefaults();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
CheckContainerBuilder();
containerBuilder
.RegisterType<TImplementation>()
.As<TService>()
.SingleInstance()
.PreserveExistingDefaults();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
CheckContainerBuilder();
containerBuilder
.RegisterInstance(instance)
.As<TService>()
.SingleInstance()
.PreserveExistingDefaults();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
CheckContainerBuilder();
containerBuilder
.Register(context => factory())
.As<TService>()
.SingleInstance()
.PreserveExistingDefaults();
}
/// <inheritdoc />
public void RegisterController(Type type)
{
CheckContainerBuilder();
containerBuilder
.RegisterType(type)
.AsSelf();
}
private void CheckContainer()
{
if (container == null)
throw new InvalidOperationException("Container property has not been set yet on AutofacDependencyResolver");
}
private void CheckContainerBuilder()
{
if (containerBuilder == null)
throw new InvalidOperationException("Container property has already been set on AutofacDependencyResolver");
}
}
}

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.4" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Tapeti.Autofac</id>
<version>$version$</version>
<title>Tapeti Autofac</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Autofac integration package for Tapeti</description>
<copyright></copyright>
<tags>rabbitmq tapeti autofac</tags>
<dependencies>
<dependency id="Tapeti" version="[$version$]" />
<dependency id="Autofac" version="4.9.4" />
</dependencies>
</metadata>
<files>
<file src="bin\Release\**" target="lib" />
</files>
</package>

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Castle.Windsor" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Tapeti.CastleWindsor</id>
<version>$version$</version>
<title>Tapeti Castle Windsor</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Castle.Windsor integration package for Tapeti</description>
<copyright></copyright>
<tags>rabbitmq tapeti castle windsor</tags>
<dependencies>
<dependency id="Tapeti" version="[$version$]" />
<dependency id="Castle.Windsor" version="5.0.0" />
</dependencies>
</metadata>
<files>
<file src="bin\Release\**" target="lib" />
</files>
</package>

View File

@ -0,0 +1,98 @@
using System;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
namespace Tapeti.CastleWindsor
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Castle Windsor.
/// </summary>
public class WindsorDependencyResolver : IDependencyContainer
{
private readonly IWindsorContainer container;
/// <inheritdoc />
public WindsorDependencyResolver(IWindsorContainer container)
{
this.container = container;
}
/// <inheritdoc />
public T Resolve<T>() where T : class
{
return container.Resolve<T>();
}
/// <inheritdoc />
public object Resolve(Type type)
{
return container.Resolve(type);
}
/// <inheritdoc />
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
// No need for anything special to register as default, because "In Windsor first one wins":
// https://github.com/castleproject/Windsor/blob/master/docs/registering-components-one-by-one.md
container.Register(
Component
.For<TService>()
.ImplementedBy<TImplementation>()
);
}
/// <inheritdoc />
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
{
container.Register(
Component
.For<TService>()
.UsingFactoryMethod(() => factory())
);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
container.Register(
Component
.For<TService>()
.ImplementedBy<TImplementation>()
.LifestyleSingleton()
);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
container.Register(
Component
.For<TService>()
.Instance(instance)
);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
container.Register(
Component
.For<TService>()
.UsingFactoryMethod(() => factory())
.LifestyleSingleton()
);
}
/// <inheritdoc />
public void RegisterController(Type type)
{
container.Register(Component.For(type));
}
}
}

View File

@ -0,0 +1,46 @@
using RabbitMQ.Client;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands
{
public class ExportCommand
{
public IMessageSerializer MessageSerializer { get; set; }
public string QueueName { get; set; }
public bool RemoveMessages { get; set; }
public int? MaxCount { get; set; }
public int Execute(IModel channel)
{
var messageCount = 0;
while (!MaxCount.HasValue || messageCount < MaxCount.Value)
{
var result = channel.BasicGet(QueueName, false);
if (result == null)
// No more messages on the queue
break;
messageCount++;
MessageSerializer.Serialize(new Message
{
DeliveryTag = result.DeliveryTag,
Redelivered = result.Redelivered,
Exchange = result.Exchange,
RoutingKey = result.RoutingKey,
Queue = QueueName,
Properties = result.BasicProperties,
Body = result.Body
});
if (RemoveMessages)
channel.BasicAck(result.DeliveryTag, false);
}
return messageCount;
}
}
}

View File

@ -0,0 +1,29 @@
using RabbitMQ.Client;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd.Commands
{
public class ImportCommand
{
public IMessageSerializer MessageSerializer { get; set; }
public bool DirectToQueue { get; set; }
public int Execute(IModel channel)
{
var messageCount = 0;
foreach (var message in MessageSerializer.Deserialize())
{
var exchange = DirectToQueue ? "" : message.Exchange;
var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
messageCount++;
}
return messageCount;
}
}
}

View File

@ -0,0 +1,37 @@
using RabbitMQ.Client;
namespace Tapeti.Cmd.Commands
{
public class ShovelCommand
{
public string QueueName { get; set; }
public string TargetQueueName { get; set; }
public bool RemoveMessages { get; set; }
public int? MaxCount { get; set; }
public int Execute(IModel sourceChannel, IModel targetChannel)
{
var messageCount = 0;
while (!MaxCount.HasValue || messageCount < MaxCount.Value)
{
var result = sourceChannel.BasicGet(QueueName, false);
if (result == null)
// No more messages on the queue
break;
targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
messageCount++;
if (RemoveMessages)
sourceChannel.BasicAck(result.DeliveryTag, false);
}
return messageCount;
}
}
}

293
Tapeti.Cmd/Program.cs Normal file
View File

@ -0,0 +1,293 @@
using System;
using System.Diagnostics;
using CommandLine;
using RabbitMQ.Client;
using Tapeti.Cmd.Commands;
using Tapeti.Cmd.Serialization;
namespace Tapeti.Cmd
{
public class Program
{
public class CommonOptions
{
[Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")]
public string Host { get; set; }
[Option('p', "port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)]
public int Port { get; set; }
[Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")]
public string VirtualHost { get; set; }
[Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")]
public string Username { get; set; }
[Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")]
public string Password { get; set; }
}
public enum SerializationMethod
{
SingleFileJSON,
EasyNetQHosepipe
}
public class MessageSerializerOptions : CommonOptions
{
[Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)]
public SerializationMethod SerializationMethod { get; set; }
}
[Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")]
public class ExportOptions : MessageSerializerOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
public string OutputPath { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
}
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
public class ImportOptions : MessageSerializerOptions
{
[Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
public string Input { get; set; }
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
public bool PublishToExchange { get; set; }
}
[Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
public class ShovelOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
public string QueueName { get; set; }
[Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")]
public string TargetQueueName { get; set; }
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")]
public bool RemoveMessages { get; set; }
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
public int? MaxCount { get; set; }
[Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")]
public string TargetHost { get; set; }
[Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")]
public int? TargetPort { get; set; }
[Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")]
public string TargetVirtualHost { get; set; }
[Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")]
public string TargetUsername { get; set; }
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
public string TargetPassword { get; set; }
}
public static int Main(string[] args)
{
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions>(args)
.MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport),
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
errs =>
{
if (!Debugger.IsAttached)
return 1;
Console.WriteLine("Press any Enter key to continue...");
Console.ReadLine();
return 1;
}
);
}
private static int ExecuteVerb<T>(T options, Action<T> execute) where T : class
{
try
{
execute(options);
return 0;
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return 1;
}
}
private static IConnection GetConnection(CommonOptions options)
{
var factory = new ConnectionFactory
{
HostName = options.Host,
Port = options.Port,
VirtualHost = options.VirtualHost,
UserName = options.Username,
Password = options.Password
};
return factory.CreateConnection();
}
private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path)
{
switch (options.SerializationMethod)
{
case SerializationMethod.SingleFileJSON:
return new SingleFileJSONMessageSerializer(path);
case SerializationMethod.EasyNetQHosepipe:
return new EasyNetQMessageSerializer(path);
default:
throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
}
}
private static void RunExport(ExportOptions options)
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.OutputPath))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = new ExportCommand
{
MessageSerializer = messageSerializer,
QueueName = options.QueueName,
RemoveMessages = options.RemoveMessages,
MaxCount = options.MaxCount
}.Execute(channel);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported.");
}
private static void RunImport(ImportOptions options)
{
int messageCount;
using (var messageSerializer = GetMessageSerializer(options, options.Input))
using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = new ImportCommand
{
MessageSerializer = messageSerializer,
DirectToQueue = !options.PublishToExchange
}.Execute(channel);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
}
private static void RunShovel(ShovelOptions options)
{
int messageCount;
using (var sourceConnection = GetConnection(options))
using (var sourceChannel = sourceConnection.CreateModel())
{
var shovelCommand = new ShovelCommand
{
QueueName = options.QueueName,
TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName,
RemoveMessages = options.RemoveMessages,
MaxCount = options.MaxCount
};
if (RequiresSecondConnection(options))
{
using (var targetConnection = GetTargetConnection(options))
using (var targetChannel = targetConnection.CreateModel())
{
messageCount = shovelCommand.Execute(sourceChannel, targetChannel);
}
}
else
messageCount = shovelCommand.Execute(sourceChannel, sourceChannel);
}
Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");
}
private static bool RequiresSecondConnection(ShovelOptions options)
{
if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host)
return true;
if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port)
return true;
if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost)
return true;
// All relevant target host parameters are either omitted or the same. This means the queue must be different
// to prevent an infinite loop.
if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName)
throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host");
if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username)
return true;
// ReSharper disable once ConvertIfStatementToReturnStatement
if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password)
return true;
// Everything's the same, we can use the same channel
return false;
}
private static IConnection GetTargetConnection(ShovelOptions options)
{
var factory = new ConnectionFactory
{
HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host,
Port = options.TargetPort ?? options.Port,
VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost,
UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username,
Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password,
};
return factory.CreateConnection();
}
}
}

View File

@ -0,0 +1,314 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
namespace Tapeti.Cmd.Serialization
{
public class EasyNetQMessageSerializer : IMessageSerializer
{
private static readonly Regex InvalidCharRegex = new Regex(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled);
private readonly string path;
private readonly Lazy<string> writablePath;
private int messageCount;
public EasyNetQMessageSerializer(string path)
{
this.path = path;
writablePath = new Lazy<string>(() =>
{
Directory.CreateDirectory(path);
return path;
});
}
public void Dispose()
{
}
public void Serialize(Message message)
{
var uniqueFileName = SanitiseQueueName(message.Queue) + "." + messageCount;
var bodyPath = Path.Combine(writablePath.Value, uniqueFileName + ".message.txt");
var propertiesPath = Path.Combine(writablePath.Value, uniqueFileName + ".properties.txt");
var infoPath = Path.Combine(writablePath.Value, uniqueFileName + ".info.txt");
var properties = new EasyNetQMessageProperties(message.Properties);
var info = new EasyNetQMessageReceivedInfo(message);
File.WriteAllText(bodyPath, Encoding.UTF8.GetString(message.Body));
File.WriteAllText(propertiesPath, JsonConvert.SerializeObject(properties));
File.WriteAllText(infoPath, JsonConvert.SerializeObject(info));
messageCount++;
}
private static string SanitiseQueueName(string queueName)
{
return InvalidCharRegex.Replace(queueName, "_");
}
public IEnumerable<Message> Deserialize()
{
foreach (var file in Directory.GetFiles(path, "*.*.message.txt"))
{
const string messageTag = ".message.";
var directoryName = Path.GetDirectoryName(file);
var fileName = Path.GetFileName(file);
var propertiesFileName = Path.Combine(directoryName, fileName.Replace(messageTag, ".properties."));
var infoFileName = Path.Combine(directoryName, fileName.Replace(messageTag, ".info."));
var body = File.ReadAllText(file);
var propertiesJson = File.ReadAllText(propertiesFileName);
var properties = JsonConvert.DeserializeObject<EasyNetQMessageProperties>(propertiesJson);
var infoJson = File.ReadAllText(infoFileName);
var info = JsonConvert.DeserializeObject<EasyNetQMessageReceivedInfo>(infoJson);
var message = info.ToMessage();
message.Properties = properties.ToBasicProperties();
message.Body = Encoding.UTF8.GetBytes(body);
yield return message;
}
}
// ReSharper disable MemberCanBePrivate.Local - used by JSON deserialization
// ReSharper disable AutoPropertyCanBeMadeGetOnly.Local
private class EasyNetQMessageProperties
{
// ReSharper disable once MemberCanBePrivate.Local - used by JSON deserialization
public EasyNetQMessageProperties()
{
}
public EasyNetQMessageProperties(IBasicProperties basicProperties) : this()
{
if (basicProperties.IsContentTypePresent()) ContentType = basicProperties.ContentType;
if (basicProperties.IsContentEncodingPresent()) ContentEncoding = basicProperties.ContentEncoding;
if (basicProperties.IsDeliveryModePresent()) DeliveryMode = basicProperties.DeliveryMode;
if (basicProperties.IsPriorityPresent()) Priority = basicProperties.Priority;
if (basicProperties.IsCorrelationIdPresent()) CorrelationId = basicProperties.CorrelationId;
if (basicProperties.IsReplyToPresent()) ReplyTo = basicProperties.ReplyTo;
if (basicProperties.IsExpirationPresent()) Expiration = basicProperties.Expiration;
if (basicProperties.IsMessageIdPresent()) MessageId = basicProperties.MessageId;
if (basicProperties.IsTimestampPresent()) Timestamp = basicProperties.Timestamp.UnixTime;
if (basicProperties.IsTypePresent()) Type = basicProperties.Type;
if (basicProperties.IsUserIdPresent()) UserId = basicProperties.UserId;
if (basicProperties.IsAppIdPresent()) AppId = basicProperties.AppId;
if (basicProperties.IsClusterIdPresent()) ClusterId = basicProperties.ClusterId;
if (!basicProperties.IsHeadersPresent())
return;
foreach (var header in basicProperties.Headers)
Headers.Add(header.Key, (byte[])header.Value);
}
public IBasicProperties ToBasicProperties()
{
var basicProperties = new BasicProperties();
if (ContentTypePresent) basicProperties.ContentType = ContentType;
if (ContentEncodingPresent) basicProperties.ContentEncoding = ContentEncoding;
if (DeliveryModePresent) basicProperties.DeliveryMode = DeliveryMode;
if (PriorityPresent) basicProperties.Priority = Priority;
if (CorrelationIdPresent) basicProperties.CorrelationId = CorrelationId;
if (ReplyToPresent) basicProperties.ReplyTo = ReplyTo;
if (ExpirationPresent) basicProperties.Expiration = Expiration;
if (MessageIdPresent) basicProperties.MessageId = MessageId;
if (TimestampPresent) basicProperties.Timestamp = new AmqpTimestamp(Timestamp);
if (TypePresent) basicProperties.Type = Type;
if (UserIdPresent) basicProperties.UserId = UserId;
if (AppIdPresent) basicProperties.AppId = AppId;
if (ClusterIdPresent) basicProperties.ClusterId = ClusterId;
if (HeadersPresent)
{
basicProperties.Headers = new Dictionary<string, object>(Headers.ToDictionary(p => p.Key, p => (object)p.Value));
}
return basicProperties;
}
private string contentType;
public string ContentType
{
get => contentType;
set { contentType = value; ContentTypePresent = true; }
}
private string contentEncoding;
public string ContentEncoding
{
get => contentEncoding;
set { contentEncoding = value; ContentEncodingPresent = true; }
}
// The original EasyNetQ.Hosepipe defines this as an IDictionary<string, object>. This causes UTF-8 headers
// to be serialized as Base64, and deserialized as string, corrupting the republished message.
// This may cause incompatibilities, but fixes it for dumped Tapeti messages.
private IDictionary<string, byte[]> headers = new Dictionary<string, byte[]>();
public IDictionary<string, byte[]> Headers
{
get => headers;
set { headers = value; HeadersPresent = true; }
}
private byte deliveryMode;
public byte DeliveryMode
{
get => deliveryMode;
set { deliveryMode = value; DeliveryModePresent = true; }
}
private byte priority;
public byte Priority
{
get => priority;
set { priority = value; PriorityPresent = true; }
}
private string correlationId;
public string CorrelationId
{
get => correlationId;
set { correlationId = value; CorrelationIdPresent = true; }
}
private string replyTo;
public string ReplyTo
{
get => replyTo;
set { replyTo = value; ReplyToPresent = true; }
}
private string expiration;
public string Expiration
{
get => expiration;
set { expiration = value; ExpirationPresent = true; }
}
private string messageId;
public string MessageId
{
get => messageId;
set { messageId = value; MessageIdPresent = true; }
}
private long timestamp;
public long Timestamp
{
get => timestamp;
set { timestamp = value; TimestampPresent = true; }
}
private string type;
public string Type
{
get => type;
set { type = value; TypePresent = true; }
}
private string userId;
public string UserId
{
get => userId;
set { userId = value; UserIdPresent = true; }
}
private string appId;
public string AppId
{
get => appId;
set { appId = value; AppIdPresent = true; }
}
private string clusterId;
public string ClusterId
{
get => clusterId;
set { clusterId = value; ClusterIdPresent = true; }
}
public bool ContentTypePresent { get; set; }
public bool ContentEncodingPresent { get; set; }
public bool HeadersPresent { get; set; } = true;
public bool DeliveryModePresent { get; set; }
public bool PriorityPresent { get; set; }
public bool CorrelationIdPresent { get; set; }
public bool ReplyToPresent { get; set; }
public bool ExpirationPresent { get; set; }
public bool MessageIdPresent { get; set; }
public bool TimestampPresent { get; set; }
public bool TypePresent { get; set; }
public bool UserIdPresent { get; set; }
public bool AppIdPresent { get; set; }
public bool ClusterIdPresent { get; set; }
}
private class EasyNetQMessageReceivedInfo
{
public string ConsumerTag { get; set; }
public ulong DeliverTag { get; set; }
public bool Redelivered { get; set; }
public string Exchange { get; set; }
public string RoutingKey { get; set; }
public string Queue { get; set; }
// ReSharper disable once MemberCanBePrivate.Local - used by JSON deserialization
// ReSharper disable once UnusedMember.Local
// ReSharper disable once UnusedMember.Global
public EasyNetQMessageReceivedInfo()
{
}
public EasyNetQMessageReceivedInfo(Message fromMessage)
{
ConsumerTag = "hosepipe";
DeliverTag = fromMessage.DeliveryTag;
Redelivered = fromMessage.Redelivered;
Exchange = fromMessage.Exchange;
RoutingKey = fromMessage.RoutingKey;
Queue = fromMessage.Queue;
}
public Message ToMessage()
{
return new Message
{
//ConsumerTag =
DeliveryTag = DeliverTag,
Redelivered = Redelivered,
Exchange = Exchange,
RoutingKey = RoutingKey,
Queue = Queue
};
}
}
// ReSharper restore AutoPropertyCanBeMadeGetOnly.Local
// ReSharper restore MemberCanBePrivate.Local
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Tapeti.Cmd.Serialization
{
public class Message
{
public ulong DeliveryTag;
public bool Redelivered;
public string Exchange;
public string RoutingKey;
public string Queue;
public IBasicProperties Properties;
public byte[] Body;
}
public interface IMessageSerializer : IDisposable
{
void Serialize(Message message);
IEnumerable<Message> Deserialize();
}
}

View File

@ -0,0 +1,234 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
namespace Tapeti.Cmd.Serialization
{
public class SingleFileJSONMessageSerializer : IMessageSerializer
{
private readonly string path;
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};
private readonly Lazy<StreamWriter> exportFile;
public SingleFileJSONMessageSerializer(string path)
{
this.path = path;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(path, false, Encoding.UTF8));
}
public void Serialize(Message message)
{
var serializableMessage = new SerializableMessage(message);
var serialized = JsonConvert.SerializeObject(serializableMessage, SerializerSettings);
exportFile.Value.WriteLine(serialized);
}
public IEnumerable<Message> Deserialize()
{
using (var file = new StreamReader(path))
{
while (!file.EndOfStream)
{
var serialized = file.ReadLine();
if (string.IsNullOrEmpty(serialized))
continue;
var serializableMessage = JsonConvert.DeserializeObject<SerializableMessage>(serialized);
if (serializableMessage == null)
continue;
yield return serializableMessage.ToMessage();
}
}
}
public void Dispose()
{
if (exportFile.IsValueCreated)
exportFile.Value.Dispose();
}
// ReSharper disable MemberCanBePrivate.Local - used for JSON serialization
// ReSharper disable NotAccessedField.Local
// ReSharper disable FieldCanBeMadeReadOnly.Local
private class SerializableMessage
{
public ulong DeliveryTag;
public bool Redelivered;
public string Exchange;
public string RoutingKey;
public string Queue;
// ReSharper disable once FieldCanBeMadeReadOnly.Local - must be settable by JSON deserialization
public SerializableMessageProperties Properties;
public JObject Body;
public byte[] RawBody;
// ReSharper disable once UnusedMember.Global - used by JSON deserialization
// ReSharper disable once UnusedMember.Local
public SerializableMessage()
{
Properties = new SerializableMessageProperties();
}
public SerializableMessage(Message fromMessage)
{
DeliveryTag = fromMessage.DeliveryTag;
Redelivered = fromMessage.Redelivered;
Exchange = fromMessage.Exchange;
RoutingKey = fromMessage.RoutingKey;
Queue = fromMessage.Queue;
Properties = new SerializableMessageProperties(fromMessage.Properties);
// If this is detected as a JSON message, include the object directly in the JSON line so that it is easier
// to read and process in the output file. Otherwise simply include the raw data and let Newtonsoft encode it.
// This does mean the message will be rewritten. If this is an issue, feel free to add a "raw" option to this tool
// that forces the RawBody to be used. It is open-source after all :-).
if (Properties.ContentType == "application/json")
{
try
{
Body = JObject.Parse(Encoding.UTF8.GetString(fromMessage.Body));
RawBody = null;
}
catch
{
// Fall back to using the raw body
Body = null;
RawBody = fromMessage.Body;
}
}
else
{
Body = null;
RawBody = fromMessage.Body;
}
}
public Message ToMessage()
{
return new Message
{
DeliveryTag = DeliveryTag,
Redelivered = Redelivered,
Exchange = Exchange,
RoutingKey = RoutingKey,
Queue = Queue,
Properties = Properties.ToBasicProperties(),
Body = Body != null
? Encoding.UTF8.GetBytes(Body.ToString(Formatting.None))
: RawBody
};
}
}
// IBasicProperties is finicky when it comes to writing it's properties,
// so we need this normalized class to read and write it from and to JSON
private class SerializableMessageProperties
{
public string AppId;
public string ClusterId;
public string ContentEncoding;
public string ContentType;
public string CorrelationId;
public byte? DeliveryMode;
public string Expiration;
public IDictionary<string, string> Headers;
public string MessageId;
public byte? Priority;
public string ReplyTo;
public long? Timestamp;
public string Type;
public string UserId;
public SerializableMessageProperties()
{
}
public SerializableMessageProperties(IBasicProperties fromProperties)
{
AppId = fromProperties.AppId;
ClusterId = fromProperties.ClusterId;
ContentEncoding = fromProperties.ContentEncoding;
ContentType = fromProperties.ContentType;
CorrelationId = fromProperties.CorrelationId;
DeliveryMode = fromProperties.IsDeliveryModePresent() ? (byte?)fromProperties.DeliveryMode : null;
Expiration = fromProperties.Expiration;
MessageId = fromProperties.MessageId;
Priority = fromProperties.IsPriorityPresent() ? (byte?) fromProperties.Priority : null;
ReplyTo = fromProperties.ReplyTo;
Timestamp = fromProperties.IsTimestampPresent() ? (long?)fromProperties.Timestamp.UnixTime : null;
Type = fromProperties.Type;
UserId = fromProperties.UserId;
if (fromProperties.IsHeadersPresent())
{
Headers = new Dictionary<string, string>();
// This assumes header values are UTF-8 encoded strings. This is true for Tapeti.
foreach (var pair in fromProperties.Headers)
Headers.Add(pair.Key, Encoding.UTF8.GetString((byte[])pair.Value));
}
else
Headers = null;
}
public IBasicProperties ToBasicProperties()
{
var properties = new BasicProperties();
if (!string.IsNullOrEmpty(AppId)) properties.AppId = AppId;
if (!string.IsNullOrEmpty(ClusterId)) properties.ClusterId = ClusterId;
if (!string.IsNullOrEmpty(ContentEncoding)) properties.ContentEncoding = ContentEncoding;
if (!string.IsNullOrEmpty(ContentType)) properties.ContentType = ContentType;
if (DeliveryMode.HasValue) properties.DeliveryMode = DeliveryMode.Value;
if (!string.IsNullOrEmpty(Expiration)) properties.Expiration = Expiration;
if (!string.IsNullOrEmpty(MessageId)) properties.MessageId = MessageId;
if (Priority.HasValue) properties.Priority = Priority.Value;
if (!string.IsNullOrEmpty(ReplyTo)) properties.ReplyTo = ReplyTo;
if (Timestamp.HasValue) properties.Timestamp = new AmqpTimestamp(Timestamp.Value);
if (!string.IsNullOrEmpty(Type)) properties.Type = Type;
if (!string.IsNullOrEmpty(UserId)) properties.UserId = UserId;
// ReSharper disable once InvertIf
if (Headers != null)
{
properties.Headers = new Dictionary<string, object>();
foreach (var pair in Headers)
properties.Headers.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value));
}
return properties;
}
}
// ReSharper restore FieldCanBeMadeReadOnly.Local
// ReSharper restore NotAccessedField.Local
// ReSharper restore MemberCanBePrivate.Local
}
}

View File

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Version>2.0.0</Version>
<Authors>Mark van Renswoude</Authors>
<Company>Mark van Renswoude</Company>
<Product>Tapeti Command-line Utility</Product>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.6.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
</ItemGroup>
</Project>

View File

@ -0,0 +1 @@
dotnet publish -c Release -r win-x64 --self-contained false

View File

@ -1,9 +1,11 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Globalization;
// ReSharper disable UnusedMember.Global
namespace Tapeti.DataAnnotations.Extensions
{
/// <inheritdoc />
/// <summary>
/// Can be used on Guid fields which are supposed to be Required, as the Required attribute does
/// not work for Guids and making them Nullable is counter-intuitive.
@ -13,10 +15,12 @@ namespace Tapeti.DataAnnotations.Extensions
private const string DefaultErrorMessage = "'{0}' does not contain a valid guid";
private const string InvalidTypeErrorMessage = "'{0}' is not of type Guid";
/// <inheritdoc />
public RequiredGuidAttribute() : base(DefaultErrorMessage)
{
}
/// <inheritdoc />
protected override ValidationResult IsValid(object value, ValidationContext validationContext)
{
if (value == null)

View File

@ -2,6 +2,12 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti DataAnnotations Extensions</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -1,10 +1,19 @@
namespace Tapeti.DataAnnotations
using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
/// <summary>
/// Extends ITapetiConfigBuilder to enable DataAnnotations.
/// </summary>
public static class ConfigExtensions
{
public static TapetiConfig WithDataAnnotations(this TapetiConfig config)
/// <summary>
/// Enables the DataAnnotations validation middleware.
/// </summary>
/// <param name="config"></param>
public static ITapetiConfigBuilder WithDataAnnotations(this ITapetiConfigBuilder config)
{
config.Use(new DataAnnotationsMiddleware());
config.Use(new DataAnnotationsExtension());
return config;
}
}

View File

@ -3,12 +3,18 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsMiddleware : ITapetiExtension
/// <inheritdoc />
/// <summary>
/// Provides the DataAnnotations validation middleware.
/// </summary>
public class DataAnnotationsExtension : ITapetiExtension
{
/// <inheritdoc />
public void RegisterDefaults(IDependencyContainer container)
{
}
/// <inheritdoc />
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return new object[]

View File

@ -5,14 +5,19 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsMessageMiddleware : IMessageMiddleware
/// <inheritdoc />
/// <summary>
/// Validates consumed messages using System.ComponentModel.DataAnnotations
/// </summary>
internal class DataAnnotationsMessageMiddleware : IMessageMiddleware
{
public Task Handle(IMessageContext context, Func<Task> next)
/// <inheritdoc />
public async Task Handle(IMessageContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
return next();
await next();
}
}
}

View File

@ -5,14 +5,19 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsPublishMiddleware : IPublishMiddleware
/// <inheritdoc />
/// <summary>
/// Validates published messages using System.ComponentModel.DataAnnotations
/// </summary>
internal class DataAnnotationsPublishMiddleware : IPublishMiddleware
{
public Task Handle(IPublishContext context, Func<Task> next)
/// <inheritdoc />
public async Task Handle(IPublishContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
return next();
await next();
}
}
}

View File

@ -3,6 +3,11 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti DataAnnotations</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -5,23 +5,32 @@ using Tapeti.Config;
namespace Tapeti.Flow.SQL
{
/// <summary>
/// Extends ITapetiConfigBuilder to enable Flow SQL.
/// </summary>
public static class ConfigExtensions
{
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, string tableName = "Flow")
/// <summary>
/// Enables the Flow SQL repository.
/// </summary>
/// <param name="config"></param>
/// <param name="connectionString"></param>
/// <param name="tableName"></param>
public static ITapetiConfigBuilder WithFlowSqlRepository(this ITapetiConfigBuilder config, string connectionString, string tableName = "Flow")
{
config.Use(new FlowSqlRepositoryBundle(connectionString, tableName));
config.Use(new FlowSqlRepositoryExtension(connectionString, tableName));
return config;
}
}
internal class FlowSqlRepositoryBundle : ITapetiExtension
internal class FlowSqlRepositoryExtension : ITapetiExtension
{
private readonly string connectionString;
private readonly string tableName;
public FlowSqlRepositoryBundle(string connectionString, string tableName)
public FlowSqlRepositoryExtension(string connectionString, string tableName)
{
this.connectionString = connectionString;
this.tableName = tableName;

View File

@ -7,25 +7,27 @@ using Newtonsoft.Json;
namespace Tapeti.Flow.SQL
{
/*
Assumes the following table layout (table name configurable and may include schema):
create table Flow
(
FlowID uniqueidentifier not null,
CreationTime datetime2(3) not null,
StateJson nvarchar(max) null,
constraint PK_Flow primary key clustered (FlowID)
);
*/
/// <inheritdoc />
/// <summary>
/// IFlowRepository implementation for SQL server.
/// </summary>
/// <remarks>
/// Assumes the following table layout (table name configurable and may include schema):
/// create table Flow
/// (
/// FlowID uniqueidentifier not null,
/// CreationTime datetime2(3) not null,
/// StateJson nvarchar(max) null,
/// constraint PK_Flow primary key clustered(FlowID)
/// );
/// </remarks>
public class SqlConnectionFlowRepository : IFlowRepository
{
private readonly string connectionString;
private readonly string tableName;
/// <inheritdoc />
public SqlConnectionFlowRepository(string connectionString, string tableName = "Flow")
{
this.connectionString = connectionString;
@ -33,6 +35,7 @@ namespace Tapeti.Flow.SQL
}
/// <inheritdoc />
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
{
return await SqlRetryHelper.Execute(async () =>
@ -58,6 +61,7 @@ namespace Tapeti.Flow.SQL
});
}
/// <inheritdoc />
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
await SqlRetryHelper.Execute(async () =>
@ -81,6 +85,7 @@ namespace Tapeti.Flow.SQL
});
}
/// <inheritdoc />
public async Task UpdateState<T>(Guid flowID, T state)
{
await SqlRetryHelper.Execute(async () =>
@ -100,6 +105,7 @@ namespace Tapeti.Flow.SQL
});
}
/// <inheritdoc />
public async Task DeleteState(Guid flowID)
{
await SqlRetryHelper.Execute(async () =>

View File

@ -3,10 +3,15 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.6.1" />
</ItemGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti Flow SQL</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.SQL.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -2,6 +2,11 @@
namespace Tapeti.Flow.Annotations
{
/// <inheritdoc />
/// <summary>
/// Marks a message handler as a response message handler which continues a Tapeti Flow.
/// The method only receives direct messages, and does not create a routing key based binding to the queue.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class ContinuationAttribute : Attribute
{

View File

@ -3,6 +3,11 @@ using JetBrains.Annotations;
namespace Tapeti.Flow.Annotations
{
/// <inheritdoc />
/// <summary>
/// Marks this method as the start of a Tapeti Flow. Use IFlowStarter.Start to begin a new flow and
/// call this method. Must return an IYieldPoint.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
[MeansImplicitUse]
public class StartAttribute : Attribute

View File

@ -1,10 +1,21 @@
namespace Tapeti.Flow
using Tapeti.Config;
namespace Tapeti.Flow
{
/// <summary>
/// ITapetiConfigBuilder extension for enabling Flow.
/// </summary>
public static class ConfigExtensions
{
public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository flowRepository = null)
/// <summary>
/// Enables Tapeti Flow.
/// </summary>
/// <param name="config"></param>
/// <param name="flowRepository">An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts.</param>
/// <returns></returns>
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null)
{
config.Use(new FlowMiddleware(flowRepository));
config.Use(new FlowExtension(flowRepository));
return config;
}
}

View File

@ -1,7 +1,13 @@
namespace Tapeti.Flow
{
/// <summary>
/// Key names as used in the message context store. For internal use.
/// </summary>
public static class ContextItems
{
/// <summary>
/// Key given to the FlowContext object as stored in the message context.
/// </summary>
public const string FlowContext = "Tapeti.Flow.FlowContext";
}
}

View File

@ -14,9 +14,9 @@ namespace Tapeti.Flow.Default
}
public Task Execute(FlowContext context)
public async Task Execute(FlowContext context)
{
return onExecute(context);
await onExecute(context);
}
}
}

View File

@ -8,16 +8,13 @@ using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
internal class FlowBindingMiddleware : IBindingMiddleware
internal class FlowBindingMiddleware : IControllerBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
public void Handle(IControllerBindingContext context, Action next)
{
if (context.Method.GetCustomAttribute<StartAttribute>() != null)
return;
if (context.Method.GetCustomAttribute<ContinuationAttribute>() != null)
context.QueueBindingMode = QueueBindingMode.DirectToQueue;
RegisterYieldPointResult(context);
RegisterContinuationFilter(context);
@ -27,14 +24,14 @@ namespace Tapeti.Flow.Default
}
private static void RegisterContinuationFilter(IBindingContext context)
private static void RegisterContinuationFilter(IControllerBindingContext context)
{
var continuationAttribute = context.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute == null)
return;
context.Use(new FlowMessageFilterMiddleware());
context.Use(new FlowMessageMiddleware());
context.SetBindingTargetMode(BindingTargetMode.Direct);
context.Use(new FlowContinuationMiddleware());
if (context.Result.HasHandler)
return;
@ -58,7 +55,7 @@ namespace Tapeti.Flow.Default
}
private static void RegisterYieldPointResult(IBindingContext context)
private static void RegisterYieldPointResult(IControllerBindingContext context)
{
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf))
return;
@ -77,24 +74,24 @@ namespace Tapeti.Flow.Default
}
private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, yieldPoint);
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
private static Task HandleParallelResponse(IMessageContext context)
private static Task HandleParallelResponse(IControllerMessageContext context)
{
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext =>
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store();
await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
}));
}
private static void ValidateRequestResponse(IBindingContext context)
private static void ValidateRequestResponse(IControllerBindingContext context)
{
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();
if (request?.Response == null)

View File

@ -1,25 +0,0 @@
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
public class FlowCleanupMiddleware : ICleanupMiddleware
{
public async Task Handle(IMessageContext context, HandlingResult handlingResult)
{
if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextObj))
return;
var flowContext = (FlowContext)flowContextObj;
if (flowContext?.FlowStateLock != null)
{
if (handlingResult.ConsumeResponse == ConsumeResponse.Nack
|| handlingResult.MessageAction == MessageAction.ErrorLog)
{
await flowContext.FlowStateLock.DeleteFlowState();
}
flowContext.FlowStateLock.Dispose();
}
}
}
}

View File

@ -1,12 +1,11 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
internal class FlowContext : IDisposable
{
public IMessageContext MessageContext { get; set; }
public IFlowHandlerContext HandlerContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; }
@ -17,16 +16,16 @@ namespace Tapeti.Flow.Default
private bool deleteCalled;
public async Task Store()
public async Task Store(bool persistent)
{
storeCalled = true;
if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
await FlowStateLock.StoreFlowState(FlowState);
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
await FlowStateLock.StoreFlowState(FlowState, persistent);
}
public async Task Delete()

View File

@ -0,0 +1,130 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
/// <inheritdoc cref="IControllerMessageMiddleware"/> />
/// <summary>
/// Handles methods marked with the Continuation attribute.
/// </summary>
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{
public async Task Filter(IControllerMessageContext context, Func<Task> next)
{
var flowContext = await EnrichWithFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
return;
await next();
}
public async Task Handle(IControllerMessageContext context, Func<Task> next)
{
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
{
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null;
await next();
if (converge)
await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
}
else
await next();
}
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next)
{
await next();
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return;
if (flowContext?.FlowStateLock != null)
{
if (consumeResult == ConsumeResult.Error)
await flowContext.FlowStateLock.DeleteFlowState();
flowContext.FlowStateLock.Dispose();
}
}
private static async Task<FlowContext> EnrichWithFlowContext(IControllerMessageContext context)
{
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return flowContext;
if (context.Properties.CorrelationId == null)
return null;
if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
return null;
var flowStore = context.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = await flowStore.FindFlowID(continuationID);
if (!flowID.HasValue)
return null;
var flowStateLock = await flowStore.LockFlowState(flowID.Value);
var flowState = await flowStateLock.GetFlowState();
if (flowState == null)
return null;
flowContext = new FlowContext
{
HandlerContext = new FlowHandlerContext(context),
FlowStateLock = flowStateLock,
FlowState = flowState,
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
};
// IDisposable items in the IMessageContext are automatically disposed
context.Store(ContextItems.FlowContext, flowContext);
return flowContext;
}
private static async Task CallConvergeMethod(IControllerMessageContext context, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(context.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
}
}

View File

@ -0,0 +1,48 @@
using System.Reflection;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowHandlerContext
/// </summary>
internal class FlowHandlerContext : IFlowHandlerContext
{
/// <inheritdoc />
public FlowHandlerContext()
{
}
/// <inheritdoc />
public FlowHandlerContext(IControllerMessageContext source)
{
if (source == null)
return;
Config = source.Config;
Controller = source.Controller;
Method = source.Binding.Method;
ControllerMessageContext = source;
}
/// <inheritdoc />
public void Dispose()
{
}
/// <inheritdoc />
public ITapetiConfig Config { get; set; }
/// <inheritdoc />
public object Controller { get; set; }
/// <inheritdoc />
public MethodInfo Method { get; set; }
/// <inheritdoc />
public IControllerMessageContext ControllerMessageContext { get; set; }
}
}

View File

@ -1,62 +0,0 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
public class FlowMessageFilterMiddleware : IMessageFilterMiddleware
{
public async Task Handle(IMessageContext context, Func<Task> next)
{
var flowContext = await GetFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
return;
await next();
}
private static async Task<FlowContext> GetFlowContext(IMessageContext context)
{
if (context.Items.ContainsKey(ContextItems.FlowContext))
return (FlowContext)context.Items[ContextItems.FlowContext];
if (context.Properties.CorrelationId == null)
return null;
if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
return null;
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowID = await flowStore.FindFlowID(continuationID);
if (!flowID.HasValue)
return null;
var flowStateLock = await flowStore.LockFlowState(flowID.Value);
var flowState = await flowStateLock.GetFlowState();
if (flowState == null)
return null;
var flowContext = new FlowContext
{
MessageContext = context,
FlowStateLock = flowStateLock,
FlowState = flowState,
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
};
// IDisposable items in the IMessageContext are automatically disposed
context.Items.Add(ContextItems.FlowContext, flowContext);
return flowContext;
}
}
}

View File

@ -1,54 +0,0 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
public class FlowMessageMiddleware : IMessageMiddleware
{
public async Task Handle(IMessageContext context, Func<Task> next)
{
var flowContext = (FlowContext)context.Items[ContextItems.FlowContext];
if (flowContext != null)
{
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null;
await next();
if (converge)
await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
}
else
await next();
}
private static async Task CallConvergeMethod(IMessageContext context, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(context.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);
}
}
}

View File

@ -4,49 +4,59 @@ using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Default;
using Tapeti.Flow.Annotations;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
/// <inheritdoc cref="IFlowProvider"/> />
/// <summary>
/// Default implementation for IFlowProvider.
/// </summary>
public class FlowProvider : IFlowProvider, IFlowHandler
{
private readonly IConfig config;
private readonly ITapetiConfig config;
private readonly IInternalPublisher publisher;
public FlowProvider(IConfig config, IPublisher publisher)
/// <inheritdoc />
public FlowProvider(ITapetiConfig config, IPublisher publisher)
{
this.config = config;
this.publisher = (IInternalPublisher)publisher;
}
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
return new ParallelRequestBuilder(config, SendRequest);
}
/// <inheritdoc />
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
{
return new DelegateYieldPoint(context => SendResponse(context, message));
}
/// <inheritdoc />
public IYieldPoint End()
{
return new DelegateYieldPoint(EndFlow);
@ -72,13 +82,13 @@ namespace Tapeti.Flow.Default
ConvergeMethodSync = convergeMethodTaskSync
});
var properties = new BasicProperties
var properties = new MessageProperties
{
CorrelationId = continuationID.ToString(),
ReplyTo = responseHandlerInfo.ReplyToQueue
};
await context.Store();
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true);
}
@ -87,7 +97,7 @@ namespace Tapeti.Flow.Default
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.FlowState == null
? GetReply(context.MessageContext)
? GetReply(context.HandlerContext)
: context.FlowState.Metadata.Reply;
if (reply == null)
@ -96,12 +106,10 @@ namespace Tapeti.Flow.Default
if (message.GetType().FullName != reply.ResponseTypeName)
throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead");
var properties = new BasicProperties();
// Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if (reply.CorrelationId != null)
properties.CorrelationId = reply.CorrelationId;
var properties = new MessageProperties
{
CorrelationId = reply.CorrelationId
};
// TODO disallow if replyto is not specified?
if (reply.ReplyTo != null)
@ -122,14 +130,17 @@ namespace Tapeti.Flow.Default
}
private static ResponseHandlerInfo GetResponseHandlerInfo(IConfig config, object request, Delegate responseHandler)
private static ResponseHandlerInfo GetResponseHandlerInfo(ITapetiConfig config, object request, Delegate responseHandler)
{
var binding = config.GetBinding(responseHandler);
var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
throw new ArgumentException($"Request message {request.GetType().Name} must be marked with the Request attribute and a valid Response type", nameof(request));
var binding = config.Bindings.ForMethod(responseHandler);
if (binding == null)
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response != null && !binding.Accept(requestAttribute.Response))
if (!binding.Accept(requestAttribute.Response))
throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>();
@ -137,34 +148,35 @@ namespace Tapeti.Flow.Default
throw new ArgumentException("responseHandler must be marked with the Continuation attribute", nameof(responseHandler));
if (binding.QueueName == null)
throw new ArgumentException("responseHandler must bind to a valid queue", nameof(responseHandler));
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow", nameof(responseHandler));
return new ResponseHandlerInfo
{
MethodName = MethodSerializer.Serialize(responseHandler.Method),
ReplyToQueue = binding.QueueName
ReplyToQueue = binding.QueueName,
IsDurableQueue = binding.QueueType == QueueType.Durable
};
}
private static ReplyMetadata GetReply(IMessageContext context)
private static ReplyMetadata GetReply(IFlowHandlerContext context)
{
var requestAttribute = context.Message?.GetType().GetCustomAttribute<RequestAttribute>();
var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo,
CorrelationId = context.ControllerMessageContext.Properties.CorrelationId,
ReplyTo = context.ControllerMessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.Properties.Persistent
Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true)
};
}
private async Task CreateNewFlowState(FlowContext flowContext)
private static async Task CreateNewFlowState(FlowContext flowContext)
{
var flowStore = flowContext.MessageContext.DependencyResolver.Resolve<IFlowStore>();
var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
@ -176,44 +188,55 @@ namespace Tapeti.Flow.Default
{
Metadata = new FlowMetadata
{
Reply = GetReply(flowContext.MessageContext)
Reply = GetReply(flowContext.HandlerContext)
}
};
}
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
/// <inheritdoc />
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}");
FlowContext flowContext;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextItem))
{
flowContext = new FlowContext
{
MessageContext = context
};
context.Items.Add(ContextItems.FlowContext, flowContext);
}
else
flowContext = (FlowContext)flowContextItem;
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
FlowContext flowContext = null;
var disposeFlowContext = false;
try
{
await executableYieldPoint.Execute(flowContext);
}
catch (YieldPointException e)
{
// Useful for debugging
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name;
throw;
}
var messageContext = context.ControllerMessageContext;
if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out flowContext))
{
flowContext = new FlowContext
{
HandlerContext = context
};
flowContext.EnsureStoreOrDeleteIsCalled();
// If we ended up here it is because of a Start. No point in storing the new FlowContext
// in the messageContext as the yield point is the last to execute.
disposeFlowContext = true;
}
try
{
await executableYieldPoint.Execute(flowContext);
}
catch (YieldPointException e)
{
// Useful for debugging
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Method.Name;
throw;
}
flowContext.EnsureStoreOrDeleteIsCalled();
}
finally
{
if (disposeFlowContext)
flowContext.Dispose();
}
}
@ -234,12 +257,12 @@ namespace Tapeti.Flow.Default
}
private readonly IConfig config;
private readonly ITapetiConfig config;
private readonly SendRequestFunc sendRequest;
private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(IConfig config, SendRequestFunc sendRequest)
public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest)
{
this.config = config;
this.sendRequest = sendRequest;
@ -284,12 +307,15 @@ namespace Tapeti.Flow.Default
private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync)
{
if (requests.Count == 0)
throw new YieldPointException("At least one request must be added before yielding a parallel request");
if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(context =>
{
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo =>
@ -306,6 +332,7 @@ namespace Tapeti.Flow.Default
{
public string MethodName { get; set; }
public string ReplyToQueue { get; set; }
public bool IsDurableQueue { get; set; }
}
}
}

View File

@ -3,42 +3,47 @@ using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Default;
namespace Tapeti.Flow.Default
{
public class FlowStarter : IFlowStarter
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowStarter.
/// </summary>
internal class FlowStarter : IFlowStarter
{
private readonly IConfig config;
private readonly ILogger logger;
private readonly ITapetiConfig config;
public FlowStarter(IConfig config, ILogger logger)
/// <inheritdoc />
public FlowStarter(ITapetiConfig config)
{
this.config = config;
this.logger = logger;
}
public Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
}
public Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
}
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
}
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
}
@ -47,42 +52,15 @@ namespace Tapeti.Flow.Default
var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
var context = new MessageContext
var context = new FlowHandlerContext
{
DependencyResolver = config.DependencyResolver,
Controller = controller
Config = config,
Controller = controller,
Method = method
};
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
HandlingResultBuilder handlingResult = new HandlingResultBuilder
{
ConsumeResponse = ConsumeResponse.Nack,
};
try
{
await flowHandler.Execute(context, yieldPoint);
handlingResult.ConsumeResponse = ConsumeResponse.Ack;
}
finally
{
await RunCleanup(context, handlingResult.ToHandlingResult());
}
}
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach (var handler in config.CleanupMiddleware)
{
try
{
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
await flowHandler.Execute(context, yieldPoint);
}
@ -98,6 +76,7 @@ namespace Tapeti.Flow.Default
return method;
}
private static MethodInfo GetExpressionMethod<TController, TResult, TParameter>(Expression<Func<TController, Func<TParameter, TResult>>> methodSelector)
{
var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression;

View File

@ -4,20 +4,34 @@ using System.Linq;
namespace Tapeti.Flow.Default
{
/// <summary>
/// Represents the state stored for active flows.
/// </summary>
public class FlowState
{
private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations;
/// <summary>
/// Contains metadata about the flow.
/// </summary>
public FlowMetadata Metadata
{
get => metadata ?? (metadata = new FlowMetadata());
set => metadata = value;
}
/// <summary>
/// Contains the serialized state which is restored when a flow continues.
/// </summary>
public string Data { get; set; }
/// <summary>
/// Contains metadata about continuations awaiting a response.
/// </summary>
public Dictionary<Guid, ContinuationMetadata> Continuations
{
get => continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>());
@ -25,6 +39,9 @@ namespace Tapeti.Flow.Default
}
/// <summary>
/// Creates a deep clone of this FlowState.
/// </summary>
public FlowState Clone()
{
return new FlowState {
@ -36,11 +53,20 @@ namespace Tapeti.Flow.Default
}
/// <summary>
/// Contains metadata about the flow.
/// </summary>
public class FlowMetadata
{
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public ReplyMetadata Reply { get; set; }
/// <summary>
/// Creates a deep clone of this FlowMetadata.
/// </summary>
public FlowMetadata Clone()
{
return new FlowMetadata
@ -51,15 +77,36 @@ namespace Tapeti.Flow.Default
}
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public class ReplyMetadata
{
/// <summary>
/// The queue to which the response should be sent.
/// </summary>
public string ReplyTo { get; set; }
/// <summary>
/// The correlation ID included in the original request.
/// </summary>
public string CorrelationId { get; set; }
/// <summary>
/// The expected response message class.
/// </summary>
public string ResponseTypeName { get; set; }
/// <summary>
/// Indicates whether the response should be sent a mandatory.
/// False for requests originating from a dynamic queue.
/// </summary>
public bool Mandatory { get; set; }
/// <summary>
/// Creates a deep clone of this ReplyMetadata.
/// </summary>
public ReplyMetadata Clone()
{
return new ReplyMetadata
@ -73,13 +120,30 @@ namespace Tapeti.Flow.Default
}
/// <summary>
/// Contains metadata about a continuation awaiting a response.
/// </summary>
public class ContinuationMetadata
{
/// <summary>
/// The name of the method which will handle the response.
/// </summary>
public string MethodName { get; set; }
/// <summary>
/// The name of the method which is called when all responses have been processed.
/// </summary>
public string ConvergeMethodName { get; set; }
/// <summary>
/// Determines if the converge method is synchronous or asynchronous.
/// </summary>
public bool ConvergeMethodSync { get; set; }
/// <summary>
/// Creates a deep clone of this ContinuationMetadata.
/// </summary>
public ContinuationMetadata Clone()
{
return new ContinuationMetadata

View File

@ -1,16 +1,31 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation of IFlowStore.
/// </summary>
public class FlowStore : IFlowStore
{
private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>();
private class CachedFlowState
{
public readonly FlowState FlowState;
public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, bool isPersistent)
{
FlowState = flowState;
IsPersistent = isPersistent;
}
}
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
@ -19,12 +34,15 @@ namespace Tapeti.Flow.Default
private volatile bool inUse;
private volatile bool loaded;
/// <inheritdoc />
public FlowStore(IFlowRepository repository)
{
this.repository = repository;
}
/// <inheritdoc />
public async Task Load()
{
if (inUse)
@ -37,7 +55,7 @@ namespace Tapeti.Flow.Default
foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{
flowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value);
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
foreach (var continuation in flowStateRecord.Value.Continuations)
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
@ -47,6 +65,7 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public Task<Guid?> FindFlowID(Guid continuationID)
{
if (!loaded)
@ -56,6 +75,7 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{
if (!loaded)
@ -67,21 +87,23 @@ namespace Tapeti.Flow.Default
return flowStatelock;
}
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner;
private readonly Guid flowID;
private volatile IDisposable flowLock;
private FlowState flowState;
private CachedFlowState cachedFlowState;
public Guid FlowID { get; }
public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock)
{
this.owner = owner;
this.flowID = flowID;
FlowID = flowID;
this.flowLock = flowLock;
owner.flowStates.TryGetValue(flowID, out flowState);
owner.flowStates.TryGetValue(flowID, out cachedFlowState);
}
public void Dispose()
@ -91,17 +113,15 @@ namespace Tapeti.Flow.Default
l?.Dispose();
}
public Guid FlowID => flowID;
public Task<FlowState> GetFlowState()
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(flowState?.Clone());
return Task.FromResult(cachedFlowState.FlowState?.Clone());
}
public async Task StoreFlowState(FlowState newFlowState)
public async Task StoreFlowState(FlowState newFlowState, bool persistent)
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
@ -110,30 +130,41 @@ namespace Tapeti.Flow.Default
newFlowState = newFlowState.Clone();
// Update the lookup dictionary for the ContinuationIDs
if (flowState != null)
if (cachedFlowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
owner.continuationLookup.TryRemove(removedContinuation, out _);
}
foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key)))
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
{
owner.continuationLookup.TryAdd(addedContinuation.Key, flowID);
owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID);
}
var isNew = flowState == null;
flowState = newFlowState;
owner.flowStates[flowID] = newFlowState;
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
// Storing the flowstate in the underlying repository
if (isNew)
cachedFlowState = new CachedFlowState(newFlowState, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
{
var now = DateTime.UtcNow;
await owner.repository.CreateState(flowID, flowState, now);
// Storing the flowstate in the underlying repository
if (isNew)
{
var now = DateTime.UtcNow;
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now);
}
else
{
await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState);
}
}
else
else if (wasPersistent)
{
await owner.repository.UpdateState(flowID, flowState);
// We transitioned from a durable queue to a dynamic queue,
// remove the persistent state but keep the in-memory version
await owner.repository.DeleteState(FlowID);
}
}
@ -142,18 +173,16 @@ namespace Tapeti.Flow.Default
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
if (flowState != null)
if (cachedFlowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys)
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys)
owner.continuationLookup.TryRemove(removedContinuation, out _);
owner.flowStates.TryRemove(flowID, out _);
owner.flowStates.TryRemove(FlowID, out var removedFlowState);
cachedFlowState = null;
if (flowState != null)
{
flowState = null;
await owner.repository.DeleteState(flowID);
}
if (removedFlowState.IsPersistent)
await owner.repository.DeleteState(FlowID);
}
}
}

View File

@ -4,6 +4,10 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowRepository. Does not persist any state, relying on the FlowStore's cache instead.
/// </summary>
public class NonPersistentFlowRepository : IFlowRepository
{
Task<List<KeyValuePair<Guid, T>>> IFlowRepository.GetStates<T>()
@ -11,16 +15,19 @@ namespace Tapeti.Flow.Default
return Task.FromResult(new List<KeyValuePair<Guid, T>>());
}
/// <inheritdoc />
public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
return Task.CompletedTask;
}
/// <inheritdoc />
public Task UpdateState<T>(Guid flowID, T state)
{
return Task.CompletedTask;
}
/// <inheritdoc />
public Task DeleteState(Guid flowID)
{
return Task.CompletedTask;

View File

@ -4,15 +4,21 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
public class FlowMiddleware : ITapetiExtension
/// <inheritdoc />
/// <summary>
/// Provides the Flow middleware.
/// </summary>
public class FlowExtension : ITapetiExtension
{
private readonly IFlowRepository flowRepository;
public FlowMiddleware(IFlowRepository flowRepository)
/// <inheritdoc />
public FlowExtension(IFlowRepository flowRepository)
{
this.flowRepository = flowRepository;
}
/// <inheritdoc />
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefault<IFlowProvider, FlowProvider>();
@ -22,10 +28,10 @@ namespace Tapeti.Flow
container.RegisterDefaultSingleton<IFlowStore, FlowStore>();
}
/// <inheritdoc />
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
yield return new FlowBindingMiddleware();
yield return new FlowCleanupMiddleware();
}
}
}

View File

@ -4,22 +4,30 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.FlowHelpers
{
/// <summary>
/// Implementation of an asynchronous locking mechanism.
/// </summary>
public class LockCollection<T>
{
private readonly Dictionary<T, LockItem> locks;
/// <inheritdoc />
public LockCollection(IEqualityComparer<T> comparer)
{
locks = new Dictionary<T, LockItem>(comparer);
}
/// <summary>
/// Waits for and acquires a lock on the specified key. Dispose the returned value to release the lock.
/// </summary>
/// <param name="key"></param>
public Task<IDisposable> GetLock(T key)
{
// ReSharper disable once InconsistentlySynchronizedField - by design
LockItem nextLi = new LockItem(locks, key);
var nextLi = new LockItem(locks, key);
try
{
bool continueImmediately = false;
var continueImmediately = false;
lock (locks)
{
if (!locks.TryGetValue(key, out var li))
@ -45,6 +53,7 @@ namespace Tapeti.Flow.FlowHelpers
return nextLi.GetTask();
}
private class LockItem : IDisposable
{
internal volatile LockItem Next;
@ -83,7 +92,7 @@ namespace Tapeti.Flow.FlowHelpers
if (li != this)
{
// Something is wrong (comparer is not stable?), but we cannot loose the completions sources
// Something is wrong (comparer is not stable?), but we cannot lose the completions sources
while (li.Next != null)
li = li.Next;
li.Next = Next;

View File

@ -2,8 +2,15 @@
namespace Tapeti.Flow.FlowHelpers
{
/// <summary>
/// Converts a method into a unique string representation.
/// </summary>
public static class MethodSerializer
{
/// <summary>
/// Converts a method into a unique string representation.
/// </summary>
/// <param name="method"></param>
public static string Serialize(MethodInfo method)
{
return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName;

View File

@ -0,0 +1,37 @@
using System;
using System.Reflection;
using Tapeti.Config;
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Provides information about the handler for the flow.
/// </summary>
public interface IFlowHandlerContext : IDisposable
{
/// <summary>
/// Provides access to the Tapeti config.
/// </summary>
ITapetiConfig Config { get; }
/// <summary>
/// An instance of the controller which starts or continues the flow.
/// </summary>
object Controller { get; }
/// <summary>
/// Information about the method which starts or continues the flow.
/// </summary>
MethodInfo Method { get; }
/// <summary>
/// Access to the controller message context if this is a continuated flow.
/// Will be null when in a starting flow.
/// </summary>
IControllerMessageContext ControllerMessageContext { get; }
}
}

View File

@ -1,55 +1,167 @@
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Flow
{
/// <summary>
/// Provides methods to build an IYieldPoint to indicate if and how Flow should continue.
/// </summary>
public interface IFlowProvider
{
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
// One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call".
// Apparantly this is because a return type of a method is not part of its signature,
// according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for synchronous response handlers.
/// </summary>
/// <remarks>
/// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
/// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
/// of a method is not part of its signature,according to:
/// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
/// </remarks>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler);
/// <summary>
/// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
/// to acquire an IYieldPoint.
/// </summary>
IFlowParallelRequestBuilder YieldWithParallelRequest();
/// <summary>
/// End the flow by publishing the specified response message. Only allowed, and required, when the
/// current flow was started by a message handler for a Request message.
/// </summary>
/// <param name="message"></param>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint EndWithResponse<TResponse>(TResponse message);
/// <summary>
/// End the flow and dispose any state.
/// </summary>
IYieldPoint End();
}
/// <summary>
/// Allows starting a flow outside of a message handler.
/// </summary>
public interface IFlowStarter
{
/// <summary>
/// Starts a new flow.
/// </summary>
/// <param name="methodSelector"></param>
Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class;
/// <summary>
/// Starts a new flow.
/// </summary>
/// <param name="methodSelector"></param>
Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class;
/// <summary>
/// Starts a new flow and passes the parameter to the method.
/// </summary>
/// <param name="methodSelector"></param>
/// <param name="parameter"></param>
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class;
/// <summary>
/// Starts a new flow and passes the parameter to the method.
/// </summary>
/// <param name="methodSelector"></param>
/// <param name="parameter"></param>
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class;
}
/// <summary>
/// Internal interface. Do not call directly.
/// </summary>
public interface IFlowHandler
{
Task Execute(IMessageContext context, IYieldPoint yieldPoint);
/// <summary>
/// Executes the YieldPoint for the given message context.
/// </summary>
/// <param name="context"></param>
/// <param name="yieldPoint"></param>
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
}
/// <summary>
/// Builder to publish one or more request messages and continuing the flow when the responses arrive.
/// </summary>
public interface IFlowParallelRequestBuilder
{
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for synchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived.
/// Response handlers and the continuation method are guaranteed thread-safe access to the
/// controller and can store state.
/// Used for asynchronous continuation methods.
/// </summary>
/// <param name="continuation"></param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation);
/// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived.
/// Response handlers and the continuation method are guaranteed thread-safe access to the
/// controller and can store state.
/// Used for synchronous continuation methods.
/// </summary>
/// <param name="continuation"></param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation);
}
/// <summary>
/// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods.
/// </summary>
public interface IYieldPoint
{
}

View File

@ -4,11 +4,39 @@ using System.Threading.Tasks;
namespace Tapeti.Flow
{
/// <summary>
/// Provides persistency for flow states.
/// </summary>
public interface IFlowRepository
{
/// <summary>
/// Load the previously persisted flow states.
/// </summary>
/// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns>
Task<List<KeyValuePair<Guid, T>>> GetStates<T>();
/// <summary>
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
/// <param name="state"></param>
/// <param name="timestamp"></param>
/// <returns></returns>
Task CreateState<T>(Guid flowID, T state, DateTime timestamp);
/// <summary>
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
/// <param name="state"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task UpdateState<T>(Guid flowID, T state);
/// <summary>
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
Task DeleteState(Guid flowID);
}
}

View File

@ -6,19 +6,58 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
/// <summary>
/// Provides a way to store and load flow state.
/// </summary>
public interface IFlowStore
{
/// <summary>
/// Must be called during application startup before subscribing or starting a flow.
/// If using an IFlowRepository that requires an update (such as creating tables) make
/// sure it is called before calling Load.
/// </summary>
Task Load();
/// <summary>
/// Looks up the FlowID corresponding to a ContinuationID. For internal use.
/// </summary>
/// <param name="continuationID"></param>
Task<Guid?> FindFlowID(Guid continuationID);
/// <summary>
/// Acquires a lock on the flow with the specified FlowID.
/// </summary>
/// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID);
}
/// <inheritdoc />
/// <summary>
/// Represents a lock on the flow state, to provide thread safety.
/// </summary>
public interface IFlowStateLock : IDisposable
{
/// <summary>
/// The unique ID of the flow state.
/// </summary>
Guid FlowID { get; }
/// <summary>
/// Acquires a copy of the flow state.
/// </summary>
Task<FlowState> GetFlowState();
Task StoreFlowState(FlowState flowState);
/// <summary>
/// Stores the new flow state.
/// </summary>
/// <param name="flowState"></param>
/// <param name="persistent"></param>
Task StoreFlowState(FlowState flowState, bool persistent);
/// <summary>
/// Disposes of the flow state corresponding to this Flow ID.
/// </summary>
Task DeleteFlowState();
}
}

View File

@ -38,6 +38,7 @@ using System;
// ReSharper disable IntroduceOptionalParameters.Global
// ReSharper disable MemberCanBeProtected.Global
// ReSharper disable InconsistentNaming
// ReSharper disable InheritdocConsiderUsage
// ReSharper disable once CheckNamespace
namespace JetBrains.Annotations
@ -96,9 +97,9 @@ namespace JetBrains.Annotations
TargetFlags = targetFlags;
}
public ImplicitUseKindFlags UseKindFlags { get; private set; }
public ImplicitUseKindFlags UseKindFlags { get; }
public ImplicitUseTargetFlags TargetFlags { get; private set; }
public ImplicitUseTargetFlags TargetFlags { get; }
}
/// <summary>
@ -142,7 +143,7 @@ namespace JetBrains.Annotations
/// </summary>
InstantiatedWithFixedConstructorSignature = 4,
/// <summary>Indicates implicit instantiation of a type.</summary>
InstantiatedNoFixedConstructorSignature = 8,
InstantiatedNoFixedConstructorSignature = 8
}
/// <summary>
@ -174,6 +175,6 @@ namespace JetBrains.Annotations
Comment = comment;
}
[CanBeNull] public string Comment { get; private set; }
[CanBeNull] public string Comment { get; }
}
}

View File

@ -2,8 +2,13 @@
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Raised when a response is expected to end a flow, but none was provided.
/// </summary>
public class ResponseExpectedException : Exception
{
/// <inheritdoc />
public ResponseExpectedException(string message) : base(message) { }
}
}

View File

@ -3,6 +3,11 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti Flow</title>
<authors>Menno van Lavieren, Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -2,8 +2,13 @@
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Raised when an invalid yield point is returned.
/// </summary>
public class YieldPointException : Exception
{
/// <inheritdoc />
public YieldPointException(string message) : base(message) { }
}
}

View File

@ -0,0 +1,89 @@
using System;
using System.Linq;
using Ninject;
namespace Tapeti.Ninject
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Ninject.
/// </summary>
public class NinjectDependencyResolver : IDependencyContainer
{
private readonly IKernel kernel;
/// <inheritdoc />
public NinjectDependencyResolver(IKernel kernel)
{
this.kernel = kernel;
}
/// <inheritdoc />
public T Resolve<T>() where T : class
{
return kernel.Get<T>();
}
/// <inheritdoc />
public object Resolve(Type type)
{
return kernel.Get(type);
}
/// <inheritdoc />
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().To<TImplementation>();
}
/// <inheritdoc />
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
{
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().ToMethod(context => factory());
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().To<TImplementation>().InSingletonScope();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().ToConstant(instance);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().ToMethod(context => factory()).InSingletonScope();
}
/// <inheritdoc />
public void RegisterController(Type type)
{
kernel.Bind(type).ToSelf();
}
}
}

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Ninject" Version="3.3.4" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Tapeti.Ninject</id>
<version>$version$</version>
<title>Tapeti Ninject</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Ninject integration package for Tapeti</description>
<copyright></copyright>
<tags>rabbitmq tapeti ninject</tags>
<dependencies>
<dependency id="Tapeti" version="[$version$]" />
<dependency id="Ninject" version="3.3.4" />
</dependencies>
</metadata>
<files>
<file src="bin\Release\**" target="lib" />
</files>
</package>

View File

@ -3,10 +3,15 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="2.7.1" />
<PackageReference Include="Serilog" Version="2.8.0" />
</ItemGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti Serilog</title>
<authors>Hans Mulder</authors>
<owners>Hans Mulder</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Serilog.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

View File

@ -1,46 +1,94 @@
using System;
using ISeriLogger = Serilog.ILogger;
using Tapeti.Config;
using ISerilogLogger = Serilog.ILogger;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Serilog
{
/// <inheritdoc />
/// <summary>
/// Implements the Tapeti ILogger interface for Serilog output.
/// </summary>
public class TapetiSeriLogger: ILogger
{
private readonly ISeriLogger seriLogger;
private readonly ISerilogLogger seriLogger;
public TapetiSeriLogger(ISeriLogger seriLogger)
/// <inheritdoc />
public TapetiSeriLogger(ISerilogLogger seriLogger)
{
this.seriLogger = seriLogger;
}
public void Connect(TapetiConnectionParams connectionParams)
/// <inheritdoc />
public void Connect(IConnectContext connectContext)
{
seriLogger.Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}",
connectionParams.HostName,
connectionParams.Port,
connectionParams.VirtualHost);
seriLogger
.ForContext("isReconnect", connectContext.IsReconnect)
.Information("Tapeti: trying to connect to {host}:{port}/{virtualHost}",
connectContext.ConnectionParams.HostName,
connectContext.ConnectionParams.Port,
connectContext.ConnectionParams.VirtualHost);
}
public void ConnectFailed(TapetiConnectionParams connectionParams, Exception exception)
/// <inheritdoc />
public void ConnectFailed(IConnectFailedContext connectContext)
{
seriLogger.Error(exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}",
connectionParams.HostName,
connectionParams.Port,
connectionParams.VirtualHost);
seriLogger.Error(connectContext.Exception, "Tapeti: could not connect to {host}:{port}/{virtualHost}",
connectContext.ConnectionParams.HostName,
connectContext.ConnectionParams.Port,
connectContext.ConnectionParams.VirtualHost);
}
public void ConnectSuccess(TapetiConnectionParams connectionParams)
/// <inheritdoc />
public void ConnectSuccess(IConnectSuccessContext connectContext)
{
seriLogger.Information("Tapeti: successfully connected to {host}:{port}/{virtualHost}",
connectionParams.HostName,
connectionParams.Port,
connectionParams.VirtualHost);
seriLogger
.ForContext("isReconnect", connectContext.IsReconnect)
.Information("Tapeti: successfully connected to {host}:{port}/{virtualHost} on local port {localPort}",
connectContext.ConnectionParams.HostName,
connectContext.ConnectionParams.Port,
connectContext.ConnectionParams.VirtualHost,
connectContext.LocalPort);
}
public void HandlerException(Exception e)
/// <inheritdoc />
public void Disconnect(IDisconnectContext disconnectContext)
{
seriLogger.Error(e, "Tapeti: exception in message handler");
seriLogger
.Information("Tapeti: connection closed, reply text = {replyText}, reply code = {replyCode}",
disconnectContext.ReplyText,
disconnectContext.ReplyCode);
}
/// <inheritdoc />
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
var contextLogger = seriLogger
.ForContext("consumeResult", consumeResult)
.ForContext("exchange", messageContext.Exchange)
.ForContext("queue", messageContext.Queue)
.ForContext("routingKey", messageContext.RoutingKey);
if (messageContext is IControllerMessageContext controllerMessageContext)
{
contextLogger = contextLogger
.ForContext("controller", controllerMessageContext.Binding.Controller.FullName)
.ForContext("method", controllerMessageContext.Binding.Method.Name);
}
contextLogger.Error(exception, "Tapeti: exception in message handler");
}
/// <inheritdoc />
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
{
if (deleted)
seriLogger.Information("Tapeti: obsolete queue {queue} has been deleted", queueName);
else
seriLogger.Information("Tapeti: obsolete queue {queue} has been unbound but not yet deleted, {messageCount} messages remaining", queueName, messageCount);
}
}
}

View File

@ -4,12 +4,17 @@ using SimpleInjector;
namespace Tapeti.SimpleInjector
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for SimpleInjector.
/// </summary>
public class SimpleInjectorDependencyResolver : IDependencyContainer
{
private readonly Container container;
private readonly Lifestyle defaultsLifestyle;
private readonly Lifestyle controllersLifestyle;
/// <inheritdoc />
public SimpleInjectorDependencyResolver(Container container, Lifestyle defaultsLifestyle = null, Lifestyle controllersLifestyle = null)
{
this.container = container;
@ -17,17 +22,21 @@ namespace Tapeti.SimpleInjector
this.controllersLifestyle = controllersLifestyle;
}
/// <inheritdoc />
public T Resolve<T>() where T : class
{
return container.GetInstance<T>();
}
/// <inheritdoc />
public object Resolve(Type type)
{
return container.GetInstance(type);
}
/// <inheritdoc />
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (!CanRegisterDefault<TService>())
@ -39,6 +48,7 @@ namespace Tapeti.SimpleInjector
container.Register<TService, TImplementation>();
}
/// <inheritdoc />
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
{
if (!CanRegisterDefault<TService>())
@ -50,24 +60,29 @@ namespace Tapeti.SimpleInjector
container.Register(factory);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton<TService, TImplementation>();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterInstance(instance);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton(factory);
}
/// <inheritdoc />
public void RegisterController(Type type)
{
if (controllersLifestyle != null)

View File

@ -3,10 +3,15 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>2.0.0</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="4.3.0" />
<PackageReference Include="SimpleInjector" Version="4.6.2" />
</ItemGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<title>Tapeti SimpleInjector</title>
<authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<license type="expression">Unlicense</license>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>

Some files were not shown because too many files have changed in this diff Show More