diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs new file mode 100644 index 0000000..c64ced7 --- /dev/null +++ b/Tapeti/Connection/IConnectionEventListener.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Connection +{ + public interface IConnectionEventListener + { + void Connected(); + void Reconnected(); + void Disconnected(); + } +} diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 8843a93..5046346 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -18,6 +18,7 @@ namespace Tapeti.Connection private readonly IConfig config; public TapetiConnectionParams ConnectionParams { get; set; } + public IConnectionEventListener ConnectionEventListener { get; set; } private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; @@ -187,7 +188,7 @@ namespace Tapeti.Connection VirtualHost = ConnectionParams.VirtualHost, UserName = ConnectionParams.Username, Password = ConnectionParams.Password, - AutomaticRecoveryEnabled = true, + AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable RequestedHeartbeat = 30 }; @@ -201,6 +202,11 @@ namespace Tapeti.Connection if (ConnectionParams.PrefetchCount > 0) channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); + ((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected(); + + channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected(); + + ConnectionEventListener?.Connected(); break; } catch (BrokerUnreachableException) diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 19b32db..493548a 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -56,6 +56,7 @@ + diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 848dbac..98e8cda 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -21,10 +21,16 @@ namespace Tapeti worker = new Lazy(() => new TapetiWorker(config) { - ConnectionParams = Params ?? new TapetiConnectionParams() + ConnectionParams = Params ?? new TapetiConnectionParams(), + ConnectionEventListener = new ConnectionEventListener(this) }); } + public event EventHandler Connected; + + public event EventHandler Disconnected; + + public event EventHandler Reconnected; public async Task Subscribe(bool startConsuming = true) { @@ -61,5 +67,45 @@ namespace Tapeti { Close().Wait(); } + + private class ConnectionEventListener: IConnectionEventListener + { + private readonly TapetiConnection owner; + + internal ConnectionEventListener(TapetiConnection owner) + { + this.owner = owner; + } + + public void Connected() + { + owner.OnConnected(new EventArgs()); + } + + public void Disconnected() + { + owner.OnDisconnected(new EventArgs()); + } + + public void Reconnected() + { + owner.OnReconnected(new EventArgs()); + } + } + + protected virtual void OnConnected(EventArgs e) + { + Connected?.Invoke(this, e); + } + + protected virtual void OnReconnected(EventArgs e) + { + Reconnected?.Invoke(this, e); + } + + protected virtual void OnDisconnected(EventArgs e) + { + Disconnected?.Invoke(this, e); + } } } diff --git a/Test/Program.cs b/Test/Program.cs index 349ba4e..8f29209 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -35,6 +35,16 @@ namespace Test Params = new TapetiAppSettingsConnectionParams() }) { + connection.Connected += (sender, e) => { + Console.WriteLine("Event Connected"); + }; + connection.Disconnected += (sender, e) => { + Console.WriteLine("Event Disconnected"); + }; + connection.Reconnected += (sender, e) => { + Console.WriteLine("Event Reconnected"); + }; + Console.WriteLine("Subscribing..."); var subscriber = connection.Subscribe(false).Result;