1
0
mirror of synced 2024-06-18 10:57:39 +00:00

Implemented #37 Support injection of CancellationToken in message handlers

This commit is contained in:
Mark van Renswoude 2021-10-29 15:47:48 +02:00
parent 2ffae2e7fe
commit 56a842ea5c
6 changed files with 51 additions and 17 deletions

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Threading;
// ReSharper disable UnusedMemberInSuper.Global - public API // ReSharper disable UnusedMemberInSuper.Global - public API
// ReSharper disable UnusedMember.Global // ReSharper disable UnusedMember.Global
@ -50,6 +51,13 @@ namespace Tapeti.Config
/// </remarks> /// </remarks>
IBinding Binding { get; } IBinding Binding { get; }
/// <summary>
/// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed.
/// Note that this token is cancelled regardless of whether the connection will be reestablished, as any
/// messages still in the queue will be redelivered with a new token.
/// </summary>
CancellationToken ConnectionClosed { get; }
/// <summary> /// <summary>
/// Stores additional properties in the message context which can be passed between middleware stages. /// Stores additional properties in the message context which can be passed between middleware stages.
/// </summary> /// </summary>

View File

@ -74,7 +74,8 @@ namespace Tapeti.Connection
RawBody = body, RawBody = body,
Message = message, Message = message,
Properties = properties, Properties = properties,
Binding = null Binding = null,
ConnectionClosed = CancellationToken.None
}; };
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
@ -118,7 +119,8 @@ namespace Tapeti.Connection
RawBody = messageContextData.RawBody, RawBody = messageContextData.RawBody,
Message = message, Message = message,
Properties = messageContextData.Properties, Properties = messageContextData.Properties,
Binding = binding Binding = binding,
ConnectionClosed = cancellationToken
}; };
try try

View File

@ -0,0 +1,25 @@
using System;
using System.Linq;
using System.Threading;
using Tapeti.Config;
namespace Tapeti.Default
{
/// <inheritdoc />
/// <summary>
/// Binds a parameter of type CancellationToken to a token which is cancelled when the RabbitMQ connection is closed.
/// Similar to and very much inspired by ASP.NET's RequestAborted CancellationToken.
/// This middleware is included by default in the standard TapetiConfig.
/// </summary>
public class CancellationTokenBinding : IControllerBindingMiddleware
{
/// <inheritdoc />
public void Handle(IControllerBindingContext context, Action next)
{
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(CancellationToken)))
parameter.SetBinding(messageContext => messageContext.ConnectionClosed);
next();
}
}
}

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
@ -34,7 +35,10 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public IBinding Binding { get; set; } public IBinding Binding { get; set; }
/// <inheritdoc />
public CancellationToken ConnectionClosed { get; set; }
public void Store<T>(T payload) where T : IMessageContextPayload public void Store<T>(T payload) where T : IMessageContextPayload
{ {
payloads.Add(typeof(T), payload); payloads.Add(typeof(T), payload);

View File

@ -35,6 +35,7 @@ namespace Tapeti
Use(new DependencyResolverBinding()); Use(new DependencyResolverBinding());
Use(new PublishResultBinding()); Use(new PublishResultBinding());
Use(new CancellationTokenBinding());
// Registered last so it runs first and the MessageClass is known to other middleware // Registered last so it runs first and the MessageClass is known to other middleware
Use(new MessageBinding()); Use(new MessageBinding());

View File

@ -151,10 +151,8 @@ namespace Tapeti
protected virtual void OnConnected(ConnectedEventArgs e) protected virtual void OnConnected(ConnectedEventArgs e)
{ {
var connectedEvent = Connected; var connectedEvent = Connected;
if (connectedEvent == null) if (connectedEvent != null)
return; Task.Run(() => connectedEvent.Invoke(this, e));
Task.Run(() => connectedEvent.Invoke(this, e));
} }
/// <summary> /// <summary>
@ -162,13 +160,11 @@ namespace Tapeti
/// </summary> /// </summary>
protected virtual void OnReconnected(ConnectedEventArgs e) protected virtual void OnReconnected(ConnectedEventArgs e)
{ {
var reconnectedEvent = Reconnected;
if (reconnectedEvent == null && subscriber == null)
return;
subscriber?.Reconnect(); subscriber?.Reconnect();
Task.Run(() => reconnectedEvent?.Invoke(this, e)); var reconnectedEvent = Reconnected;
if (reconnectedEvent != null)
Task.Run(() => reconnectedEvent?.Invoke(this, e));
} }
/// <summary> /// <summary>
@ -176,13 +172,11 @@ namespace Tapeti
/// </summary> /// </summary>
protected virtual void OnDisconnected(DisconnectedEventArgs e) protected virtual void OnDisconnected(DisconnectedEventArgs e)
{ {
var disconnectedEvent = Disconnected;
if (disconnectedEvent == null)
return;
subscriber?.Disconnect(); subscriber?.Disconnect();
Task.Run(() => disconnectedEvent.Invoke(this, e)); var disconnectedEvent = Disconnected;
if (disconnectedEvent != null)
Task.Run(() => disconnectedEvent.Invoke(this, e));
} }
} }
} }