diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs
index 7d2db6d..6fea4cc 100644
--- a/Tapeti/Config/IMessageContext.cs
+++ b/Tapeti/Config/IMessageContext.cs
@@ -1,4 +1,5 @@
using System;
+using System.Threading;
// ReSharper disable UnusedMemberInSuper.Global - public API
// ReSharper disable UnusedMember.Global
@@ -50,6 +51,13 @@ namespace Tapeti.Config
///
IBinding Binding { get; }
+ ///
+ /// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed.
+ /// Note that this token is cancelled regardless of whether the connection will be reestablished, as any
+ /// messages still in the queue will be redelivered with a new token.
+ ///
+ CancellationToken ConnectionClosed { get; }
+
///
/// Stores additional properties in the message context which can be passed between middleware stages.
///
diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs
index 6d67241..fba63dd 100644
--- a/Tapeti/Connection/TapetiConsumer.cs
+++ b/Tapeti/Connection/TapetiConsumer.cs
@@ -74,7 +74,8 @@ namespace Tapeti.Connection
RawBody = body,
Message = message,
Properties = properties,
- Binding = null
+ Binding = null,
+ ConnectionClosed = CancellationToken.None
};
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
@@ -118,7 +119,8 @@ namespace Tapeti.Connection
RawBody = messageContextData.RawBody,
Message = message,
Properties = messageContextData.Properties,
- Binding = binding
+ Binding = binding,
+ ConnectionClosed = cancellationToken
};
try
diff --git a/Tapeti/Default/CancellationTokenBinding.cs b/Tapeti/Default/CancellationTokenBinding.cs
new file mode 100644
index 0000000..01c8a72
--- /dev/null
+++ b/Tapeti/Default/CancellationTokenBinding.cs
@@ -0,0 +1,25 @@
+using System;
+using System.Linq;
+using System.Threading;
+using Tapeti.Config;
+
+namespace Tapeti.Default
+{
+ ///
+ ///
+ /// Binds a parameter of type CancellationToken to a token which is cancelled when the RabbitMQ connection is closed.
+ /// Similar to and very much inspired by ASP.NET's RequestAborted CancellationToken.
+ /// This middleware is included by default in the standard TapetiConfig.
+ ///
+ public class CancellationTokenBinding : IControllerBindingMiddleware
+ {
+ ///
+ public void Handle(IControllerBindingContext context, Action next)
+ {
+ foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(CancellationToken)))
+ parameter.SetBinding(messageContext => messageContext.ConnectionClosed);
+
+ next();
+ }
+ }
+}
diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs
index 40a153c..3b72e77 100644
--- a/Tapeti/Default/MessageContext.cs
+++ b/Tapeti/Default/MessageContext.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using Tapeti.Config;
@@ -34,7 +35,10 @@ namespace Tapeti.Default
///
public IBinding Binding { get; set; }
-
+ ///
+ public CancellationToken ConnectionClosed { get; set; }
+
+
public void Store(T payload) where T : IMessageContextPayload
{
payloads.Add(typeof(T), payload);
diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs
index 57abd6e..ec2234c 100644
--- a/Tapeti/TapetiConfig.cs
+++ b/Tapeti/TapetiConfig.cs
@@ -35,6 +35,7 @@ namespace Tapeti
Use(new DependencyResolverBinding());
Use(new PublishResultBinding());
+ Use(new CancellationTokenBinding());
// Registered last so it runs first and the MessageClass is known to other middleware
Use(new MessageBinding());
diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs
index f9cb6e5..f26e3ca 100644
--- a/Tapeti/TapetiConnection.cs
+++ b/Tapeti/TapetiConnection.cs
@@ -151,10 +151,8 @@ namespace Tapeti
protected virtual void OnConnected(ConnectedEventArgs e)
{
var connectedEvent = Connected;
- if (connectedEvent == null)
- return;
-
- Task.Run(() => connectedEvent.Invoke(this, e));
+ if (connectedEvent != null)
+ Task.Run(() => connectedEvent.Invoke(this, e));
}
///
@@ -162,13 +160,11 @@ namespace Tapeti
///
protected virtual void OnReconnected(ConnectedEventArgs e)
{
- var reconnectedEvent = Reconnected;
- if (reconnectedEvent == null && subscriber == null)
- return;
-
subscriber?.Reconnect();
- Task.Run(() => reconnectedEvent?.Invoke(this, e));
+ var reconnectedEvent = Reconnected;
+ if (reconnectedEvent != null)
+ Task.Run(() => reconnectedEvent?.Invoke(this, e));
}
///
@@ -176,13 +172,11 @@ namespace Tapeti
///
protected virtual void OnDisconnected(DisconnectedEventArgs e)
{
- var disconnectedEvent = Disconnected;
- if (disconnectedEvent == null)
- return;
-
subscriber?.Disconnect();
- Task.Run(() => disconnectedEvent.Invoke(this, e));
+ var disconnectedEvent = Disconnected;
+ if (disconnectedEvent != null)
+ Task.Run(() => disconnectedEvent.Invoke(this, e));
}
}
}