diff --git a/.gitignore b/.gitignore
index ecbc623..ac34bb9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,5 @@ packages/
publish/
*.sublime-workspace
docs/_build/
+
+Tapeti.Cmd/Properties/launchSettings.json
diff --git a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
new file mode 100644
index 0000000..a2d0d06
--- /dev/null
+++ b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
@@ -0,0 +1,29 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _01_PublishSubscribe
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/01-PublishSubscribe/ExampleMessageController.cs b/Examples/01-PublishSubscribe/ExampleMessageController.cs
new file mode 100644
index 0000000..6cce0a1
--- /dev/null
+++ b/Examples/01-PublishSubscribe/ExampleMessageController.cs
@@ -0,0 +1,27 @@
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _01_PublishSubscribe
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.01")]
+ public class ExampleMessageController
+ {
+ private readonly IExampleState exampleState;
+
+
+ public ExampleMessageController(IExampleState exampleState)
+ {
+ this.exampleState = exampleState;
+ }
+
+
+ public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
+ {
+ Console.WriteLine("Received message: " + message.Greeting);
+ exampleState.Done();
+ }
+ }
+}
diff --git a/Examples/01-PublishSubscribe/ExamplePublisher.cs b/Examples/01-PublishSubscribe/ExamplePublisher.cs
new file mode 100644
index 0000000..4ebf41b
--- /dev/null
+++ b/Examples/01-PublishSubscribe/ExamplePublisher.cs
@@ -0,0 +1,45 @@
+using System;
+using System.ComponentModel.DataAnnotations;
+using System.Threading.Tasks;
+using Messaging.TapetiExample;
+using Tapeti;
+
+namespace _01_PublishSubscribe
+{
+ public class ExamplePublisher
+ {
+ private readonly IPublisher publisher;
+
+ ///
+ /// Shows that the IPublisher is registered in the container by Tapeti
+ ///
+ ///
+ public ExamplePublisher(IPublisher publisher)
+ {
+ this.publisher = publisher;
+ }
+
+
+ public async Task SendTestMessage()
+ {
+ await publisher.Publish(new PublishSubscribeMessage
+ {
+ Greeting = "Hello world of messaging!"
+ });
+
+
+ // Demonstrates what happens when DataAnnotations is enabled
+ // and the message is invalid
+ try
+ {
+ await publisher.Publish(new PublishSubscribeMessage());
+
+ Console.WriteLine("This is not supposed to show. Did you disable the DataAnnotations extension?");
+ }
+ catch (ValidationException e)
+ {
+ Console.WriteLine("As expected, the DataAnnotations check failed: " + e.Message);
+ }
+ }
+ }
+}
diff --git a/Examples/01-PublishSubscribe/Program.cs b/Examples/01-PublishSubscribe/Program.cs
new file mode 100644
index 0000000..a53b65b
--- /dev/null
+++ b/Examples/01-PublishSubscribe/Program.cs
@@ -0,0 +1,154 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Autofac;
+using Castle.MicroKernel.Registration;
+using Castle.Windsor;
+using ExampleLib;
+using Ninject;
+using Tapeti;
+using Tapeti.Autofac;
+using Tapeti.CastleWindsor;
+using Tapeti.DataAnnotations;
+using Tapeti.Default;
+using Tapeti.Ninject;
+using Tapeti.SimpleInjector;
+using Tapeti.UnityContainer;
+using Unity;
+using Container = SimpleInjector.Container;
+
+namespace _01_PublishSubscribe
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var dependencyResolver = GetSimpleInjectorDependencyResolver();
+
+ // or use your IoC container of choice:
+ //var dependencyResolver = GetAutofacDependencyResolver();
+ //var dependencyResolver = GetCastleWindsorDependencyResolver();
+ //var dependencyResolver = GetUnityDependencyResolver();
+ //var dependencyResolver = GetNinjectDependencyResolver();
+
+ // This helper is used because this example is not run as a service. You do not
+ // need it in your own applications.
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
+ .RegisterAllControllers()
+ .Build();
+
+ using (var connection = new TapetiConnection(config)
+ {
+ // Params is optional if you want to use the defaults, but we'll set it
+ // explicitly for this example
+ Params = new TapetiConnectionParams
+ {
+ HostName = "localhost",
+ Username = "guest",
+ Password = "guest",
+
+ // These properties allow you to identify the connection in the RabbitMQ Management interface
+ ClientProperties = new Dictionary
+ {
+ { "example", "01 - Publish Subscribe" }
+ }
+ }
+ })
+ {
+ // IoC containers that separate the builder from the resolver (Autofac) must be built after
+ // creating a TapetConnection, as it modifies the container by injecting IPublisher.
+ (dependencyResolver as AutofacDependencyResolver)?.Build();
+
+
+ // Create the queues and start consuming immediately.
+ // If you need to do some processing before processing messages, but after the
+ // queues have initialized, pass false as the startConsuming parameter and store
+ // the returned ISubscriber. Then call Resume on it later.
+ await connection.Subscribe();
+
+
+ // We could get an IPublisher from the container directly, but since you'll usually use
+ // it as an injected constructor parameter this shows
+ await dependencyResolver.Resolve().SendTestMessage();
+
+
+ // Wait for the controller to signal that the message has been received
+ await waitForDone();
+ }
+ }
+
+
+ internal static IDependencyContainer GetSimpleInjectorDependencyResolver()
+ {
+ var container = new Container();
+
+ container.Register();
+ container.Register();
+
+ return new SimpleInjectorDependencyResolver(container);
+ }
+
+
+ internal static IDependencyContainer GetAutofacDependencyResolver()
+ {
+ var containerBuilder = new ContainerBuilder();
+
+ containerBuilder
+ .RegisterType()
+ .As();
+
+ containerBuilder
+ .RegisterType()
+ .AsSelf();
+
+ return new AutofacDependencyResolver(containerBuilder);
+ }
+
+
+ internal static IDependencyContainer GetCastleWindsorDependencyResolver()
+ {
+ var container = new WindsorContainer();
+
+ // This exact combination is registered by TapetiConfig when running in a console,
+ // and Windsor will throw an exception for that. This is specific to the WindsorDependencyResolver as it
+ // relies on the "first one wins" behaviour of Windsor and does not check the registrations.
+ //
+ // You can of course register another ILogger instead, like DevNullLogger.
+ //container.Register(Component.For().ImplementedBy());
+
+ container.Register(Component.For());
+
+ return new WindsorDependencyResolver(container);
+ }
+
+
+ internal static IDependencyContainer GetUnityDependencyResolver()
+ {
+ var container = new UnityContainer();
+
+ container.RegisterType();
+ container.RegisterType();
+
+ return new UnityDependencyResolver(container);
+ }
+
+
+ internal static IDependencyContainer GetNinjectDependencyResolver()
+ {
+ var kernel = new StandardKernel();
+
+ kernel.Bind().To();
+ kernel.Bind().ToSelf();
+
+ return new NinjectDependencyResolver(kernel);
+ }
+ }
+}
diff --git a/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
new file mode 100644
index 0000000..c16ddf6
--- /dev/null
+++ b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
@@ -0,0 +1,21 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _02_DeclareDurableQueues
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/02-DeclareDurableQueues/ExampleMessageController.cs b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs
new file mode 100644
index 0000000..ee5266e
--- /dev/null
+++ b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs
@@ -0,0 +1,28 @@
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _02_DeclareDurableQueues
+{
+ [MessageController]
+ [DurableQueue("tapeti.example.02")]
+ public class ExampleMessageController
+ {
+ private readonly IExampleState exampleState;
+
+
+ public ExampleMessageController(IExampleState exampleState)
+ {
+ this.exampleState = exampleState;
+ }
+
+
+ public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
+ {
+ // Note that if you run example 01 after 02, it's message will also be in this durable queue
+ Console.WriteLine("Received message: " + message.Greeting);
+ exampleState.Done();
+ }
+ }
+}
diff --git a/Examples/02-DeclareDurableQueues/Program.cs b/Examples/02-DeclareDurableQueues/Program.cs
new file mode 100644
index 0000000..935470a
--- /dev/null
+++ b/Examples/02-DeclareDurableQueues/Program.cs
@@ -0,0 +1,48 @@
+using System;
+using System.Threading.Tasks;
+using ExampleLib;
+using Messaging.TapetiExample;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.Default;
+using Tapeti.SimpleInjector;
+
+namespace _02_DeclareDurableQueues
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .RegisterAllControllers()
+ .EnableDeclareDurableQueues()
+ .Build();
+
+ using (var connection = new TapetiConnection(config))
+ {
+ // This creates or updates the durable queue
+ await connection.Subscribe();
+
+ await dependencyResolver.Resolve().Publish(new PublishSubscribeMessage
+ {
+ Greeting = "Hello durable queue!"
+ });
+
+ // Wait for the controller to signal that the message has been received
+ await waitForDone();
+ }
+ }
+ }
+}
diff --git a/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj
new file mode 100644
index 0000000..bfa9f7e
--- /dev/null
+++ b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _03_FlowRequestResponse
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs
new file mode 100644
index 0000000..2193302
--- /dev/null
+++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs
@@ -0,0 +1,73 @@
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+using Tapeti.Flow;
+using Tapeti.Flow.Annotations;
+
+namespace _03_FlowRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.03")]
+ public class ParallelFlowController
+ {
+ private readonly IFlowProvider flowProvider;
+ private readonly IExampleState exampleState;
+
+ public string FirstQuote;
+ public string SecondQuote;
+
+
+ public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState)
+ {
+ this.flowProvider = flowProvider;
+ this.exampleState = exampleState;
+ }
+
+
+ [Start]
+ public IYieldPoint StartFlow()
+ {
+ return flowProvider.YieldWithParallelRequest()
+ .AddRequestSync(
+ new QuoteRequestMessage
+ {
+ Amount = 1
+ },
+ HandleFirstQuoteResponse)
+ .AddRequestSync(
+ new QuoteRequestMessage
+ {
+ Amount = 2
+ },
+ HandleSecondQuoteResponse)
+ .YieldSync(AllQuotesReceived);
+ }
+
+
+ [Continuation]
+ public void HandleFirstQuoteResponse(QuoteResponseMessage message)
+ {
+ Console.WriteLine("[ParallelFlowController] First quote response received");
+ FirstQuote = message.Quote;
+ }
+
+
+ [Continuation]
+ public void HandleSecondQuoteResponse(QuoteResponseMessage message)
+ {
+ Console.WriteLine("[ParallelFlowController] Second quote response received");
+ SecondQuote = message.Quote;
+ }
+
+
+ private IYieldPoint AllQuotesReceived()
+ {
+ Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote);
+ Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote);
+
+ exampleState.Done();
+ return flowProvider.End();
+ }
+ }
+}
diff --git a/Examples/03-FlowRequestResponse/Program.cs b/Examples/03-FlowRequestResponse/Program.cs
new file mode 100644
index 0000000..edac429
--- /dev/null
+++ b/Examples/03-FlowRequestResponse/Program.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Threading.Tasks;
+using ExampleLib;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.DataAnnotations;
+using Tapeti.Default;
+using Tapeti.Flow;
+using Tapeti.SimpleInjector;
+
+namespace _03_FlowRequestResponse
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
+ .WithFlow()
+ .RegisterAllControllers()
+ .Build();
+
+
+ using (var connection = new TapetiConnection(config))
+ {
+ // Must be called before using any flow. When using a persistent repository like the
+ // SQL server implementation, you can run any required update scripts (for example, using DbUp)
+ // before calling this Load method.
+ // Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
+ await dependencyResolver.Resolve().Load();
+
+
+ await connection.Subscribe();
+
+
+ var flowStarter = dependencyResolver.Resolve();
+
+ var startData = new SimpleFlowController.StartData
+ {
+ RequestStartTime = DateTime.Now,
+ Amount = 1
+ };
+
+
+ await flowStarter.Start(c => c.StartFlow, startData);
+ await flowStarter.Start(c => c.StartFlow);
+
+
+ // Wait for the controller to signal that the message has been received
+ await waitForDone();
+ }
+ }
+ }
+}
diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs
new file mode 100644
index 0000000..46a265e
--- /dev/null
+++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs
@@ -0,0 +1,42 @@
+using System.Threading.Tasks;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _03_FlowRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.03")]
+ public class ReceivingMessageController
+ {
+ // No publisher required, responses can simply be returned
+ public async Task HandleQuoteRequest(QuoteRequestMessage message)
+ {
+ string quote;
+
+ switch (message.Amount)
+ {
+ case 1:
+ // Well, they asked for it... :-)
+ quote = "'";
+ break;
+
+ case 2:
+ quote = "\"";
+ break;
+
+ default:
+ // We have to return a response.
+ quote = null;
+ break;
+ }
+
+ // Just gonna let them wait for a bit, to demonstrate async message handlers
+ await Task.Delay(1000);
+
+ return new QuoteResponseMessage
+ {
+ Quote = quote
+ };
+ }
+ }
+}
diff --git a/Examples/03-FlowRequestResponse/SimpleFlowController.cs b/Examples/03-FlowRequestResponse/SimpleFlowController.cs
new file mode 100644
index 0000000..1726e40
--- /dev/null
+++ b/Examples/03-FlowRequestResponse/SimpleFlowController.cs
@@ -0,0 +1,73 @@
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+using Tapeti.Flow;
+using Tapeti.Flow.Annotations;
+
+namespace _03_FlowRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.03")]
+ public class SimpleFlowController
+ {
+ private readonly IFlowProvider flowProvider;
+ private readonly IExampleState exampleState;
+
+
+ // Shows how multiple values can be passed to a start method
+ public struct StartData
+ {
+ public DateTime RequestStartTime;
+ public int Amount;
+ }
+
+ // Private and protected fields are lost between method calls because the controller is
+ // recreated when a response arrives. When using a persistent flow repository this may
+ // even be after a restart of the application.
+ private bool nonPersistentState;
+
+
+ // Public fields will be stored.
+ public DateTime RequestStartTime;
+
+
+ public SimpleFlowController(IFlowProvider flowProvider, IExampleState exampleState)
+ {
+ this.flowProvider = flowProvider;
+ this.exampleState = exampleState;
+ }
+
+
+ [Start]
+ public IYieldPoint StartFlow(StartData startData)
+ {
+ nonPersistentState = true;
+ RequestStartTime = startData.RequestStartTime;
+
+ return flowProvider.YieldWithRequestSync(
+ new QuoteRequestMessage
+ {
+ Amount = startData.Amount
+ },
+ HandleQuoteResponse);
+ }
+
+
+ [Continuation]
+ public IYieldPoint HandleQuoteResponse(QuoteResponseMessage message)
+ {
+ if (nonPersistentState)
+ Console.WriteLine("[SimpleFlowController] This is not supposed to show. NonPersistentState should not be retained. Someone please check http://www.hasthelargehadroncolliderdestroyedtheworldyet.com.");
+
+ Console.WriteLine("[SimpleFlowController] Request start: " + RequestStartTime.ToLongTimeString());
+ Console.WriteLine("[SimpleFlowController] Response time: " + DateTime.Now.ToLongTimeString());
+ Console.WriteLine("[SimpleFlowController] Quote: " + message.Quote);
+
+
+ exampleState.Done();
+
+ return flowProvider.End();
+ }
+ }
+}
diff --git a/Examples/04-Transient/04-Transient.csproj b/Examples/04-Transient/04-Transient.csproj
new file mode 100644
index 0000000..bb077b6
--- /dev/null
+++ b/Examples/04-Transient/04-Transient.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _04_Transient
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/04-Transient/Program.cs b/Examples/04-Transient/Program.cs
new file mode 100644
index 0000000..18b84f9
--- /dev/null
+++ b/Examples/04-Transient/Program.cs
@@ -0,0 +1,55 @@
+using System;
+using System.Threading.Tasks;
+using ExampleLib;
+using Messaging.TapetiExample;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.DataAnnotations;
+using Tapeti.Default;
+using Tapeti.SimpleInjector;
+using Tapeti.Transient;
+
+namespace _04_Transient
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
+ .WithTransient(TimeSpan.FromSeconds(5), "tapeti.example.04.transient")
+ .RegisterAllControllers()
+ .Build();
+
+
+ using (var connection = new TapetiConnection(config))
+ {
+ await connection.Subscribe();
+
+
+ Console.WriteLine("Sending request...");
+
+ var transientPublisher = dependencyResolver.Resolve();
+ var response = await transientPublisher.RequestResponse(
+ new LoggedInUsersRequestMessage());
+
+ Console.WriteLine("Response: " + response.Count);
+
+
+ // Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled.
+ }
+ }
+ }
+}
diff --git a/Examples/04-Transient/UsersMessageController.cs b/Examples/04-Transient/UsersMessageController.cs
new file mode 100644
index 0000000..5565c49
--- /dev/null
+++ b/Examples/04-Transient/UsersMessageController.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Threading.Tasks;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _04_Transient
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.04")]
+ public class UsersMessageController
+ {
+ // No publisher required, responses can simply be returned
+ public async Task HandleQuoteRequest(LoggedInUsersRequestMessage message)
+ {
+ // Simulate the response taking some time
+ await Task.Delay(1000);
+
+ return new LoggedInUsersResponseMessage
+ {
+ Count = new Random().Next(0, 100)
+ };
+ }
+ }
+}
diff --git a/Examples/05-SpeedTest/05-SpeedTest.csproj b/Examples/05-SpeedTest/05-SpeedTest.csproj
new file mode 100644
index 0000000..f85333c
--- /dev/null
+++ b/Examples/05-SpeedTest/05-SpeedTest.csproj
@@ -0,0 +1,19 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _05_SpeedTest
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/05-SpeedTest/IMessageCounter.cs b/Examples/05-SpeedTest/IMessageCounter.cs
new file mode 100644
index 0000000..e5e9aaf
--- /dev/null
+++ b/Examples/05-SpeedTest/IMessageCounter.cs
@@ -0,0 +1,7 @@
+namespace _05_SpeedTest
+{
+ public interface IMessageCounter
+ {
+ void Add();
+ }
+}
diff --git a/Examples/05-SpeedTest/Program.cs b/Examples/05-SpeedTest/Program.cs
new file mode 100644
index 0000000..6c399d7
--- /dev/null
+++ b/Examples/05-SpeedTest/Program.cs
@@ -0,0 +1,140 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using ExampleLib;
+using Messaging.TapetiExample;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.Default;
+using Tapeti.SimpleInjector;
+
+namespace _05_SpeedTest
+{
+ public class Program
+ {
+ private const int MessageCount = 20000;
+
+ // This does not make a massive difference, since internally Tapeti uses a single thread
+ // to perform all channel operations as recommended by the RabbitMQ .NET client library.
+ private const int ConcurrentTasks = 20;
+
+
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var container = (IDependencyContainer)dependencyResolver;
+ container.RegisterDefaultSingleton(new MessageCounter(MessageCount, () =>
+ {
+ var exampleState = dependencyResolver.Resolve();
+ exampleState.Done();
+ }));
+
+
+
+ var config = new TapetiConfig(dependencyResolver)
+ // On a developer test machine, this makes the difference between 2200 messages/sec and 3000 messages/sec published.
+ // Interesting, but only if speed is more important than guaranteed delivery.
+ //.DisablePublisherConfirms()
+ .RegisterAllControllers()
+ .Build();
+
+
+ using (var connection = new TapetiConnection(config))
+ {
+ var subscriber = await connection.Subscribe(false);
+
+
+ var publisher = dependencyResolver.Resolve();
+ Console.WriteLine($"Publishing {MessageCount} messages...");
+
+ var stopwatch = new Stopwatch();
+ stopwatch.Start();
+
+ await PublishMessages(publisher);
+
+ stopwatch.Stop();
+ Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
+
+
+
+ Console.WriteLine("Consuming messages...");
+ await subscriber.Resume();
+
+ stopwatch.Restart();
+
+ await waitForDone();
+
+ stopwatch.Stop();
+ Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
+ }
+ }
+
+
+ internal static async Task PublishMessages(IPublisher publisher)
+ {
+ var semaphore = new SemaphoreSlim(ConcurrentTasks);
+ var tasks = new List();
+
+ for (var i = 0; i < MessageCount; i++)
+ {
+ var item = i;
+ var task = Task.Run(async () =>
+ {
+ try
+ {
+ await semaphore.WaitAsync();
+ await publisher.Publish(new SpeedTestMessage
+ {
+ PublishCount = item
+ });
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ });
+
+ tasks.Add(task);
+ }
+
+ await Task.WhenAll(tasks);
+ }
+ }
+
+
+ internal class MessageCounter : IMessageCounter
+ {
+ private readonly int max;
+ private readonly Action done;
+ private int count;
+
+
+ public MessageCounter(int max, Action done)
+ {
+ this.max = max;
+ this.done = done;
+ }
+
+
+ public void Add()
+ {
+ // With a prefetchcount > 1 the consumers are running in multiple threads,
+ // beware of this when using singletons.
+ if (Interlocked.Increment(ref count) == max)
+ done();
+ }
+ }
+}
diff --git a/Examples/05-SpeedTest/SpeedMessageController.cs b/Examples/05-SpeedTest/SpeedMessageController.cs
new file mode 100644
index 0000000..83af176
--- /dev/null
+++ b/Examples/05-SpeedTest/SpeedMessageController.cs
@@ -0,0 +1,23 @@
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _05_SpeedTest
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.05")]
+ public class SpeedMessageController
+ {
+ private readonly IMessageCounter messageCounter;
+
+ public SpeedMessageController(IMessageCounter messageCounter)
+ {
+ this.messageCounter = messageCounter;
+ }
+
+
+ public void HandleSpeedTestMessage(SpeedTestMessage message)
+ {
+ messageCounter.Add();
+ }
+ }
+}
diff --git a/Examples/ExampleLib/ExampleConsoleApp.cs b/Examples/ExampleLib/ExampleConsoleApp.cs
new file mode 100644
index 0000000..f7f4d98
--- /dev/null
+++ b/Examples/ExampleLib/ExampleConsoleApp.cs
@@ -0,0 +1,109 @@
+using System;
+using System.Threading.Tasks;
+using Tapeti;
+
+namespace ExampleLib
+{
+ ///
+ /// Callback method for ExampleConsoleApp.Run
+ ///
+ /// A reference to the dependency resolver passed to the ExampleConsoleApp
+ /// Await this function to wait for the Done signal
+ public delegate Task AsyncFunc(IDependencyResolver dependencyResolver, Func waitForDone);
+
+
+ ///
+ /// Since the examples do not run as a service, we need to know when the example has run
+ /// to completion. This helper injects IExampleState into the container which
+ /// can be used to signal that it has finished. It also provides the Wait
+ /// method to wait for this signal.
+ ///
+ public class ExampleConsoleApp
+ {
+ private readonly IDependencyContainer dependencyResolver;
+ private readonly TaskCompletionSource doneSignal = new TaskCompletionSource();
+
+
+ /// Uses Tapeti's IDependencyContainer interface so you can easily switch an example to your favourite IoC container
+ public ExampleConsoleApp(IDependencyContainer dependencyResolver)
+ {
+ this.dependencyResolver = dependencyResolver;
+ dependencyResolver.RegisterDefault(() => new ExampleState(this));
+ }
+
+
+ ///
+ /// Runs the specified async method and waits for completion. Handles exceptions and waiting
+ /// for user input when the example application finishes.
+ ///
+ ///
+ public void Run(AsyncFunc asyncFunc)
+ {
+ try
+ {
+ asyncFunc(dependencyResolver, WaitAsync).Wait();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine(UnwrapException(e));
+ }
+ finally
+ {
+ Console.WriteLine("Press any Enter key to continue...");
+ Console.ReadLine();
+ }
+ }
+
+
+ ///
+ /// Returns a Task which completed when IExampleState.Done is called
+ ///
+ public async Task WaitAsync()
+ {
+ await doneSignal.Task;
+
+ // This is a hack, because the signal is often given in a message handler before the message can be
+ // acknowledged, causing it to be put back on the queue because the connection is closed.
+ // This short delay allows consumers to finish. This is not an issue in a proper service application.
+ await Task.Delay(500);
+ }
+
+
+ internal Exception UnwrapException(Exception e)
+ {
+ while (true)
+ {
+ if (!(e is AggregateException aggregateException))
+ return e;
+
+ if (aggregateException.InnerExceptions.Count != 1)
+ return e;
+
+ e = aggregateException.InnerExceptions[0];
+ }
+ }
+
+ internal void Done()
+ {
+ doneSignal.TrySetResult(true);
+ }
+
+
+ private class ExampleState : IExampleState
+ {
+ private readonly ExampleConsoleApp owner;
+
+
+ public ExampleState(ExampleConsoleApp owner)
+ {
+ this.owner = owner;
+ }
+
+
+ public void Done()
+ {
+ owner.Done();
+ }
+ }
+ }
+}
diff --git a/Examples/ExampleLib/ExampleLib.csproj b/Examples/ExampleLib/ExampleLib.csproj
new file mode 100644
index 0000000..d73eb4a
--- /dev/null
+++ b/Examples/ExampleLib/ExampleLib.csproj
@@ -0,0 +1,12 @@
+
+
+
+ netstandard2.0
+ true
+
+
+
+
+
+
+
diff --git a/Examples/ExampleLib/IExampleState.cs b/Examples/ExampleLib/IExampleState.cs
new file mode 100644
index 0000000..8f47b22
--- /dev/null
+++ b/Examples/ExampleLib/IExampleState.cs
@@ -0,0 +1,14 @@
+namespace ExampleLib
+{
+ ///
+ /// Since the examples do not run as a service, this interface provides a way
+ /// for the implementation to signal that it has finished and the example can be closed.
+ ///
+ public interface IExampleState
+ {
+ ///
+ /// Signals the Program that the example has finished and the application can be closed.
+ ///
+ void Done();
+ }
+}
diff --git a/Examples/Messaging.TapetiExample/LoggedInUsersRequestMessage.cs b/Examples/Messaging.TapetiExample/LoggedInUsersRequestMessage.cs
new file mode 100644
index 0000000..5fe285d
--- /dev/null
+++ b/Examples/Messaging.TapetiExample/LoggedInUsersRequestMessage.cs
@@ -0,0 +1,15 @@
+using Tapeti.Annotations;
+
+namespace Messaging.TapetiExample
+{
+ [Request(Response = typeof(LoggedInUsersResponseMessage))]
+ public class LoggedInUsersRequestMessage
+ {
+ }
+
+
+ public class LoggedInUsersResponseMessage
+ {
+ public int Count { get; set; }
+ }
+}
diff --git a/Examples/Messaging.TapetiExample/Messaging.TapetiExample.csproj b/Examples/Messaging.TapetiExample/Messaging.TapetiExample.csproj
new file mode 100644
index 0000000..cc9631d
--- /dev/null
+++ b/Examples/Messaging.TapetiExample/Messaging.TapetiExample.csproj
@@ -0,0 +1,15 @@
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Examples/Messaging.TapetiExample/PublishSubscribeMessage.cs b/Examples/Messaging.TapetiExample/PublishSubscribeMessage.cs
new file mode 100644
index 0000000..b378698
--- /dev/null
+++ b/Examples/Messaging.TapetiExample/PublishSubscribeMessage.cs
@@ -0,0 +1,13 @@
+using System.ComponentModel.DataAnnotations;
+
+namespace Messaging.TapetiExample
+{
+ ///
+ /// Example of a simple broadcast message used in the standard publish - subscribe pattern
+ ///
+ public class PublishSubscribeMessage
+ {
+ [Required(ErrorMessage = "Don't be impolite, supply a {0}")]
+ public string Greeting { get; set; }
+ }
+}
diff --git a/Examples/Messaging.TapetiExample/QuoteRequestMessage.cs b/Examples/Messaging.TapetiExample/QuoteRequestMessage.cs
new file mode 100644
index 0000000..a67367c
--- /dev/null
+++ b/Examples/Messaging.TapetiExample/QuoteRequestMessage.cs
@@ -0,0 +1,16 @@
+using Tapeti.Annotations;
+
+namespace Messaging.TapetiExample
+{
+ [Request(Response = typeof(QuoteResponseMessage))]
+ public class QuoteRequestMessage
+ {
+ public int Amount { get; set; }
+ }
+
+
+ public class QuoteResponseMessage
+ {
+ public string Quote { get; set; }
+ }
+}
diff --git a/Examples/Messaging.TapetiExample/SpeedTestMessage.cs b/Examples/Messaging.TapetiExample/SpeedTestMessage.cs
new file mode 100644
index 0000000..08e03de
--- /dev/null
+++ b/Examples/Messaging.TapetiExample/SpeedTestMessage.cs
@@ -0,0 +1,9 @@
+using System.ComponentModel.DataAnnotations;
+
+namespace Messaging.TapetiExample
+{
+ public class SpeedTestMessage
+ {
+ public int PublishCount { get; set; }
+ }
+}
diff --git a/README.md b/README.md
index c35e158..827d424 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,58 @@
+## Introduction
+Tapeti is a wrapper for the RabbitMQ .NET Client designed for long-running microservices. It’s main goal is to minimize the amount of messaging code required, and instead focus on the higher-level flow.
+
+## Key features
+* Consumers are declared using MVC-style controllers and are registered automatically based on annotations
+* Publishing requires only the message class, no transport details such as exchange and routing key
+* Flow extension (stateful request - response handling with support for parallel requests)
+* No inheritance required
+* Graceful recovery in case of connection issues, and in contrast to most libraries not designed for services, during startup as well
+* Extensible using middleware
+
+## Show me the code!
+Below is a bare minimum message controller from the first example project to get a feel for how messages are handled using Tapeti.
+```csharp
+///
+/// Example of a simple broadcast message used in the standard publish - subscribe pattern
+///
+public class PublishSubscribeMessage
+{
+ [Required(ErrorMessage = "Don't be impolite, supply a {0}")]
+ public string Greeting { get; set; }
+}
+
+
+[MessageController]
+[DynamicQueue("tapeti.example.01")]
+public class ExampleMessageController
+{
+ public ExampleMessageController() { }
+
+ public void HandlePublishSubscribeMessage(PublishSubscribeMessage message)
+ {
+ Console.WriteLine("Received message: " + message.Greeting);
+ }
+}
+```
+
+More details and examples can be found in the documentation as well as the example projects included with the source.
+
## Documentation
The documentation for Tapeti is available on Read the Docs:
-[Develop branch](http://tapeti.readthedocs.io/en/latest/)
-[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=latest)](http://tapeti.readthedocs.io/en/latest/?badge=latest)
+[Master branch (stable release)](http://tapeti.readthedocs.io/en/stable/introduction.html)
+[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/introduction.html?badge=stable)
+
+[Develop branch](http://tapeti.readthedocs.io/en/latest/introduction.html)
+[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=latest)](http://tapeti.readthedocs.io/en/latest/introduction.html?badge=latest)
-[Master branch](http://tapeti.readthedocs.io/en/stable/)
-[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/?badge=stable)
## Builds
Builds are automatically run using AppVeyor, with the resulting packages being pushed to NuGet.
+Master build (stable release)
+[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master)
Latest build
-[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti)
-
-Master build
-[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master)
+[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti)
\ No newline at end of file
diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs
index 8971044..ae99278 100644
--- a/Tapeti.Annotations/DurableQueueAttribute.cs
+++ b/Tapeti.Annotations/DurableQueueAttribute.cs
@@ -8,11 +8,6 @@ namespace Tapeti.Annotations
/// Binds to an existing durable queue to receive messages. Can be used
/// on an entire MessageController class or on individual methods.
///
- ///
- /// At the moment there is no support for creating a durable queue and managing the
- /// bindings. The author recommends https://git.x2software.net/pub/RabbitMetaQueue
- /// for deploy-time management of durable queues (shameless plug intended).
- ///
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class DurableQueueAttribute : Attribute
diff --git a/Tapeti.Annotations/ReSharper/JetBrains.Annotations.cs b/Tapeti.Annotations/ReSharper/JetBrains.Annotations.cs
index 9ec2401..2790b2e 100644
--- a/Tapeti.Annotations/ReSharper/JetBrains.Annotations.cs
+++ b/Tapeti.Annotations/ReSharper/JetBrains.Annotations.cs
@@ -30,6 +30,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. */
using System;
+// ReSharper disable InheritdocConsiderUsage
#pragma warning disable 1591
// ReSharper disable UnusedMember.Global
@@ -96,9 +97,9 @@ namespace JetBrains.Annotations
TargetFlags = targetFlags;
}
- public ImplicitUseKindFlags UseKindFlags { get; private set; }
+ public ImplicitUseKindFlags UseKindFlags { get; }
- public ImplicitUseTargetFlags TargetFlags { get; private set; }
+ public ImplicitUseTargetFlags TargetFlags { get; }
}
///
@@ -142,7 +143,7 @@ namespace JetBrains.Annotations
///
InstantiatedWithFixedConstructorSignature = 4,
/// Indicates implicit instantiation of a type.
- InstantiatedNoFixedConstructorSignature = 8,
+ InstantiatedNoFixedConstructorSignature = 8
}
///
@@ -174,6 +175,6 @@ namespace JetBrains.Annotations
Comment = comment;
}
- [CanBeNull] public string Comment { get; private set; }
+ [CanBeNull] public string Comment { get; }
}
}
\ No newline at end of file
diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs
index f298c50..7f7a6a9 100644
--- a/Tapeti.Annotations/RequestAttribute.cs
+++ b/Tapeti.Annotations/RequestAttribute.cs
@@ -7,7 +7,7 @@ namespace Tapeti.Annotations
/// Can be attached to a message class to specify that the receiver of the message must
/// provide a response message of the type specified in the Response attribute. This response
/// must be sent by either returning it from the message handler method or using
- /// YieldWithResponse when using Tapeti Flow. These methods will respond directly
+ /// EndWithResponse when using Tapeti Flow. These methods will respond directly
/// to the queue specified in the reply-to header automatically added by Tapeti.
///
[AttributeUsage(AttributeTargets.Class)]
diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj
index be5c9ef..5c1ef0d 100644
--- a/Tapeti.Annotations/Tapeti.Annotations.csproj
+++ b/Tapeti.Annotations/Tapeti.Annotations.csproj
@@ -3,6 +3,11 @@
netstandard2.0
true
+ 2.0.0
+
+
+
+ 1701;1702
diff --git a/Tapeti.Annotations/Tapeti.Annotations.nuspec b/Tapeti.Annotations/Tapeti.Annotations.nuspec
index 68e8268..87e40c7 100644
--- a/Tapeti.Annotations/Tapeti.Annotations.nuspec
+++ b/Tapeti.Annotations/Tapeti.Annotations.nuspec
@@ -6,7 +6,7 @@
Tapeti Annotations
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Annotations.png
false
diff --git a/Tapeti.Autofac/AutofacDependencyResolver.cs b/Tapeti.Autofac/AutofacDependencyResolver.cs
new file mode 100644
index 0000000..cf6cee8
--- /dev/null
+++ b/Tapeti.Autofac/AutofacDependencyResolver.cs
@@ -0,0 +1,148 @@
+using System;
+using Autofac;
+using Autofac.Builder;
+
+namespace Tapeti.Autofac
+{
+ ///
+ ///
+ /// Dependency resolver and container implementation for Autofac.
+ /// Since this class needs access to both the ContainerBuilder and the built IContainer,
+ /// either let AutofacDependencyResolver build the container by calling it's Build method,
+ /// or set the Container property manually.
+ ///
+ public class AutofacDependencyResolver : IDependencyContainer
+ {
+ private ContainerBuilder containerBuilder;
+ private IContainer container;
+
+
+ ///
+ /// The built container. Either set directly, or use the Build method to built the
+ /// update this reference.
+ ///
+ public IContainer Container
+ {
+ get => container;
+ set
+ {
+ container = value;
+ if (value != null)
+ containerBuilder = null;
+ }
+ }
+
+
+ ///
+ public AutofacDependencyResolver(ContainerBuilder containerBuilder)
+ {
+ this.containerBuilder = containerBuilder;
+ }
+
+
+ ///
+ /// Builds the container, updates the Container property and returns the newly built IContainer.
+ ///
+ public IContainer Build(ContainerBuildOptions options = ContainerBuildOptions.None)
+ {
+ CheckContainerBuilder();
+ Container = containerBuilder.Build(options);
+
+ return container;
+ }
+
+
+ ///
+ public T Resolve() where T : class
+ {
+ CheckContainer();
+ return Container.Resolve();
+ }
+
+ ///
+ public object Resolve(Type type)
+ {
+ CheckContainer();
+ return Container.Resolve(type);
+ }
+
+
+ ///
+ public void RegisterDefault() where TService : class where TImplementation : class, TService
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .RegisterType()
+ .As()
+ .PreserveExistingDefaults();
+ }
+
+ ///
+ public void RegisterDefault(Func factory) where TService : class
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .Register(context => factory())
+ .As()
+ .PreserveExistingDefaults();
+ }
+
+
+ ///
+ public void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .RegisterType()
+ .As()
+ .SingleInstance()
+ .PreserveExistingDefaults();
+ }
+
+ ///
+ public void RegisterDefaultSingleton(TService instance) where TService : class
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .RegisterInstance(instance)
+ .As()
+ .SingleInstance()
+ .PreserveExistingDefaults();
+ }
+
+ ///
+ public void RegisterDefaultSingleton(Func factory) where TService : class
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .Register(context => factory())
+ .As()
+ .SingleInstance()
+ .PreserveExistingDefaults();
+ }
+
+
+ ///
+ public void RegisterController(Type type)
+ {
+ CheckContainerBuilder();
+ containerBuilder
+ .RegisterType(type)
+ .AsSelf();
+ }
+
+
+ private void CheckContainer()
+ {
+ if (container == null)
+ throw new InvalidOperationException("Container property has not been set yet on AutofacDependencyResolver");
+ }
+
+
+ private void CheckContainerBuilder()
+ {
+ if (containerBuilder == null)
+ throw new InvalidOperationException("Container property has already been set on AutofacDependencyResolver");
+ }
+ }
+}
diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj
new file mode 100644
index 0000000..4aabb0c
--- /dev/null
+++ b/Tapeti.Autofac/Tapeti.Autofac.csproj
@@ -0,0 +1,17 @@
+
+
+
+ netstandard2.0
+ true
+ 2.0.0
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Tapeti.Autofac/Tapeti.Autofac.nuspec b/Tapeti.Autofac/Tapeti.Autofac.nuspec
new file mode 100644
index 0000000..d788f49
--- /dev/null
+++ b/Tapeti.Autofac/Tapeti.Autofac.nuspec
@@ -0,0 +1,24 @@
+
+
+
+ Tapeti.Autofac
+ $version$
+ Tapeti Autofac
+ Mark van Renswoude
+ Mark van Renswoude
+ Unlicense
+ https://github.com/MvRens/Tapeti
+ https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
+ false
+ Autofac integration package for Tapeti
+
+ rabbitmq tapeti autofac
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
new file mode 100644
index 0000000..2c81fe5
--- /dev/null
+++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
@@ -0,0 +1,17 @@
+
+
+
+ netstandard2.0
+ true
+ 2.0.0
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec
new file mode 100644
index 0000000..ac652ed
--- /dev/null
+++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.nuspec
@@ -0,0 +1,24 @@
+
+
+
+ Tapeti.CastleWindsor
+ $version$
+ Tapeti Castle Windsor
+ Mark van Renswoude
+ Mark van Renswoude
+ Unlicense
+ https://github.com/MvRens/Tapeti
+ https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png
+ false
+ Castle.Windsor integration package for Tapeti
+
+ rabbitmq tapeti castle windsor
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Tapeti.CastleWindsor/WindsorDependencyResolver.cs b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs
new file mode 100644
index 0000000..419d115
--- /dev/null
+++ b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs
@@ -0,0 +1,98 @@
+using System;
+using Castle.MicroKernel.Registration;
+using Castle.Windsor;
+
+namespace Tapeti.CastleWindsor
+{
+ ///
+ ///
+ /// Dependency resolver and container implementation for Castle Windsor.
+ ///
+ public class WindsorDependencyResolver : IDependencyContainer
+ {
+ private readonly IWindsorContainer container;
+
+
+ ///
+ public WindsorDependencyResolver(IWindsorContainer container)
+ {
+ this.container = container;
+ }
+
+
+ ///
+ public T Resolve() where T : class
+ {
+ return container.Resolve();
+ }
+
+ ///
+ public object Resolve(Type type)
+ {
+ return container.Resolve(type);
+ }
+
+
+ ///
+ public void RegisterDefault() where TService : class where TImplementation : class, TService
+ {
+ // No need for anything special to register as default, because "In Windsor first one wins":
+ // https://github.com/castleproject/Windsor/blob/master/docs/registering-components-one-by-one.md
+ container.Register(
+ Component
+ .For()
+ .ImplementedBy()
+ );
+ }
+
+ ///
+ public void RegisterDefault(Func factory) where TService : class
+ {
+ container.Register(
+ Component
+ .For()
+ .UsingFactoryMethod(() => factory())
+ );
+ }
+
+
+ ///
+ public void RegisterDefaultSingleton() where TService : class where TImplementation : class, TService
+ {
+ container.Register(
+ Component
+ .For()
+ .ImplementedBy()
+ .LifestyleSingleton()
+ );
+ }
+
+ ///
+ public void RegisterDefaultSingleton(TService instance) where TService : class
+ {
+ container.Register(
+ Component
+ .For()
+ .Instance(instance)
+ );
+ }
+
+ ///
+ public void RegisterDefaultSingleton(Func factory) where TService : class
+ {
+ container.Register(
+ Component
+ .For()
+ .UsingFactoryMethod(() => factory())
+ .LifestyleSingleton()
+ );
+ }
+
+
+ ///
+ public void RegisterController(Type type)
+ {
+ container.Register(Component.For(type));
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Commands/ExportCommand.cs b/Tapeti.Cmd/Commands/ExportCommand.cs
new file mode 100644
index 0000000..2f69aa4
--- /dev/null
+++ b/Tapeti.Cmd/Commands/ExportCommand.cs
@@ -0,0 +1,46 @@
+using RabbitMQ.Client;
+using Tapeti.Cmd.Serialization;
+
+namespace Tapeti.Cmd.Commands
+{
+ public class ExportCommand
+ {
+ public IMessageSerializer MessageSerializer { get; set; }
+
+ public string QueueName { get; set; }
+ public bool RemoveMessages { get; set; }
+ public int? MaxCount { get; set; }
+
+
+ public int Execute(IModel channel)
+ {
+ var messageCount = 0;
+
+ while (!MaxCount.HasValue || messageCount < MaxCount.Value)
+ {
+ var result = channel.BasicGet(QueueName, false);
+ if (result == null)
+ // No more messages on the queue
+ break;
+
+ messageCount++;
+
+ MessageSerializer.Serialize(new Message
+ {
+ DeliveryTag = result.DeliveryTag,
+ Redelivered = result.Redelivered,
+ Exchange = result.Exchange,
+ RoutingKey = result.RoutingKey,
+ Queue = QueueName,
+ Properties = result.BasicProperties,
+ Body = result.Body
+ });
+
+ if (RemoveMessages)
+ channel.BasicAck(result.DeliveryTag, false);
+ }
+
+ return messageCount;
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs
new file mode 100644
index 0000000..ccdf308
--- /dev/null
+++ b/Tapeti.Cmd/Commands/ImportCommand.cs
@@ -0,0 +1,29 @@
+using RabbitMQ.Client;
+using Tapeti.Cmd.Serialization;
+
+namespace Tapeti.Cmd.Commands
+{
+ public class ImportCommand
+ {
+ public IMessageSerializer MessageSerializer { get; set; }
+
+ public bool DirectToQueue { get; set; }
+
+
+ public int Execute(IModel channel)
+ {
+ var messageCount = 0;
+
+ foreach (var message in MessageSerializer.Deserialize())
+ {
+ var exchange = DirectToQueue ? "" : message.Exchange;
+ var routingKey = DirectToQueue ? message.Queue : message.RoutingKey;
+
+ channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
+ messageCount++;
+ }
+
+ return messageCount;
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs
new file mode 100644
index 0000000..9b42a3a
--- /dev/null
+++ b/Tapeti.Cmd/Commands/ShovelCommand.cs
@@ -0,0 +1,37 @@
+using RabbitMQ.Client;
+
+namespace Tapeti.Cmd.Commands
+{
+ public class ShovelCommand
+ {
+ public string QueueName { get; set; }
+ public string TargetQueueName { get; set; }
+ public bool RemoveMessages { get; set; }
+ public int? MaxCount { get; set; }
+
+
+ public int Execute(IModel sourceChannel, IModel targetChannel)
+ {
+ var messageCount = 0;
+
+ while (!MaxCount.HasValue || messageCount < MaxCount.Value)
+ {
+ var result = sourceChannel.BasicGet(QueueName, false);
+ if (result == null)
+ // No more messages on the queue
+ break;
+
+
+ targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, result.Body);
+
+
+ messageCount++;
+
+ if (RemoveMessages)
+ sourceChannel.BasicAck(result.DeliveryTag, false);
+ }
+
+ return messageCount;
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs
new file mode 100644
index 0000000..5088586
--- /dev/null
+++ b/Tapeti.Cmd/Program.cs
@@ -0,0 +1,293 @@
+using System;
+using System.Diagnostics;
+using CommandLine;
+using RabbitMQ.Client;
+using Tapeti.Cmd.Commands;
+using Tapeti.Cmd.Serialization;
+
+namespace Tapeti.Cmd
+{
+ public class Program
+ {
+ public class CommonOptions
+ {
+ [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")]
+ public string Host { get; set; }
+
+ [Option('p', "port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)]
+ public int Port { get; set; }
+
+ [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")]
+ public string VirtualHost { get; set; }
+
+ [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")]
+ public string Username { get; set; }
+
+ [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")]
+ public string Password { get; set; }
+ }
+
+
+ public enum SerializationMethod
+ {
+ SingleFileJSON,
+ EasyNetQHosepipe
+ }
+
+
+ public class MessageSerializerOptions : CommonOptions
+ {
+ [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)]
+ public SerializationMethod SerializationMethod { get; set; }
+ }
+
+
+
+ [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")]
+ public class ExportOptions : MessageSerializerOptions
+ {
+ [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
+ public string QueueName { get; set; }
+
+ [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
+ public string OutputPath { get; set; }
+
+ [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
+ public bool RemoveMessages { get; set; }
+
+ [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
+ public int? MaxCount { get; set; }
+ }
+
+
+ [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
+ public class ImportOptions : MessageSerializerOptions
+ {
+ [Option('i', "input", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
+ public string Input { get; set; }
+
+ [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
+ public bool PublishToExchange { get; set; }
+ }
+
+
+ [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")]
+ public class ShovelOptions : CommonOptions
+ {
+ [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")]
+ public string QueueName { get; set; }
+
+ [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")]
+ public string TargetQueueName { get; set; }
+
+ [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")]
+ public bool RemoveMessages { get; set; }
+
+ [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
+ public int? MaxCount { get; set; }
+
+ [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")]
+ public string TargetHost { get; set; }
+
+ [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")]
+ public int? TargetPort { get; set; }
+
+ [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")]
+ public string TargetVirtualHost { get; set; }
+
+ [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")]
+ public string TargetUsername { get; set; }
+
+ [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
+ public string TargetPassword { get; set; }
+ }
+
+
+
+
+ public static int Main(string[] args)
+ {
+ return Parser.Default.ParseArguments(args)
+ .MapResult(
+ (ExportOptions o) => ExecuteVerb(o, RunExport),
+ (ImportOptions o) => ExecuteVerb(o, RunImport),
+ (ShovelOptions o) => ExecuteVerb(o, RunShovel),
+ errs =>
+ {
+ if (!Debugger.IsAttached)
+ return 1;
+
+ Console.WriteLine("Press any Enter key to continue...");
+ Console.ReadLine();
+ return 1;
+ }
+ );
+ }
+
+
+ private static int ExecuteVerb(T options, Action execute) where T : class
+ {
+ try
+ {
+ execute(options);
+ return 0;
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine(e.Message);
+ return 1;
+ }
+ }
+
+
+ private static IConnection GetConnection(CommonOptions options)
+ {
+ var factory = new ConnectionFactory
+ {
+ HostName = options.Host,
+ Port = options.Port,
+ VirtualHost = options.VirtualHost,
+ UserName = options.Username,
+ Password = options.Password
+ };
+
+ return factory.CreateConnection();
+ }
+
+
+ private static IMessageSerializer GetMessageSerializer(MessageSerializerOptions options, string path)
+ {
+ switch (options.SerializationMethod)
+ {
+ case SerializationMethod.SingleFileJSON:
+ return new SingleFileJSONMessageSerializer(path);
+
+ case SerializationMethod.EasyNetQHosepipe:
+ return new EasyNetQMessageSerializer(path);
+
+ default:
+ throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
+ }
+ }
+
+
+ private static void RunExport(ExportOptions options)
+ {
+ int messageCount;
+
+ using (var messageSerializer = GetMessageSerializer(options, options.OutputPath))
+ using (var connection = GetConnection(options))
+ using (var channel = connection.CreateModel())
+ {
+ messageCount = new ExportCommand
+ {
+ MessageSerializer = messageSerializer,
+
+ QueueName = options.QueueName,
+ RemoveMessages = options.RemoveMessages,
+ MaxCount = options.MaxCount
+ }.Execute(channel);
+ }
+
+ Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported.");
+ }
+
+
+ private static void RunImport(ImportOptions options)
+ {
+ int messageCount;
+
+ using (var messageSerializer = GetMessageSerializer(options, options.Input))
+ using (var connection = GetConnection(options))
+ using (var channel = connection.CreateModel())
+ {
+ messageCount = new ImportCommand
+ {
+ MessageSerializer = messageSerializer,
+
+ DirectToQueue = !options.PublishToExchange
+ }.Execute(channel);
+ }
+
+ Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
+ }
+
+
+ private static void RunShovel(ShovelOptions options)
+ {
+ int messageCount;
+
+ using (var sourceConnection = GetConnection(options))
+ using (var sourceChannel = sourceConnection.CreateModel())
+ {
+ var shovelCommand = new ShovelCommand
+ {
+ QueueName = options.QueueName,
+ TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName,
+ RemoveMessages = options.RemoveMessages,
+ MaxCount = options.MaxCount
+ };
+
+
+ if (RequiresSecondConnection(options))
+ {
+ using (var targetConnection = GetTargetConnection(options))
+ using (var targetChannel = targetConnection.CreateModel())
+ {
+ messageCount = shovelCommand.Execute(sourceChannel, targetChannel);
+ }
+ }
+ else
+ messageCount = shovelCommand.Execute(sourceChannel, sourceChannel);
+ }
+
+ Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled.");
+ }
+
+
+ private static bool RequiresSecondConnection(ShovelOptions options)
+ {
+ if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host)
+ return true;
+
+ if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port)
+ return true;
+
+ if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost)
+ return true;
+
+
+ // All relevant target host parameters are either omitted or the same. This means the queue must be different
+ // to prevent an infinite loop.
+ if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName)
+ throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host");
+
+
+ if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username)
+ return true;
+
+ // ReSharper disable once ConvertIfStatementToReturnStatement
+ if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password)
+ return true;
+
+
+ // Everything's the same, we can use the same channel
+ return false;
+ }
+
+
+ private static IConnection GetTargetConnection(ShovelOptions options)
+ {
+ var factory = new ConnectionFactory
+ {
+ HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host,
+ Port = options.TargetPort ?? options.Port,
+ VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost,
+ UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username,
+ Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password,
+ };
+
+ return factory.CreateConnection();
+ }
+ }
+}
diff --git a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs
new file mode 100644
index 0000000..db3f589
--- /dev/null
+++ b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs
@@ -0,0 +1,314 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Text.RegularExpressions;
+using Newtonsoft.Json;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Framing;
+
+namespace Tapeti.Cmd.Serialization
+{
+ public class EasyNetQMessageSerializer : IMessageSerializer
+ {
+ private static readonly Regex InvalidCharRegex = new Regex(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled);
+
+ private readonly string path;
+ private readonly Lazy writablePath;
+ private int messageCount;
+
+
+ public EasyNetQMessageSerializer(string path)
+ {
+ this.path = path;
+
+ writablePath = new Lazy(() =>
+ {
+ Directory.CreateDirectory(path);
+ return path;
+ });
+ }
+
+
+ public void Dispose()
+ {
+ }
+
+
+ public void Serialize(Message message)
+ {
+ var uniqueFileName = SanitiseQueueName(message.Queue) + "." + messageCount;
+
+ var bodyPath = Path.Combine(writablePath.Value, uniqueFileName + ".message.txt");
+ var propertiesPath = Path.Combine(writablePath.Value, uniqueFileName + ".properties.txt");
+ var infoPath = Path.Combine(writablePath.Value, uniqueFileName + ".info.txt");
+
+ var properties = new EasyNetQMessageProperties(message.Properties);
+ var info = new EasyNetQMessageReceivedInfo(message);
+
+ File.WriteAllText(bodyPath, Encoding.UTF8.GetString(message.Body));
+ File.WriteAllText(propertiesPath, JsonConvert.SerializeObject(properties));
+ File.WriteAllText(infoPath, JsonConvert.SerializeObject(info));
+
+ messageCount++;
+ }
+
+
+ private static string SanitiseQueueName(string queueName)
+ {
+ return InvalidCharRegex.Replace(queueName, "_");
+ }
+
+
+ public IEnumerable Deserialize()
+ {
+ foreach (var file in Directory.GetFiles(path, "*.*.message.txt"))
+ {
+ const string messageTag = ".message.";
+
+ var directoryName = Path.GetDirectoryName(file);
+ var fileName = Path.GetFileName(file);
+ var propertiesFileName = Path.Combine(directoryName, fileName.Replace(messageTag, ".properties."));
+ var infoFileName = Path.Combine(directoryName, fileName.Replace(messageTag, ".info."));
+
+ var body = File.ReadAllText(file);
+
+ var propertiesJson = File.ReadAllText(propertiesFileName);
+ var properties = JsonConvert.DeserializeObject(propertiesJson);
+
+ var infoJson = File.ReadAllText(infoFileName);
+ var info = JsonConvert.DeserializeObject(infoJson);
+
+ var message = info.ToMessage();
+ message.Properties = properties.ToBasicProperties();
+ message.Body = Encoding.UTF8.GetBytes(body);
+
+ yield return message;
+ }
+ }
+
+
+ // ReSharper disable MemberCanBePrivate.Local - used by JSON deserialization
+ // ReSharper disable AutoPropertyCanBeMadeGetOnly.Local
+ private class EasyNetQMessageProperties
+ {
+ // ReSharper disable once MemberCanBePrivate.Local - used by JSON deserialization
+ public EasyNetQMessageProperties()
+ {
+ }
+
+ public EasyNetQMessageProperties(IBasicProperties basicProperties) : this()
+ {
+ if (basicProperties.IsContentTypePresent()) ContentType = basicProperties.ContentType;
+ if (basicProperties.IsContentEncodingPresent()) ContentEncoding = basicProperties.ContentEncoding;
+ if (basicProperties.IsDeliveryModePresent()) DeliveryMode = basicProperties.DeliveryMode;
+ if (basicProperties.IsPriorityPresent()) Priority = basicProperties.Priority;
+ if (basicProperties.IsCorrelationIdPresent()) CorrelationId = basicProperties.CorrelationId;
+ if (basicProperties.IsReplyToPresent()) ReplyTo = basicProperties.ReplyTo;
+ if (basicProperties.IsExpirationPresent()) Expiration = basicProperties.Expiration;
+ if (basicProperties.IsMessageIdPresent()) MessageId = basicProperties.MessageId;
+ if (basicProperties.IsTimestampPresent()) Timestamp = basicProperties.Timestamp.UnixTime;
+ if (basicProperties.IsTypePresent()) Type = basicProperties.Type;
+ if (basicProperties.IsUserIdPresent()) UserId = basicProperties.UserId;
+ if (basicProperties.IsAppIdPresent()) AppId = basicProperties.AppId;
+ if (basicProperties.IsClusterIdPresent()) ClusterId = basicProperties.ClusterId;
+
+ if (!basicProperties.IsHeadersPresent())
+ return;
+
+ foreach (var header in basicProperties.Headers)
+ Headers.Add(header.Key, (byte[])header.Value);
+ }
+
+ public IBasicProperties ToBasicProperties()
+ {
+ var basicProperties = new BasicProperties();
+
+ if (ContentTypePresent) basicProperties.ContentType = ContentType;
+ if (ContentEncodingPresent) basicProperties.ContentEncoding = ContentEncoding;
+ if (DeliveryModePresent) basicProperties.DeliveryMode = DeliveryMode;
+ if (PriorityPresent) basicProperties.Priority = Priority;
+ if (CorrelationIdPresent) basicProperties.CorrelationId = CorrelationId;
+ if (ReplyToPresent) basicProperties.ReplyTo = ReplyTo;
+ if (ExpirationPresent) basicProperties.Expiration = Expiration;
+ if (MessageIdPresent) basicProperties.MessageId = MessageId;
+ if (TimestampPresent) basicProperties.Timestamp = new AmqpTimestamp(Timestamp);
+ if (TypePresent) basicProperties.Type = Type;
+ if (UserIdPresent) basicProperties.UserId = UserId;
+ if (AppIdPresent) basicProperties.AppId = AppId;
+ if (ClusterIdPresent) basicProperties.ClusterId = ClusterId;
+
+ if (HeadersPresent)
+ {
+ basicProperties.Headers = new Dictionary(Headers.ToDictionary(p => p.Key, p => (object)p.Value));
+ }
+
+ return basicProperties;
+ }
+
+
+ private string contentType;
+ public string ContentType
+ {
+ get => contentType;
+ set { contentType = value; ContentTypePresent = true; }
+ }
+
+ private string contentEncoding;
+ public string ContentEncoding
+ {
+ get => contentEncoding;
+ set { contentEncoding = value; ContentEncodingPresent = true; }
+ }
+
+ // The original EasyNetQ.Hosepipe defines this as an IDictionary. This causes UTF-8 headers
+ // to be serialized as Base64, and deserialized as string, corrupting the republished message.
+ // This may cause incompatibilities, but fixes it for dumped Tapeti messages.
+ private IDictionary headers = new Dictionary();
+ public IDictionary Headers
+ {
+ get => headers;
+ set { headers = value; HeadersPresent = true; }
+ }
+
+ private byte deliveryMode;
+ public byte DeliveryMode
+ {
+ get => deliveryMode;
+ set { deliveryMode = value; DeliveryModePresent = true; }
+ }
+
+ private byte priority;
+ public byte Priority
+ {
+ get => priority;
+ set { priority = value; PriorityPresent = true; }
+ }
+
+ private string correlationId;
+ public string CorrelationId
+ {
+ get => correlationId;
+ set { correlationId = value; CorrelationIdPresent = true; }
+ }
+
+ private string replyTo;
+ public string ReplyTo
+ {
+ get => replyTo;
+ set { replyTo = value; ReplyToPresent = true; }
+ }
+
+ private string expiration;
+ public string Expiration
+ {
+ get => expiration;
+ set { expiration = value; ExpirationPresent = true; }
+ }
+
+ private string messageId;
+ public string MessageId
+ {
+ get => messageId;
+ set { messageId = value; MessageIdPresent = true; }
+ }
+
+ private long timestamp;
+ public long Timestamp
+ {
+ get => timestamp;
+ set { timestamp = value; TimestampPresent = true; }
+ }
+
+ private string type;
+ public string Type
+ {
+ get => type;
+ set { type = value; TypePresent = true; }
+ }
+
+ private string userId;
+ public string UserId
+ {
+ get => userId;
+ set { userId = value; UserIdPresent = true; }
+ }
+
+ private string appId;
+ public string AppId
+ {
+ get => appId;
+ set { appId = value; AppIdPresent = true; }
+ }
+
+ private string clusterId;
+ public string ClusterId
+ {
+ get => clusterId;
+ set { clusterId = value; ClusterIdPresent = true; }
+ }
+
+ public bool ContentTypePresent { get; set; }
+ public bool ContentEncodingPresent { get; set; }
+ public bool HeadersPresent { get; set; } = true;
+ public bool DeliveryModePresent { get; set; }
+ public bool PriorityPresent { get; set; }
+ public bool CorrelationIdPresent { get; set; }
+ public bool ReplyToPresent { get; set; }
+ public bool ExpirationPresent { get; set; }
+ public bool MessageIdPresent { get; set; }
+ public bool TimestampPresent { get; set; }
+ public bool TypePresent { get; set; }
+ public bool UserIdPresent { get; set; }
+ public bool AppIdPresent { get; set; }
+ public bool ClusterIdPresent { get; set; }
+ }
+
+
+ private class EasyNetQMessageReceivedInfo
+ {
+ public string ConsumerTag { get; set; }
+ public ulong DeliverTag { get; set; }
+ public bool Redelivered { get; set; }
+ public string Exchange { get; set; }
+ public string RoutingKey { get; set; }
+ public string Queue { get; set; }
+
+
+ // ReSharper disable once MemberCanBePrivate.Local - used by JSON deserialization
+ // ReSharper disable once UnusedMember.Local
+ // ReSharper disable once UnusedMember.Global
+ public EasyNetQMessageReceivedInfo()
+ {
+ }
+
+
+ public EasyNetQMessageReceivedInfo(Message fromMessage)
+ {
+ ConsumerTag = "hosepipe";
+ DeliverTag = fromMessage.DeliveryTag;
+ Redelivered = fromMessage.Redelivered;
+ Exchange = fromMessage.Exchange;
+ RoutingKey = fromMessage.RoutingKey;
+ Queue = fromMessage.Queue;
+ }
+
+
+ public Message ToMessage()
+ {
+ return new Message
+ {
+ //ConsumerTag =
+ DeliveryTag = DeliverTag,
+ Redelivered = Redelivered,
+ Exchange = Exchange,
+ RoutingKey = RoutingKey,
+ Queue = Queue
+ };
+ }
+ }
+ // ReSharper restore AutoPropertyCanBeMadeGetOnly.Local
+ // ReSharper restore MemberCanBePrivate.Local
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Cmd/Serialization/IMessageSerializer.cs b/Tapeti.Cmd/Serialization/IMessageSerializer.cs
new file mode 100644
index 0000000..e8ce5a3
--- /dev/null
+++ b/Tapeti.Cmd/Serialization/IMessageSerializer.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using RabbitMQ.Client;
+
+namespace Tapeti.Cmd.Serialization
+{
+ public class Message
+ {
+ public ulong DeliveryTag;
+ public bool Redelivered;
+ public string Exchange;
+ public string RoutingKey;
+ public string Queue;
+ public IBasicProperties Properties;
+ public byte[] Body;
+ }
+
+
+ public interface IMessageSerializer : IDisposable
+ {
+ void Serialize(Message message);
+ IEnumerable Deserialize();
+ }
+}
diff --git a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs
new file mode 100644
index 0000000..7079c5f
--- /dev/null
+++ b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs
@@ -0,0 +1,234 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Framing;
+
+namespace Tapeti.Cmd.Serialization
+{
+ public class SingleFileJSONMessageSerializer : IMessageSerializer
+ {
+ private readonly string path;
+
+
+ private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
+ {
+ NullValueHandling = NullValueHandling.Ignore
+ };
+
+ private readonly Lazy exportFile;
+
+
+ public SingleFileJSONMessageSerializer(string path)
+ {
+ this.path = path;
+ exportFile = new Lazy(() => new StreamWriter(path, false, Encoding.UTF8));
+ }
+
+
+ public void Serialize(Message message)
+ {
+ var serializableMessage = new SerializableMessage(message);
+ var serialized = JsonConvert.SerializeObject(serializableMessage, SerializerSettings);
+ exportFile.Value.WriteLine(serialized);
+ }
+
+
+ public IEnumerable Deserialize()
+ {
+ using (var file = new StreamReader(path))
+ {
+ while (!file.EndOfStream)
+ {
+ var serialized = file.ReadLine();
+ if (string.IsNullOrEmpty(serialized))
+ continue;
+
+ var serializableMessage = JsonConvert.DeserializeObject(serialized);
+ if (serializableMessage == null)
+ continue;
+
+ yield return serializableMessage.ToMessage();
+ }
+ }
+ }
+
+
+ public void Dispose()
+ {
+ if (exportFile.IsValueCreated)
+ exportFile.Value.Dispose();
+ }
+
+
+
+ // ReSharper disable MemberCanBePrivate.Local - used for JSON serialization
+ // ReSharper disable NotAccessedField.Local
+ // ReSharper disable FieldCanBeMadeReadOnly.Local
+ private class SerializableMessage
+ {
+ public ulong DeliveryTag;
+ public bool Redelivered;
+ public string Exchange;
+ public string RoutingKey;
+ public string Queue;
+
+ // ReSharper disable once FieldCanBeMadeReadOnly.Local - must be settable by JSON deserialization
+ public SerializableMessageProperties Properties;
+
+ public JObject Body;
+ public byte[] RawBody;
+
+
+ // ReSharper disable once UnusedMember.Global - used by JSON deserialization
+ // ReSharper disable once UnusedMember.Local
+ public SerializableMessage()
+ {
+ Properties = new SerializableMessageProperties();
+ }
+
+
+ public SerializableMessage(Message fromMessage)
+ {
+ DeliveryTag = fromMessage.DeliveryTag;
+ Redelivered = fromMessage.Redelivered;
+ Exchange = fromMessage.Exchange;
+ RoutingKey = fromMessage.RoutingKey;
+ Queue = fromMessage.Queue;
+ Properties = new SerializableMessageProperties(fromMessage.Properties);
+
+ // If this is detected as a JSON message, include the object directly in the JSON line so that it is easier
+ // to read and process in the output file. Otherwise simply include the raw data and let Newtonsoft encode it.
+ // This does mean the message will be rewritten. If this is an issue, feel free to add a "raw" option to this tool
+ // that forces the RawBody to be used. It is open-source after all :-).
+ if (Properties.ContentType == "application/json")
+ {
+ try
+ {
+ Body = JObject.Parse(Encoding.UTF8.GetString(fromMessage.Body));
+ RawBody = null;
+ }
+ catch
+ {
+ // Fall back to using the raw body
+ Body = null;
+ RawBody = fromMessage.Body;
+ }
+ }
+ else
+ {
+ Body = null;
+ RawBody = fromMessage.Body;
+ }
+ }
+
+
+ public Message ToMessage()
+ {
+ return new Message
+ {
+ DeliveryTag = DeliveryTag,
+ Redelivered = Redelivered,
+ Exchange = Exchange,
+ RoutingKey = RoutingKey,
+ Queue = Queue,
+ Properties = Properties.ToBasicProperties(),
+ Body = Body != null
+ ? Encoding.UTF8.GetBytes(Body.ToString(Formatting.None))
+ : RawBody
+ };
+ }
+ }
+
+
+ // IBasicProperties is finicky when it comes to writing it's properties,
+ // so we need this normalized class to read and write it from and to JSON
+ private class SerializableMessageProperties
+ {
+ public string AppId;
+ public string ClusterId;
+ public string ContentEncoding;
+ public string ContentType;
+ public string CorrelationId;
+ public byte? DeliveryMode;
+ public string Expiration;
+ public IDictionary Headers;
+ public string MessageId;
+ public byte? Priority;
+ public string ReplyTo;
+ public long? Timestamp;
+ public string Type;
+ public string UserId;
+
+
+ public SerializableMessageProperties()
+ {
+ }
+
+
+ public SerializableMessageProperties(IBasicProperties fromProperties)
+ {
+ AppId = fromProperties.AppId;
+ ClusterId = fromProperties.ClusterId;
+ ContentEncoding = fromProperties.ContentEncoding;
+ ContentType = fromProperties.ContentType;
+ CorrelationId = fromProperties.CorrelationId;
+ DeliveryMode = fromProperties.IsDeliveryModePresent() ? (byte?)fromProperties.DeliveryMode : null;
+ Expiration = fromProperties.Expiration;
+ MessageId = fromProperties.MessageId;
+ Priority = fromProperties.IsPriorityPresent() ? (byte?) fromProperties.Priority : null;
+ ReplyTo = fromProperties.ReplyTo;
+ Timestamp = fromProperties.IsTimestampPresent() ? (long?)fromProperties.Timestamp.UnixTime : null;
+ Type = fromProperties.Type;
+ UserId = fromProperties.UserId;
+
+ if (fromProperties.IsHeadersPresent())
+ {
+ Headers = new Dictionary();
+
+ // This assumes header values are UTF-8 encoded strings. This is true for Tapeti.
+ foreach (var pair in fromProperties.Headers)
+ Headers.Add(pair.Key, Encoding.UTF8.GetString((byte[])pair.Value));
+ }
+ else
+ Headers = null;
+ }
+
+
+ public IBasicProperties ToBasicProperties()
+ {
+ var properties = new BasicProperties();
+
+ if (!string.IsNullOrEmpty(AppId)) properties.AppId = AppId;
+ if (!string.IsNullOrEmpty(ClusterId)) properties.ClusterId = ClusterId;
+ if (!string.IsNullOrEmpty(ContentEncoding)) properties.ContentEncoding = ContentEncoding;
+ if (!string.IsNullOrEmpty(ContentType)) properties.ContentType = ContentType;
+ if (DeliveryMode.HasValue) properties.DeliveryMode = DeliveryMode.Value;
+ if (!string.IsNullOrEmpty(Expiration)) properties.Expiration = Expiration;
+ if (!string.IsNullOrEmpty(MessageId)) properties.MessageId = MessageId;
+ if (Priority.HasValue) properties.Priority = Priority.Value;
+ if (!string.IsNullOrEmpty(ReplyTo)) properties.ReplyTo = ReplyTo;
+ if (Timestamp.HasValue) properties.Timestamp = new AmqpTimestamp(Timestamp.Value);
+ if (!string.IsNullOrEmpty(Type)) properties.Type = Type;
+ if (!string.IsNullOrEmpty(UserId)) properties.UserId = UserId;
+
+ // ReSharper disable once InvertIf
+ if (Headers != null)
+ {
+ properties.Headers = new Dictionary();
+
+ foreach (var pair in Headers)
+ properties.Headers.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value));
+ }
+
+ return properties;
+ }
+ }
+ // ReSharper restore FieldCanBeMadeReadOnly.Local
+ // ReSharper restore NotAccessedField.Local
+ // ReSharper restore MemberCanBePrivate.Local
+ }
+}
diff --git a/Tapeti.Cmd/Tapeti.Cmd.csproj b/Tapeti.Cmd/Tapeti.Cmd.csproj
new file mode 100644
index 0000000..6a80ffb
--- /dev/null
+++ b/Tapeti.Cmd/Tapeti.Cmd.csproj
@@ -0,0 +1,18 @@
+
+
+
+ Exe
+ netcoreapp2.2
+ 2.0.0
+ Mark van Renswoude
+ Mark van Renswoude
+ Tapeti Command-line Utility
+
+
+
+
+
+
+
+
+
diff --git a/Tapeti.Cmd/build-release.bat b/Tapeti.Cmd/build-release.bat
new file mode 100644
index 0000000..c9ccd4f
--- /dev/null
+++ b/Tapeti.Cmd/build-release.bat
@@ -0,0 +1 @@
+dotnet publish -c Release -r win-x64 --self-contained false
\ No newline at end of file
diff --git a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
index cef8d1e..b706ccd 100644
--- a/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
+++ b/Tapeti.DataAnnotations.Extensions/RequiredGuidAttribute.cs
@@ -1,9 +1,11 @@
using System;
using System.ComponentModel.DataAnnotations;
-using System.Globalization;
+
+// ReSharper disable UnusedMember.Global
namespace Tapeti.DataAnnotations.Extensions
{
+ ///
///
/// Can be used on Guid fields which are supposed to be Required, as the Required attribute does
/// not work for Guids and making them Nullable is counter-intuitive.
@@ -13,10 +15,12 @@ namespace Tapeti.DataAnnotations.Extensions
private const string DefaultErrorMessage = "'{0}' does not contain a valid guid";
private const string InvalidTypeErrorMessage = "'{0}' is not of type Guid";
+ ///
public RequiredGuidAttribute() : base(DefaultErrorMessage)
{
}
+ ///
protected override ValidationResult IsValid(object value, ValidationContext validationContext)
{
if (value == null)
diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
index 56cdff2..9fd102a 100644
--- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
+++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj
@@ -2,6 +2,12 @@
netstandard2.0
+ true
+ 2.0.0
+
+
+
+ 1701;1702
diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
index db70921..d1e1aa1 100644
--- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
+++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.nuspec
@@ -6,7 +6,7 @@
Tapeti DataAnnotations Extensions
Mark van Renswoude
Mark van Renswoude
- https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ Unlicense
https://github.com/MvRens/Tapeti
https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png
false
diff --git a/Tapeti.DataAnnotations/ConfigExtensions.cs b/Tapeti.DataAnnotations/ConfigExtensions.cs
index 3001fe9..72a3cfb 100644
--- a/Tapeti.DataAnnotations/ConfigExtensions.cs
+++ b/Tapeti.DataAnnotations/ConfigExtensions.cs
@@ -1,10 +1,19 @@
-namespace Tapeti.DataAnnotations
+using Tapeti.Config;
+
+namespace Tapeti.DataAnnotations
{
+ ///
+ /// Extends ITapetiConfigBuilder to enable DataAnnotations.
+ ///
public static class ConfigExtensions
{
- public static TapetiConfig WithDataAnnotations(this TapetiConfig config)
+ ///
+ /// Enables the DataAnnotations validation middleware.
+ ///
+ ///
+ public static ITapetiConfigBuilder WithDataAnnotations(this ITapetiConfigBuilder config)
{
- config.Use(new DataAnnotationsMiddleware());
+ config.Use(new DataAnnotationsExtension());
return config;
}
}
diff --git a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs
similarity index 67%
rename from Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs
rename to Tapeti.DataAnnotations/DataAnnotationsExtension.cs
index ffbaac4..abdbc5c 100644
--- a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs
+++ b/Tapeti.DataAnnotations/DataAnnotationsExtension.cs
@@ -3,12 +3,18 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
- public class DataAnnotationsMiddleware : ITapetiExtension
+ ///
+ ///
+ /// Provides the DataAnnotations validation middleware.
+ ///
+ public class DataAnnotationsExtension : ITapetiExtension
{
+ ///
public void RegisterDefaults(IDependencyContainer container)
{
}
+ ///
public IEnumerable