1
0
mirror of synced 2025-01-07 16:53:08 +01:00

Merge branch 'hotfix/2.8.3'

This commit is contained in:
Mark van Renswoude 2022-03-10 09:50:09 +01:00
commit 30915fd5fc
24 changed files with 287 additions and 51 deletions

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_01_PublishSubscribe</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_02_DeclareDurableQueues</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_03_FlowRequestResponse</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_04_Transient</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_05_SpeedTest</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_07_ParallelizationTest</RootNamespace>
</PropertyGroup>

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<RootNamespace>_08_MessageHandlerLogging</RootNamespace>
</PropertyGroup>

View File

@ -37,24 +37,25 @@ namespace Tapeti.Flow.SQL
/// <inheritdoc />
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
public async Task<IEnumerable<FlowRecord<T>>> GetStates<T>()
{
return await SqlRetryHelper.Execute(async () =>
{
using (var connection = await GetConnection())
{
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync();
var result = new List<KeyValuePair<Guid, T>>();
var result = new List<FlowRecord<T>>();
while (await flowReader.ReadAsync())
{
var flowID = flowReader.GetGuid(0);
var stateJson = flowReader.GetString(1);
var creationTime = flowReader.GetDateTime(1);
var stateJson = flowReader.GetString(2);
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new KeyValuePair<Guid, T>(flowID, state));
result.Add(new FlowRecord<T>(flowID, creationTime, state));
}
return result;

View File

@ -18,11 +18,13 @@ namespace Tapeti.Flow.Default
private class CachedFlowState
{
public readonly FlowState FlowState;
public readonly DateTime CreationTime;
public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, bool isPersistent)
public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent)
{
FlowState = flowState;
CreationTime = creationTime;
IsPersistent = isPersistent;
}
}
@ -64,12 +66,12 @@ namespace Tapeti.Flow.Default
{
foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true));
foreach (var continuation in flowStateRecord.Value.Continuations)
foreach (var continuation in flowStateRecord.FlowState.Continuations)
{
ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value);
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
ValidateContinuation(flowStateRecord.FlowID, continuation.Key, continuation.Value);
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID);
}
}
}
@ -134,6 +136,18 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
{
var maximumDateTime = DateTime.UtcNow - minimumAge;
return Task.FromResult(flowStates
.Where(p => p.Value.CreationTime <= maximumDateTime)
.Select(p => new ActiveFlow(p.Key, p.Value.CreationTime))
.ToArray() as IEnumerable<ActiveFlow>);
}
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner;
@ -190,7 +204,7 @@ namespace Tapeti.Flow.Default
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
cachedFlowState = new CachedFlowState(newFlowState, persistent);
cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
@ -198,8 +212,7 @@ namespace Tapeti.Flow.Default
// Storing the flowstate in the underlying repository
if (isNew)
{
var now = DateTime.UtcNow;
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now);
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime);
}
else
{

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
@ -10,9 +11,9 @@ namespace Tapeti.Flow.Default
/// </summary>
public class NonPersistentFlowRepository : IFlowRepository
{
Task<List<KeyValuePair<Guid, T>>> IFlowRepository.GetStates<T>()
Task<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>()
{
return Task.FromResult(new List<KeyValuePair<Guid, T>>());
return Task.FromResult(Enumerable.Empty<FlowRecord<T>>());
}
/// <inheritdoc />

View File

@ -13,30 +13,64 @@ namespace Tapeti.Flow
/// Load the previously persisted flow states.
/// </summary>
/// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns>
Task<List<KeyValuePair<Guid, T>>> GetStates<T>();
Task<IEnumerable<FlowRecord<T>>> GetStates<T>();
/// <summary>
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
/// <param name="state"></param>
/// <param name="timestamp"></param>
/// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state">The flow state to be stored.</param>
/// <param name="timestamp">The time when the flow was initially created.</param>
/// <returns></returns>
Task CreateState<T>(Guid flowID, T state, DateTime timestamp);
/// <summary>
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
/// <param name="state"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
/// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state">The flow state to be stored.</param>
Task UpdateState<T>(Guid flowID, T state);
/// <summary>
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID"></param>
/// <param name="flowID">The unique ID of the flow.</param>
Task DeleteState(Guid flowID);
}
/// <summary>
/// Contains information about a persisted flow state.
/// </summary>
public class FlowRecord<T>
{
/// <summary>
/// The unique ID of the flow.
/// </summary>
public Guid FlowID { get; }
/// <summary>
/// The time when the flow was initially created.
/// </summary>
public DateTime CreationTime { get; }
/// <summary>
/// The stored flow state.
/// </summary>
public T FlowState { get; }
/// <summary>
/// Creates a new instance of a FlowRecord.
/// </summary>
/// <param name="flowID"></param>
/// <param name="creationTime"></param>
/// <param name="flowState"></param>
public FlowRecord(Guid flowID, DateTime creationTime, T flowState)
{
FlowID = flowID;
CreationTime = creationTime;
FlowState = flowState;
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Tapeti.Flow.Default;
@ -29,6 +30,15 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID);
/// <summary>
/// Returns information about the currently active flows.
/// </summary>
/// <remarks>
/// This is intended for monitoring purposes and should be treated as a snapshot.
/// </remarks>
/// <param name="minimumAge">The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.</param>
Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge);
}
@ -60,4 +70,33 @@ namespace Tapeti.Flow
/// </summary>
Task DeleteFlowState();
}
/// <summary>
/// Contains information about an active flow, as returned by <see cref="IFlowStore.GetActiveFlows"/>.
/// </summary>
public class ActiveFlow
{
/// <summary>
/// The ID of the active flow.
/// </summary>
public Guid FlowID { get; }
/// <summary>
/// The time when the flow was initially created.
/// </summary>
public DateTime CreationTime { get; }
/// <summary>
/// Create a new instance of an ActiveFlow.
/// </summary>
/// <param name="flowID">The ID of the active flow.</param>
/// <param name="creationTime">The time when the flow was initially created.</param>
public ActiveFlow(Guid flowID, DateTime creationTime)
{
FlowID = flowID;
CreationTime = creationTime;
}
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Threading;
// ReSharper disable UnusedMemberInSuper.Global - public API
// ReSharper disable UnusedMember.Global
@ -50,6 +51,13 @@ namespace Tapeti.Config
/// </remarks>
IBinding Binding { get; }
/// <summary>
/// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed.
/// Note that this token is cancelled regardless of whether the connection will be reestablished, as any
/// messages still in the queue will be redelivered with a new token.
/// </summary>
CancellationToken ConnectionClosed { get; }
/// <summary>
/// Stores additional properties in the message context which can be passed between middleware stages.
/// </summary>

View File

@ -74,7 +74,8 @@ namespace Tapeti.Connection
RawBody = body,
Message = message,
Properties = properties,
Binding = null
Binding = null,
ConnectionClosed = CancellationToken.None
};
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
@ -118,7 +119,8 @@ namespace Tapeti.Connection
RawBody = messageContextData.RawBody,
Message = message,
Properties = messageContextData.Properties,
Binding = binding
Binding = binding,
ConnectionClosed = cancellationToken
};
try

View File

@ -0,0 +1,25 @@
using System;
using System.Linq;
using System.Threading;
using Tapeti.Config;
namespace Tapeti.Default
{
/// <inheritdoc />
/// <summary>
/// Binds a parameter of type CancellationToken to a token which is cancelled when the RabbitMQ connection is closed.
/// Similar to and very much inspired by ASP.NET's RequestAborted CancellationToken.
/// This middleware is included by default in the standard TapetiConfig.
/// </summary>
public class CancellationTokenBinding : IControllerBindingMiddleware
{
/// <inheritdoc />
public void Handle(IControllerBindingContext context, Action next)
{
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(CancellationToken)))
parameter.SetBinding(messageContext => messageContext.ConnectionClosed);
next();
}
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Tapeti.Config;
@ -34,7 +35,10 @@ namespace Tapeti.Default
/// <inheritdoc />
public IBinding Binding { get; set; }
/// <inheritdoc />
public CancellationToken ConnectionClosed { get; set; }
public void Store<T>(T payload) where T : IMessageContextPayload
{
payloads.Add(typeof(T), payload);

View File

@ -35,6 +35,7 @@ namespace Tapeti
Use(new DependencyResolverBinding());
Use(new PublishResultBinding());
Use(new CancellationTokenBinding());
// Registered last so it runs first and the MessageClass is known to other middleware
Use(new MessageBinding());

View File

@ -151,10 +151,8 @@ namespace Tapeti
protected virtual void OnConnected(ConnectedEventArgs e)
{
var connectedEvent = Connected;
if (connectedEvent == null)
return;
Task.Run(() => connectedEvent.Invoke(this, e));
if (connectedEvent != null)
Task.Run(() => connectedEvent.Invoke(this, e));
}
/// <summary>
@ -162,13 +160,11 @@ namespace Tapeti
/// </summary>
protected virtual void OnReconnected(ConnectedEventArgs e)
{
var reconnectedEvent = Reconnected;
if (reconnectedEvent == null && subscriber == null)
return;
subscriber?.Reconnect();
Task.Run(() => reconnectedEvent?.Invoke(this, e));
var reconnectedEvent = Reconnected;
if (reconnectedEvent != null)
Task.Run(() => reconnectedEvent?.Invoke(this, e));
}
/// <summary>
@ -176,13 +172,11 @@ namespace Tapeti
/// </summary>
protected virtual void OnDisconnected(DisconnectedEventArgs e)
{
var disconnectedEvent = Disconnected;
if (disconnectedEvent == null)
return;
subscriber?.Disconnect();
Task.Run(() => disconnectedEvent.Invoke(this, e));
var disconnectedEvent = Disconnected;
if (disconnectedEvent != null)
Task.Run(() => disconnectedEvent.Invoke(this, e));
}
}
}

66
docs/compatibility.rst Normal file
View File

@ -0,0 +1,66 @@
Compatibility
=============
ASP.NET Core
------------
When integrating Tapeti into an ASP.NET Core service, depending on your naming conventions you may run into an issue where ASP.NET tries to register all your messaging controllers as API controllers. This is because by default any class ending in "Controller" will be picked up by ASP.NET.
You can rename your Tapeti controller classes as long as there are no persisted Tapeti Flows for that controller.
Alternatively, you can filter the classes that ASP.NET will register by using a custom ControllerFeatureProvider.
Whitelist ASP.NET controllers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If all your ASP.NET controllers use the [ApiController] attribute, you can simply whitelist classes which are annotated with that attribute, ensuring the class name is not relevant to ASP.NET:
::
public class APIControllersOnlyFeatureProvider : ControllerFeatureProvider
{
protected override bool IsController(TypeInfo typeInfo)
{
return typeInfo.IsClass && typeInfo.IsPublic && !typeInfo.IsAbstract &&
typeInfo.GetCustomAttribute<ApiControllerAttribute>() != null;
}
}
Blacklist Tapeti controllers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If instead you want the default ASP.NET behaviour but only exclude Tapeti messaging controllers, you can decorate the default feature provider as follows:
::
public class ExcludeMessagingControllerFeatureProvider : ControllerFeatureProvider
{
protected override bool IsController(TypeInfo typeInfo)
{
return base.IsController(typeInfo) &&
typeInfo.GetCustomAttribute<MessageControllerAttribute>() == null;
}
}
Replace feature provider
^^^^^^^^^^^^^^^^^^^^^^^^
In either case, replace the default ControllerFeatureProvider in the ConfigureServices method of your startup class:
::
public void ConfigureServices(IServiceCollection services)
{
services
.AddControllers()
.ConfigureApplicationPartManager(manager =>
{
var controllerFeatureProvider = manager.FeatureProviders
.FirstOrDefault(provider => provider is ControllerFeatureProvider);
if (controllerFeatureProvider != null)
manager.FeatureProviders.Remove(controllerFeatureProvider);
manager.FeatureProviders.Add(new APIControllersOnlyFeatureProvider());
// or: manager.FeatureProviders.Add(new ExcludeMessagingControllerFeatureProvider());
});
}

View File

@ -289,3 +289,6 @@ Then install the Tapeti.Flow.SQL NuGet package and register the SqlConnectionFlo
.WithFlow()
.RegisterAllControllers()
.Build();
.. caution:: The controller and method names for response handlers and converge methods are stored in the flow and must be valid when they are loaded again. Keep that in mind if you want to refactor the code; either keep the original class and method temporarily for backwards compatibility, optionally redirecting them internally to the new code, or make sure there are no persisted flows remaining.

View File

@ -74,7 +74,7 @@ A message is simply a plain object which can be serialized using `Json.NET <http
Creating a message controller
-----------------------------
To handle messages you need what Tapeti refers to as a "message controller". It is similar to an ASP.NET MVC controller if you're familiar with those, but it handles RabbitMQ messages instead of HTTP requests.
To handle messages you need what Tapeti refers to as a "message controller". It is similar to an ASP.NET controller if you're familiar with those, but it handles RabbitMQ messages instead of HTTP requests.
All you need to do is create a new class and annotate it with the MessageController attribute and a queue attribute. The name and folder of the class is not important to Tapeti, though you might want to agree on a standard in your team.
@ -102,7 +102,10 @@ Any public method in a message controller is considered a message handler. There
- The first parameter must be the message class.
- The return type can be void, Task, Task<message class> or a message class.
The name of the method is not important to Tapeti. Any parameter other than the first will be resolved using the IoC container, although it is considered best practice to use the constructor for dependency injection instead.
The name of the method is not important to Tapeti. Any parameter other than the first will be resolved in two ways:
1. Registered middleware can alter the behaviour of parameters. Tapeti includes one by default for CancellationToken parameters, see :ref:`parameterbinding` in :doc:`indepth`.
2. Any remaining parameters are resolved using the IoC container, although it is considered best practice to use the constructor for dependency injection instead.
A new controller is instantiated for each message, so it is safe to use public or private fields to store state while handling the message. Just don't expect it to be there for the next message. If you need this behaviour, take a look at the :doc:`flow`!

View File

@ -9,6 +9,46 @@ As described in the Getting started guide, a message is a plain object which can
When communicating between services it is considered best practice to define messages in separate class library assemblies which can be referenced in other services. This establishes a public interface between services and components without binding to the implementation.
.. _parameterbinding:
Parameter binding
-----------------
Tapeti will bind the parameters of message handler methods using the registered binding middleware.
Although stated in the Getting started guide that the first parameter is always assumed to be the message class, this is in fact handled by one of the default binding middleware implementations instead of being hardcoded in Tapeti. All of the default implementations play nice and will only apply to parameters not already bound by other middleware, making it easy to extend or change the default behaviour if desired.
In addition to the message class parameter, two additional default implementations are included:
CancellationToken
^^^^^^^^^^^^^^^^^
Similar to ASP.NET, Tapeti will bind parameters of type CancellationToken to a token which is cancelled when the connection to the RabbitMQ server is closed.
.. note:: This does not indicate whether the connection was closed by the application or lost unexpectedly, either scenario will cancel the token. This is by design, as any message in-flight will be put back on the queue and redelivered anyways.
Internally this CancellationToken is called ConnectionClosed, but any name can be used. For example:
::
public async Task<CountResponseMessage> CountRabbits(CountRequestMessage message,
CancellationToken cancellationToken)
{
var count = await rabbitRepository.Count(cancellationToken);
return new CountRabbitsResponseMessage
{
Count = count
};
}
Dependency injection
^^^^^^^^^^^^^^^^^^^^
Any parameter not bound by any other means will be resolved using the IoC container which is passed to the TapetiConnection.
.. note:: It is considered best practice to use the constructor for dependency injection instead.
Enums
-----
Special care must be taken when using enums in messages. For example, you have several services consuming a message containing an enum field. Some services will have logic which depends on a specific value, others will not use that specific field at all.
@ -82,6 +122,7 @@ If all message handlers bound to a durable queue are marked as obsolete, includi
If there are still messages in the queue it's pending removal will be logged but the consumers will run as normal to empty the queue. The queue will then remain until it is checked again when the application is restarted.
Request - response
------------------
Messages can be annotated with the Request attribute to indicate that they require a response. For example:

View File

@ -7,6 +7,7 @@ Tapeti documentation
introduction
gettingstarted
compatibility
indepth
dataannotations
flow