1
0
mirror of synced 2025-01-22 16:13:07 +01:00

[ci skip] Started writing the missing documentation

This commit is contained in:
Mark van Renswoude 2019-08-15 14:55:15 +02:00
parent 0aa44cfefd
commit dcd0436f30
8 changed files with 321 additions and 41 deletions

View File

@ -7,12 +7,12 @@ using Newtonsoft.Json;
namespace Tapeti.Flow.SQL
{
/// <inheritdoc />
/// <summary>
/// IFlowRepository implementation for SQL server.
/// </summary>
/// <remarks>
/// Assumes the following table layout (table name configurable and may include schema):
///
/// create table Flow
/// (
/// FlowID uniqueidentifier not null,

11
docs/dataannotations.rst Normal file
View File

@ -0,0 +1,11 @@
Validating messages
===================
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
1. Attempt to explore further
2. Complain to the author and demand your money back
3. Abandon all hope
> |

View File

@ -8,38 +8,192 @@ This process is fully asynchronous, the service initiating the flow can be resta
Request - response pattern
--------------------------
Tapeti implements the request - response pattern by allowing a message handler method to simply return the response message. Tapeti Flow expands on this idea by enforcing you to explicitly declare that pattern. This is intended to prevent bugs where a flow will forever wait on a response that never comes.
Tapeti implements the request - response pattern by allowing a message handler method to simply return the response message. Tapeti Flow extends on this concept by allowing the sender of the request to maintain it's state for when the response arrives.
Due to this requirement your messages need to follow a pattern where broadcasts (events) are separated from true request - response messages (where the reply is sent directly to the originator).
See :doc:`indepth` on defining request - response messages.
This may result in a message having two versions; one where a reply is expected and one where it's not. This is not considered a design flaw but a clear contract between services. An example is given in `Request - response versus transfer of responsibility`_.
Declaring request - response messages
-------------------------------------
A message must be annotated with the Request attribute, where the Response property declares the expected response message class.
Enabling Tapeti Flow
--------------------
To enable the use of Tapeti Flow, install the Tapeti.Flow NuGet package and call WithFlow() when setting up your TapetiConfig:
::
[Request(Response = RabbitCountResponseMessage)]
public class RabbitCountRequestMessage
{
public int? MinimumAge { get; set; }
}
public class RabbitCountResponseMessage
{
public int Count { get; set; }
}
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlow()
.RegisterAllControllers()
.Build();
Starting a flow
---------------
To start a new flow you need to obtain an IFlowStarter from your IoC container. It has one method in various overloads: Start.
Flow requires all methods participating in the flow, including the starting method, to be in the same controller. This allows the state to be stored and restored when the flow continues. The IFlowStarter.Start call does not need to be in the controller class.
The controller type is passed as a generic parameter. The first parameter to the Start method is a method selector. This defines which method in the controller is called as soon as the flow is initialised.
::
await flowStart.Start<QueryBunniesController>(c => c.StartFlow);
The start method can have any name, but must be annotated with the [Start] attribute. This ensures it is not recognized as a message handler. The start method and any further continuation methods must return either Task<IYieldPoint> (for asynchronous methods) or simply IYieldPoint (for synchrnous methods).
::
[MessageController]
[DynamicQueue]
public class QueryBunniesController
{
public DateTime RequestStart { get; set; }
[Start]
IYieldPoint StartFlow()
{
RequestStart = DateTime.UtcNow();
}
}
Often you'll want to pass some initial information to the flow. The Start method allows one parameter. If you need more information, bundle it in a class or struct.
::
await flowStart.Start<QueryBunniesController>(c => c.StartFlow, "pink");
[MessageController]
[DynamicQueue]
public class QueryBunniesController
{
public DateTime RequestStart { get; set; }
[Start]
IYieldPoint StartFlow(string colorFilter)
{
RequestStart = DateTime.UtcNow();
}
}
.. note:: Every time a flow is started or continued a new instance of the controller is created. All public fields in the controller are considered part of the state and will be restored when a response arrives, private and protected fields are not. Public fields must be serializable to JSON (using JSON.NET) to retain their value when a flow continues. Try to minimize the amount of state as it is cached in memory until the flow ends.
Continuing a flow
-----------------
When starting a flow you're most likely want to start with a request message. Similarly, when continuing a flow you have the option to follow it up with another request and prolong the flow. This behaviour is controlled by the IYieldPoint that must be returned from the start and continuation handlers. To get an IYieldPoint you need to inject the IFlowProvider into your controller.
IFlowProvider has a method YieldWithRequest which sends the provided request message and restores the controller when the response arrives, calling the response handler method you pass along to it.
The response handler must be marked with the [Continuation] attribute. This ensures it is never called for broadcast messages, only when the response for our specific request arrives. It must also return an IYieldPoint or Task<IYieldPoint> itself.
If the response handler is not asynchronous, use YieldWithRequestSync instead, as used in the example below:
::
[MessageController]
[DynamicQueue]
public class QueryBunniesController
{
private IFlowProvider flowProvider;
public DateTime RequestStart { get; set; }
public QueryBunniesController(IFlowProvider flowProvider)
{
this.flowProvider = flowProvider;
}
[Start]
IYieldPoint StartFlow(string colorFilter)
{
RequestStart = DateTime.UtcNow();
var request = new BunnyCountRequestMessage
{
ColorFilter = colorFilter
};
return flowProvider.YieldWithRequestSync<BunnyCountRequestMessage, BunnyCountResponseMessage>
(request, HandleBunnyCountResponse);
}
[Continuation]
public IYieldPoint HandleBunnyCountResponse(BunnyCountResponseMessage message)
{
// Handle the response. The original RequestStart is available here as well.
}
}
You can once again return a YieldWithRequest, or end it.
Ending a flow
-------------
To end the flow and dispose of any stored state, return an end yieldpoint:
Request - response versus transfer of responsibility
----------------------------------------------------
::
[Continuation]
public IYieldPoint HandleBunnyCountResponse(BunnyCountResponseMessage message)
{
// Handle the response.
return flowProvider.End();
}
Flows started by a (request) message
------------------------------------
Instead of manually starting a flow, you can also start one in response to an incoming message. You do not need access to the IFlowStarter in that case, simply return an IYieldPoint from a regular message handler:
::
[MessageController]
[DurableQueue("hutch")]
public class HutchController
{
private IBunnyRepository repository;
private IFlowProvider flowProvider;
public string ColorFilter { get; set; }
public HutchController(IBunnyRepository repository, IFlowProvider flowProvider)
{
this.repository = repository;
this.flowProvider = flowProvider;
}
public IYieldPoint HandleCountRequest(BunnyCountRequestMessage message)
{
ColorFilter = message.ColorFilter;
return flowProvider.YieldWithRequestSync<CheckAccessRequestMessage, CheckAccessResponseMessage>
(
new CheckAccessRequestMessage
{
Username = "hutch"
},
HandleCheckAccessResponseMessage
);
}
[Continuation]
public IYieldPoint HandleCheckAccessResponseMessage(CheckAccessResponseMessage message)
{
// We must provide a response to our original BunnyCountRequestMessage
return flowProvider.EndWithResponse(new BunnyCountResponseMessage
{
Count = message.HasAccess ? await repository.Count(ColorFilter) : 0
});
}
.. ::note If the message that started the flow was a request message, you must end the flow with EndWithResponse or you will get an exception. Likewise, if the message was not a request message, you must end the flow with End.
Parallel requests
-----------------
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
@ -49,3 +203,38 @@ Request - response versus transfer of responsibility
3. Abandon all hope
> |
Persistent state
----------------
By default flow state is only preserved while the service is running. To persist the flow state across restarts and reboots, provide an implementation of IFlowRepository to WithFlow().
::
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlow(new MyFlowRepository())
.RegisterAllControllers()
.Build();
Tapeti.Flow includes an implementation for SQL server you can use as well. First, make sure your database contains a table to store flow state:
::
create table Flow
(
FlowID uniqueidentifier not null,
CreationTime datetime2(3) not null,
StateJson nvarchar(max) null,
constraint PK_Flow primary key clustered(FlowID)
);
Then install the Tapeti.Flow.SQL NuGet package and register the SqlConnectionFlowRepository by passing it to WithFlow, or by using the WithFlowSqlRepository extension method before calling WithFlow:
::
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.RegisterAllControllers()
.Build();

View File

@ -46,7 +46,7 @@ First create an instance of TapetiConfig, tell it which controllers to register
}
}
.. note:: RegisterAllControllers without parameters searches the entry assembly. Pass an Assembly parameter to register other or additional controllers.
.. note:: RegisterAllControllers without parameters searches the entry assembly. Pass an Assembly parameter to register other or additional controllers. You can call RegisterAllControllers multiple times with different assemblies.
.. caution:: Tapeti attempts to register it's default implementations in the IoC container during configuration, as well as when starting the connection (to register IPublisher). If your container is immutable after the initial configuration, like SimpleInjector is, make sure that you run the Tapeti configuration before requesting any instances from the container.
@ -73,19 +73,21 @@ The queue attribute can be either *DynamicQueue* or *DurableQueue*. The attribut
DynamicQueue will create a queue with a name generated by RabbitMQ which is automatically deleted when your service stops. Bindings will be added for the messages handled by the controller. You will typically use dynamic queues for scenarios where handling the message is only relevant while the service is running (for example, updating a service's cache or performing live queries).
DurableQueue requires a queue name as the parameter. For now it is assumed that durable queues are already declared and bound, though Tapeti will include a way to create these kind of queues automatically as well in the near future.
DurableQueue requires a queue name as the parameter. By default, the queue is assumed to be present already and Tapeti will throw an exception if it does not. If you want Tapeti to create and update the durable queues as well, see :doc:`indepth` for use of EnableDeclareDurableQueues.
::
[MessageController]
[DynamicQueue]
[DynamicQueue("monitoring")]
public class MonitoringController
{
}
.. note:: Notice the parameter to DynamicQueue. This defines the prefix. If specified, the queue name will begin with the supplied value, followed by a unique identifier, so it can be more easily recognized in the RabbitMQ management interface.
Responding to messages
----------------------
Handling incoming messages
--------------------------
Any public method in a message controller is considered a message handler. There are a few requirements which are enforced by Tapeti. Below are the default requirements, although some extension packages (like the :doc:`flow`) add their own or alter these requirements.
- The first parameter must be the message class.
@ -108,8 +110,9 @@ A new controller is instantiated for each message, so it is safe to use public o
}
}
.. note:: If you're doing anything asynchronous in the message handler, make it async as well! Simply change the return type to "Task" or "async Task".
If the method returns a message object, that object is published as if it was a reply to the incoming message, maintaining the correlationId and respecting the replyTo header.
If the method returns a message object, that object is published as if it was a reply to the incoming message, maintaining the correlationId and respecting the replyTo header. See :doc:`indepth` for request-response requirements.
Publishing messages

View File

@ -9,6 +9,79 @@ As described in the Getting started guide, a message is a plain object which can
When communicating between services it is considered best practice to define messages in separate class library assemblies which can be referenced in other services. This establishes a public interface between services and components without binding to the implementation.
Request - response
------------------
Messages can be annotated with the Request attribute to indicate that they require a response. For example:
::
[Request(Response = typeof(BunnyCountResponseMessage))]
public class BunnyCountRequestMessage
{
public string ColorFilter { get; set; }
}
public class BunnyCountResponseMessage
{
public int Count { get; set; }
}
Message handlers processing the BunnyCountRequestMessage *must* respond with a BunnyCountResponseMessage, either directly or at the end of a Flow when using the :doc:`flow`.
::
[MessageController]
[DurableQueue("hutch")]
public class HutchController
{
private IBunnyRepository repository;
public HutchController(IBunnyRepository repository)
{
this.repository = repository;
}
public async Task<BunnyCountResponseMessage> HandleCountRequest(BunnyCountRequestMessage message)
{
return new BunnyCountResponseMessage
{
Count = await repository.Count(message.ColorFilter)
};
}
}
Tapeti will throw an exception if a request message is published but there is no route for it. Tapeti will also throw an exception if you do not return the correct response class. This ensures consistent flow across services.
If you simply want to broadcast an event in response to a message, do not use the return value but instead call IPublisher.Publish in the message handler.
In practise your service may end up with the same message having two versions; one where a reply is expected and one where it's not. This is not considered a design flaw but a clear contract between services. It is common and recommended for the request message to inherit from the base non-request version, and implement two message handlers that internally perform the same logic.
While designing Tapeti this difference has been defined as `Transfer of responsibility`_ which is explained below.
Transfer of responsibility
--------------------------
When working with microservices there will be dependencies between services.
Sometimes the dependency should be on the consumer side, which is the classic publish-subscribe pattern. For example, a reporting service will often listen in on status updates from various other services to compose a combined report. The services producing the events simply broadcast the message without concerning who if anyone is listening.
Sometimes you need another service to handle or query data outside of your responsibility, and the Request - Response mechanism can be used. Tapeti ensures these messages are routed as described above.
The third pattern is what we refer to as "Transfer of responsibility". You need another service to continue your work, but a response is not required. For example, you have a REST API which receives and validates a request, then sends it to a queue to be handled by a background service.
Messages like these must not be lost, there should always be a queue bound to it to handle the message. Tapeti supports the [Mandatory] attribute for these cases and will throw an exception if there is no queue bound to receive the message:
::
[Mandatory]
public class SomeoneHandleMeMessage
{
}
Routing keys
------------
The routing key is determined by converting CamelCase to dot-separated lowercase, leaving out "Message" at the end if it is present. In the example below, the routing key will be "something.happened":
@ -23,7 +96,6 @@ The routing key is determined by converting CamelCase to dot-separated lowercase
This behaviour is implemented using the IRoutingKeyStrategy interface. For more information about changing this, see `Overriding default behaviour`_
Exchanges
---------
The exchange on which the message is published and consumers are expected to bind to is determined by the first part of the namespace, skipping "Messaging" if it is present. In the example below, the exchange will be "Example":

View File

@ -8,4 +8,6 @@ Tapeti documentation
introduction
gettingstarted
indepth
flow
dataannotations
flow
transient

View File

@ -6,27 +6,19 @@ Introduction
Tapeti is a wrapper for the RabbitMQ .NET Client designed for long-running microservices. It's main goal is to minimize the amount of messaging code required, and instead focus on the higher-level flow.
Tapeti requires at least .NET 4.6.1.
Tapeti is built using .NET Standard 2.0 and mostly tested using .NET 4.7.
Key features
------------
* Consumers are declared using MVC-style controllers and are registered automatically based on annotations
* Publishing requires only the message class, no transport details such as exchange and routing key
* :doc:`flow` (stateful request - response handling with support for parallel requests)
* No inheritance required
* Graceful recovery in case of connection issues, and in contrast to most libraries not designed for services, during startup as well
* Extensible using middleware, see for example the Tapeti Flow package
* Extensible using middleware
What it is not
--------------
Tapeti is not a general purpose RabbitMQ client. Although some behaviour can be overridden by implementing various interfaces, it enforces it's style of messaging and assumes everyone on the bus speaks the same language.
What is missing
---------------
Durable queues are not created and bound automatically yet. The assumption is made that these queues are initialized during a deploy to ensure messages are persisted even when the consuming service isn't running yet.
The author shamelessly plugs `RabbitMetaQueue <https://github.com/PsychoMark/RabbitMetaQueue>`_, which will probably be integrated into Tapeti at one point.
Furthermore there are hardly any unit tests yet. This will require a bit more decoupling in the lower levels of the Tapeti code.
Tapeti is not a general purpose RabbitMQ client. Although some behaviour can be overridden by implementing various interfaces, it enforces it's style of messaging and assumes everyone on the bus speaks the same language.

11
docs/transient.rst Normal file
View File

@ -0,0 +1,11 @@
Transient requests
==================
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
1. Attempt to explore further
2. Complain to the author and demand your money back
3. Abandon all hope
> |