From 56a842ea5c69527d942843df51e199758d5095da Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 29 Oct 2021 15:47:48 +0200 Subject: [PATCH] Implemented #37 Support injection of CancellationToken in message handlers --- Tapeti/Config/IMessageContext.cs | 8 +++++++ Tapeti/Connection/TapetiConsumer.cs | 6 ++++-- Tapeti/Default/CancellationTokenBinding.cs | 25 ++++++++++++++++++++++ Tapeti/Default/MessageContext.cs | 6 +++++- Tapeti/TapetiConfig.cs | 1 + Tapeti/TapetiConnection.cs | 22 +++++++------------ 6 files changed, 51 insertions(+), 17 deletions(-) create mode 100644 Tapeti/Default/CancellationTokenBinding.cs diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index 7d2db6d..6fea4cc 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; // ReSharper disable UnusedMemberInSuper.Global - public API // ReSharper disable UnusedMember.Global @@ -50,6 +51,13 @@ namespace Tapeti.Config /// IBinding Binding { get; } + /// + /// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed. + /// Note that this token is cancelled regardless of whether the connection will be reestablished, as any + /// messages still in the queue will be redelivered with a new token. + /// + CancellationToken ConnectionClosed { get; } + /// /// Stores additional properties in the message context which can be passed between middleware stages. /// diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 6d67241..fba63dd 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -74,7 +74,8 @@ namespace Tapeti.Connection RawBody = body, Message = message, Properties = properties, - Binding = null + Binding = null, + ConnectionClosed = CancellationToken.None }; var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); @@ -118,7 +119,8 @@ namespace Tapeti.Connection RawBody = messageContextData.RawBody, Message = message, Properties = messageContextData.Properties, - Binding = binding + Binding = binding, + ConnectionClosed = cancellationToken }; try diff --git a/Tapeti/Default/CancellationTokenBinding.cs b/Tapeti/Default/CancellationTokenBinding.cs new file mode 100644 index 0000000..01c8a72 --- /dev/null +++ b/Tapeti/Default/CancellationTokenBinding.cs @@ -0,0 +1,25 @@ +using System; +using System.Linq; +using System.Threading; +using Tapeti.Config; + +namespace Tapeti.Default +{ + /// + /// + /// Binds a parameter of type CancellationToken to a token which is cancelled when the RabbitMQ connection is closed. + /// Similar to and very much inspired by ASP.NET's RequestAborted CancellationToken. + /// This middleware is included by default in the standard TapetiConfig. + /// + public class CancellationTokenBinding : IControllerBindingMiddleware + { + /// + public void Handle(IControllerBindingContext context, Action next) + { + foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(CancellationToken))) + parameter.SetBinding(messageContext => messageContext.ConnectionClosed); + + next(); + } + } +} diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 40a153c..3b72e77 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Tapeti.Config; @@ -34,7 +35,10 @@ namespace Tapeti.Default /// public IBinding Binding { get; set; } - + /// + public CancellationToken ConnectionClosed { get; set; } + + public void Store(T payload) where T : IMessageContextPayload { payloads.Add(typeof(T), payload); diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 57abd6e..ec2234c 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -35,6 +35,7 @@ namespace Tapeti Use(new DependencyResolverBinding()); Use(new PublishResultBinding()); + Use(new CancellationTokenBinding()); // Registered last so it runs first and the MessageClass is known to other middleware Use(new MessageBinding()); diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index f9cb6e5..f26e3ca 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -151,10 +151,8 @@ namespace Tapeti protected virtual void OnConnected(ConnectedEventArgs e) { var connectedEvent = Connected; - if (connectedEvent == null) - return; - - Task.Run(() => connectedEvent.Invoke(this, e)); + if (connectedEvent != null) + Task.Run(() => connectedEvent.Invoke(this, e)); } /// @@ -162,13 +160,11 @@ namespace Tapeti /// protected virtual void OnReconnected(ConnectedEventArgs e) { - var reconnectedEvent = Reconnected; - if (reconnectedEvent == null && subscriber == null) - return; - subscriber?.Reconnect(); - Task.Run(() => reconnectedEvent?.Invoke(this, e)); + var reconnectedEvent = Reconnected; + if (reconnectedEvent != null) + Task.Run(() => reconnectedEvent?.Invoke(this, e)); } /// @@ -176,13 +172,11 @@ namespace Tapeti /// protected virtual void OnDisconnected(DisconnectedEventArgs e) { - var disconnectedEvent = Disconnected; - if (disconnectedEvent == null) - return; - subscriber?.Disconnect(); - Task.Run(() => disconnectedEvent.Invoke(this, e)); + var disconnectedEvent = Disconnected; + if (disconnectedEvent != null) + Task.Run(() => disconnectedEvent.Invoke(this, e)); } } }