1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Fixed #15: Fix timing issue with flows when response comes in too fast

Added more documentation
This commit is contained in:
Mark van Renswoude 2017-02-16 23:03:07 +01:00
parent 20ac467006
commit 80d8f24123
9 changed files with 128 additions and 34 deletions

View File

@ -3,7 +3,7 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal class DelegateYieldPoint : IStateYieldPoint, IExecutableYieldPoint
internal class DelegateYieldPoint : IExecutableYieldPoint
{
public bool StoreState { get; }

View File

@ -1,5 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
@ -13,6 +13,24 @@ namespace Tapeti.Flow.Default
public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
private bool stored;
public async Task EnsureStored()
{
if (stored)
return;
if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
await FlowStateLock.StoreFlowState(FlowState);
stored = true;
}
public void Dispose()
{
FlowStateLock?.Dispose();

View File

@ -70,6 +70,7 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue
};
await context.EnsureStored();
await publisher.Publish(message, properties);
}
@ -149,10 +150,8 @@ namespace Tapeti.Flow.Default
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{
var stateYieldPoint = yieldPoint as IStateYieldPoint;
var executableYieldPoint = yieldPoint as IExecutableYieldPoint;
var storeState = stateYieldPoint?.StoreState ?? false;
var storeState = executableYieldPoint?.StoreState ?? false;
FlowContext flowContext;
object flowContextItem;
@ -200,14 +199,9 @@ namespace Tapeti.Flow.Default
}
if (storeState)
{
flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller);
await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState);
}
await flowContext.EnsureStored();
else
{
await flowContext.FlowStateLock.DeleteFlowState();
}
}

View File

@ -2,14 +2,9 @@
namespace Tapeti.Flow.Default
{
internal interface IStateYieldPoint : IYieldPoint
{
bool StoreState { get; }
}
internal interface IExecutableYieldPoint : IYieldPoint
{
bool StoreState { get; }
Task Execute(FlowContext context);
}
}

View File

@ -1,6 +1,8 @@
namespace Tapeti.Flow.Default
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal class StateYieldPoint : IStateYieldPoint
internal class StateYieldPoint : IExecutableYieldPoint
{
public bool StoreState { get; }
@ -9,5 +11,12 @@
{
StoreState = storeState;
}
public async Task Execute(FlowContext context)
{
if (StoreState)
await context.EnsureStored();
}
}
}

View File

@ -60,7 +60,7 @@
<Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowStarter.cs" />
<Compile Include="Default\FlowState.cs" />
<Compile Include="Default\IInternalYieldPoint.cs" />
<Compile Include="Default\IExecutableYieldPoint.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" />

View File

@ -1,5 +1,5 @@
Flow
====
Flow extension
==============
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?

View File

@ -1,6 +1,8 @@
Getting started
===============
This guide is a step by step introduction. If you want to know more about how Tapeti works, for example how it determines the exchange and routing keys, see :doc:`indepth`.
Install packages
----------------
I'll assume you are familiar with installing NuGet.org packages into your project.
@ -50,23 +52,97 @@ First create an instance of TapetiConfig, tell it which controllers to register
Defining a message
------------------
A message is a plain object which can be serialized using `Json.NET <http://www.newtonsoft.com/json>`_.
A message is simply a plain object which can be serialized using `Json.NET <http://www.newtonsoft.com/json>`_.
::
public class SomethingHappenedMessage
{
public string Description { get; set; }
}
public class RabbitEscapedMessage
{
public string Name { get; set; }
public string LastKnownHutch { get; set; }
}
Creating a message controller
-----------------------------
To handle messages you need what Tapeti refers to as a "message controller". It is similar to an ASP.NET MVC controller if you're familiar with those, but it handles RabbitMQ messages instead of HTTP requests.
.. error:: You've stumbled upon a piece of unfinished documentation.
Behind you is all prior knowledge. In front of you is nothing but emptyness. What do you do?
All you need to do is create a new class and annotate it with the MessageController attribute and a queue attribute. The name and folder of the class is not important to Tapeti, though you might want to agree on a standard in your team.
1. Attempt to explore further
2. Complain to the author and demand your money back
3. Abandon all hope
The queue attribute can be either *DynamicQueue* or *DurableQueue*. The attribute can be set for the entire controller (which is considered the default scenario) or specified / overridden per message handler.
> |
DynamicQueue will create a queue with a name generated by RabbitMQ which is automatically deleted when your service stops. Bindings will be added for the messages handled by the controller. You will typically use dynamic queues for scenarios where handling the message is only relevant while the service is running (for example, updating a service's cache or performing live queries).
DurableQueue requires a queue name as the parameter. For now it is assumed that durable queues are already declared and bound, though Tapeti will include a way to create these kind of queues automatically as well in the near future.
::
[MessageController]
[DynamicQueue]
public class MonitoringController
{
}
Responding to messages
----------------------
Any public method in a message controller is considered a message handler. There are a few requirements which are enforced by Tapeti. Below are the default requirements, although some extension packages (like the :doc:`flow`) add their own or alter these requirements.
- The first parameter must be the message class.
- The return type can be void, Task, Task<message class> or a message class.
The name of the method is not important to Tapeti. Any parameter other than the first will be resolved using the IoC container, although it is considered best practice to use the constructor for dependency injection instead.
A new controller is instantiated for each message, so it is safe to use public or private fields to store state while handling the message. Just don't expect it to be there for the next message. If you need this behaviour, take a look at the :doc:`flow`!
::
[MessageController]
[DynamicQueue]
public class MonitoringController
{
public void LogEscape(RabbitEscapedMessage message)
{
Logger.Warning($"This is a beige alert. {message.Name} has escaped." +
$"It was last seen in {message.LastKnownHutch}.");
}
}
If the method returns a message object, that object is published as if it was a reply to the incoming message, maintaining the correlationId and respecting the replyTo header.
Publishing messages
-------------------
To send a message, get a reference to IPublisher using dependency injection and call the Publish method. For example, to broadcast another message from a message handler:
::
public class LogMessage
{
public string Level { get; set; }
public string Description { get; set; }
}
[MessageController]
[DynamicQueue]
public class MonitoringController
{
private readonly IPublisher publisher;
public MonitoringController(IPublisher publisher)
{
this.publisher = publisher;
}
public async Task LogEscape(RabbitEscapedMessage message)
{
await publisher.Publish(new LogMessage
{
Level = "Beige",
Description = $"{message.Name} has escaped." +
$"It was last seen in {message.LastKnownHutch}."
});
}
}

View File

@ -6,6 +6,8 @@ Introduction
Tapeti is a wrapper for the RabbitMQ .NET Client designed for long-running microservices. It's main goal is to minimize the amount of messaging code required, and instead focus on the higher-level flow.
Tapeti requires at least .NET 4.6.1.
Key features
------------
@ -27,4 +29,4 @@ Durable queues are not created and bound automatically yet. The assumption is ma
The author shamelessly plugs `RabbitMetaQueue <https://github.com/PsychoMark/RabbitMetaQueue>`_, which will probably be integrated into Tapeti at one point.
Furthermore there are no unit tests yet. This will require a bit more decoupling in the lower levels of the Tapeti code.
Furthermore there are hardly any unit tests yet. This will require a bit more decoupling in the lower levels of the Tapeti code.