diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index c3314c4..7c23127 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -2,11 +2,10 @@ namespace Tapeti.Config { - /// /// /// Provides information about the message currently being handled. /// - public interface IMessageContext : IDisposable + public interface IMessageContext : IAsyncDisposable, IDisposable { /// /// Provides access to the Tapeti config. @@ -49,7 +48,7 @@ namespace Tapeti.Config /// middleware stages (mostly for IControllerMiddlewareBase descendants). /// /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts - /// Will be disposed if the value implements IDisposable + /// Will be disposed if the value implements IDisposable or IAsyncDisposable void Store(string key, object value); /// diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 3c207a7..537a030 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -25,8 +25,17 @@ namespace Tapeti.Connection } + public async ValueTask DisposeAsync() + { + if (consuming) + await Stop(); + } + + public void Dispose() { + if (consuming) + Stop().GetAwaiter().GetResult(); } diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs index 92ac61c..3d8fb55 100644 --- a/Tapeti/Default/ControllerMessageContext.cs +++ b/Tapeti/Default/ControllerMessageContext.cs @@ -1,4 +1,5 @@ -using Tapeti.Config; +using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti.Default { @@ -41,9 +42,19 @@ namespace Tapeti.Default /// public void Dispose() { + // Do not call decoratedContext.Dispose - by design } + /// + public ValueTask DisposeAsync() + { + // Do not call decoratedContext.DisposeAsync - by design + return default; + } + + + /// public void Store(string key, object value) { diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 4c9f1a0..9612990 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Tapeti.Config; namespace Tapeti.Default @@ -39,6 +40,18 @@ namespace Tapeti.Default } + /// + public async ValueTask DisposeAsync() + { + foreach (var item in items.Values) + { + if (item is IAsyncDisposable asyncDisposable) + await asyncDisposable.DisposeAsync(); + } + } + + + /// public void Store(string key, object value) { diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs index 4453362..8aab48a 100644 --- a/Tapeti/IConnection.cs +++ b/Tapeti/IConnection.cs @@ -47,11 +47,10 @@ namespace Tapeti public delegate void DisconnectedEventHandler(object sender, DisconnectedEventArgs e); - /// /// /// Represents a connection to a RabbitMQ server /// - public interface IConnection : IDisposable + public interface IConnection : IAsyncDisposable, IDisposable { /// /// Creates a subscriber to consume messages from the bound queues. diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index 1e7d864..a5e9975 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -5,11 +5,10 @@ using System.Threading.Tasks; namespace Tapeti { - /// /// /// Manages subscriptions to queues as configured by the bindings. /// - public interface ISubscriber : IDisposable + public interface ISubscriber : IAsyncDisposable, IDisposable { /// /// Starts consuming from the subscribed queues if not already started. diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 3a6632e..51630ec 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -18,6 +18,7 @@ + diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index ff72c3f..f9cb6e5 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -30,6 +30,8 @@ namespace Tapeti private readonly Lazy client; private TapetiSubscriber subscriber; + private bool disposed; + /// /// Creates a new instance of a TapetiConnection and registers a default IPublisher /// in the IoC container as provided in the config. @@ -97,12 +99,26 @@ namespace Tapeti /// public void Dispose() { - Close().Wait(); - - subscriber?.Dispose(); + if (!disposed) + DisposeAsync().GetAwaiter().GetResult(); } + /// + public async ValueTask DisposeAsync() + { + if (disposed) + return; + + if (subscriber != null) + await subscriber.DisposeAsync(); + + await Close(); + disposed = true; + } + + + private class ConnectionEventListener: IConnectionEventListener { private readonly TapetiConnection owner;