diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs
index 1c0a28b..984f4cd 100644
--- a/Tapeti.Flow/Default/DelegateYieldPoint.cs
+++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs
@@ -3,7 +3,7 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
- internal class DelegateYieldPoint : IStateYieldPoint, IExecutableYieldPoint
+ internal class DelegateYieldPoint : IExecutableYieldPoint
{
public bool StoreState { get; }
diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs
index dac86f6..460d5b2 100644
--- a/Tapeti.Flow/Default/FlowContext.cs
+++ b/Tapeti.Flow/Default/FlowContext.cs
@@ -1,5 +1,5 @@
using System;
-using System.Collections.Generic;
+using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
@@ -13,6 +13,24 @@ namespace Tapeti.Flow.Default
public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
+ private bool stored;
+
+
+ public async Task EnsureStored()
+ {
+ if (stored)
+ return;
+
+ if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
+ if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
+ if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
+
+ FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
+ await FlowStateLock.StoreFlowState(FlowState);
+
+ stored = true;
+ }
+
public void Dispose()
{
FlowStateLock?.Dispose();
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 8c2cef3..937d077 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -70,6 +70,7 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue
};
+ await context.EnsureStored();
await publisher.Publish(message, properties);
}
@@ -149,10 +150,8 @@ namespace Tapeti.Flow.Default
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{
- var stateYieldPoint = yieldPoint as IStateYieldPoint;
var executableYieldPoint = yieldPoint as IExecutableYieldPoint;
-
- var storeState = stateYieldPoint?.StoreState ?? false;
+ var storeState = executableYieldPoint?.StoreState ?? false;
FlowContext flowContext;
object flowContextItem;
@@ -200,14 +199,9 @@ namespace Tapeti.Flow.Default
}
if (storeState)
- {
- flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller);
- await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState);
- }
+ await flowContext.EnsureStored();
else
- {
await flowContext.FlowStateLock.DeleteFlowState();
- }
}
diff --git a/Tapeti.Flow/Default/IInternalYieldPoint.cs b/Tapeti.Flow/Default/IExecutableYieldPoint.cs
similarity index 75%
rename from Tapeti.Flow/Default/IInternalYieldPoint.cs
rename to Tapeti.Flow/Default/IExecutableYieldPoint.cs
index 5415f35..80db564 100644
--- a/Tapeti.Flow/Default/IInternalYieldPoint.cs
+++ b/Tapeti.Flow/Default/IExecutableYieldPoint.cs
@@ -2,14 +2,9 @@
namespace Tapeti.Flow.Default
{
- internal interface IStateYieldPoint : IYieldPoint
- {
- bool StoreState { get; }
- }
-
-
internal interface IExecutableYieldPoint : IYieldPoint
{
+ bool StoreState { get; }
Task Execute(FlowContext context);
}
}
diff --git a/Tapeti.Flow/Default/StateYieldPoint.cs b/Tapeti.Flow/Default/StateYieldPoint.cs
index df9c36e..521c2fc 100644
--- a/Tapeti.Flow/Default/StateYieldPoint.cs
+++ b/Tapeti.Flow/Default/StateYieldPoint.cs
@@ -1,6 +1,8 @@
-namespace Tapeti.Flow.Default
+using System.Threading.Tasks;
+
+namespace Tapeti.Flow.Default
{
- internal class StateYieldPoint : IStateYieldPoint
+ internal class StateYieldPoint : IExecutableYieldPoint
{
public bool StoreState { get; }
@@ -9,5 +11,12 @@
{
StoreState = storeState;
}
+
+
+ public async Task Execute(FlowContext context)
+ {
+ if (StoreState)
+ await context.EnsureStored();
+ }
}
}
diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj
index ab64752..ef01da0 100644
--- a/Tapeti.Flow/Tapeti.Flow.csproj
+++ b/Tapeti.Flow/Tapeti.Flow.csproj
@@ -60,7 +60,7 @@
-
+
diff --git a/docs/flow.rst b/docs/flow.rst
index 4b7c86b..9e84e94 100644
--- a/docs/flow.rst
+++ b/docs/flow.rst
@@ -1,5 +1,5 @@
-Flow
-====
+Flow extension
+==============
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
diff --git a/docs/gettingstarted.rst b/docs/gettingstarted.rst
index 3677631..36047da 100644
--- a/docs/gettingstarted.rst
+++ b/docs/gettingstarted.rst
@@ -1,6 +1,8 @@
Getting started
===============
+This guide is a step by step introduction. If you want to know more about how Tapeti works, for example how it determines the exchange and routing keys, see :doc:`indepth`.
+
Install packages
----------------
I'll assume you are familiar with installing NuGet.org packages into your project.
@@ -50,23 +52,97 @@ First create an instance of TapetiConfig, tell it which controllers to register
Defining a message
------------------
-A message is a plain object which can be serialized using `Json.NET `_.
+A message is simply a plain object which can be serialized using `Json.NET `_.
::
- public class SomethingHappenedMessage
- {
- public string Description { get; set; }
- }
+ public class RabbitEscapedMessage
+ {
+ public string Name { get; set; }
+ public string LastKnownHutch { get; set; }
+ }
+
Creating a message controller
-----------------------------
+To handle messages you need what Tapeti refers to as a "message controller". It is similar to an ASP.NET MVC controller if you're familiar with those, but it handles RabbitMQ messages instead of HTTP requests.
-.. error:: You've stumbled upon a piece of unfinished documentation.
- Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
+All you need to do is create a new class and annotate it with the MessageController attribute and a queue attribute. The name and folder of the class is not important to Tapeti, though you might want to agree on a standard in your team.
- 1. Attempt to explore further
- 2. Complain to the author and demand your money back
- 3. Abandon all hope
+The queue attribute can be either *DynamicQueue* or *DurableQueue*. The attribute can be set for the entire controller (which is considered the default scenario) or specified / overridden per message handler.
- > |
\ No newline at end of file
+DynamicQueue will create a queue with a name generated by RabbitMQ which is automatically deleted when your service stops. Bindings will be added for the messages handled by the controller. You will typically use dynamic queues for scenarios where handling the message is only relevant while the service is running (for example, updating a service's cache or performing live queries).
+
+DurableQueue requires a queue name as the parameter. For now it is assumed that durable queues are already declared and bound, though Tapeti will include a way to create these kind of queues automatically as well in the near future.
+
+::
+
+ [MessageController]
+ [DynamicQueue]
+ public class MonitoringController
+ {
+ }
+
+
+Responding to messages
+----------------------
+Any public method in a message controller is considered a message handler. There are a few requirements which are enforced by Tapeti. Below are the default requirements, although some extension packages (like the :doc:`flow`) add their own or alter these requirements.
+
+- The first parameter must be the message class.
+- The return type can be void, Task, Task or a message class.
+
+The name of the method is not important to Tapeti. Any parameter other than the first will be resolved using the IoC container, although it is considered best practice to use the constructor for dependency injection instead.
+
+A new controller is instantiated for each message, so it is safe to use public or private fields to store state while handling the message. Just don't expect it to be there for the next message. If you need this behaviour, take a look at the :doc:`flow`!
+
+::
+
+ [MessageController]
+ [DynamicQueue]
+ public class MonitoringController
+ {
+ public void LogEscape(RabbitEscapedMessage message)
+ {
+ Logger.Warning($"This is a beige alert. {message.Name} has escaped." +
+ $"It was last seen in {message.LastKnownHutch}.");
+ }
+ }
+
+
+If the method returns a message object, that object is published as if it was a reply to the incoming message, maintaining the correlationId and respecting the replyTo header.
+
+
+Publishing messages
+-------------------
+To send a message, get a reference to IPublisher using dependency injection and call the Publish method. For example, to broadcast another message from a message handler:
+
+::
+
+ public class LogMessage
+ {
+ public string Level { get; set; }
+ public string Description { get; set; }
+ }
+
+
+ [MessageController]
+ [DynamicQueue]
+ public class MonitoringController
+ {
+ private readonly IPublisher publisher;
+
+ public MonitoringController(IPublisher publisher)
+ {
+ this.publisher = publisher;
+ }
+
+ public async Task LogEscape(RabbitEscapedMessage message)
+ {
+ await publisher.Publish(new LogMessage
+ {
+ Level = "Beige",
+ Description = $"{message.Name} has escaped." +
+ $"It was last seen in {message.LastKnownHutch}."
+ });
+ }
+ }
\ No newline at end of file
diff --git a/docs/introduction.rst b/docs/introduction.rst
index 049604b..7da1f93 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -6,6 +6,8 @@ Introduction
Tapeti is a wrapper for the RabbitMQ .NET Client designed for long-running microservices. It's main goal is to minimize the amount of messaging code required, and instead focus on the higher-level flow.
+Tapeti requires at least .NET 4.6.1.
+
Key features
------------
@@ -27,4 +29,4 @@ Durable queues are not created and bound automatically yet. The assumption is ma
The author shamelessly plugs `RabbitMetaQueue `_, which will probably be integrated into Tapeti at one point.
-Furthermore there are no unit tests yet. This will require a bit more decoupling in the lower levels of the Tapeti code.
+Furthermore there are hardly any unit tests yet. This will require a bit more decoupling in the lower levels of the Tapeti code.