1
0
mirror of synced 2024-11-22 09:13:51 +00:00

Connect Disconnect en Reconnect events toegevoegd aan de TapetiConnection

This commit is contained in:
Menno van Lavieren 2017-07-14 12:33:09 +02:00
parent d386a3101e
commit 50bcd26d40
5 changed files with 80 additions and 2 deletions

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Connection
{
public interface IConnectionEventListener
{
void Connected();
void Reconnected();
void Disconnected();
}
}

View File

@ -18,6 +18,7 @@ namespace Tapeti.Connection
private readonly IConfig config; private readonly IConfig config;
public TapetiConnectionParams ConnectionParams { get; set; } public TapetiConnectionParams ConnectionParams { get; set; }
public IConnectionEventListener ConnectionEventListener { get; set; }
private readonly IMessageSerializer messageSerializer; private readonly IMessageSerializer messageSerializer;
private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly IRoutingKeyStrategy routingKeyStrategy;
@ -187,7 +188,7 @@ namespace Tapeti.Connection
VirtualHost = ConnectionParams.VirtualHost, VirtualHost = ConnectionParams.VirtualHost,
UserName = ConnectionParams.Username, UserName = ConnectionParams.Username,
Password = ConnectionParams.Password, Password = ConnectionParams.Password,
AutomaticRecoveryEnabled = true, AutomaticRecoveryEnabled = true, // The created connection is an IRecoverable
RequestedHeartbeat = 30 RequestedHeartbeat = 30
}; };
@ -201,6 +202,11 @@ namespace Tapeti.Connection
if (ConnectionParams.PrefetchCount > 0) if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false); channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
((IRecoverable)connection).Recovery += (sender, e) => ConnectionEventListener?.Reconnected();
channelInstance.ModelShutdown += (sender, e) => ConnectionEventListener?.Disconnected();
ConnectionEventListener?.Connected();
break; break;
} }
catch (BrokerUnreachableException) catch (BrokerUnreachableException)

View File

@ -56,6 +56,7 @@
<Compile Include="Config\IPublishContext.cs" /> <Compile Include="Config\IPublishContext.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" /> <Compile Include="Config\IMessageFilterMiddleware.cs" />
<Compile Include="Config\IPublishMiddleware.cs" /> <Compile Include="Config\IPublishMiddleware.cs" />
<Compile Include="Connection\IConnectionEventListener.cs" />
<Compile Include="Connection\TapetiConsumer.cs" /> <Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" /> <Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" /> <Compile Include="Connection\TapetiSubscriber.cs" />

View File

@ -21,10 +21,16 @@ namespace Tapeti
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config) worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config)
{ {
ConnectionParams = Params ?? new TapetiConnectionParams() ConnectionParams = Params ?? new TapetiConnectionParams(),
ConnectionEventListener = new ConnectionEventListener(this)
}); });
} }
public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler Reconnected;
public async Task<ISubscriber> Subscribe(bool startConsuming = true) public async Task<ISubscriber> Subscribe(bool startConsuming = true)
{ {
@ -61,5 +67,45 @@ namespace Tapeti
{ {
Close().Wait(); Close().Wait();
} }
private class ConnectionEventListener: IConnectionEventListener
{
private readonly TapetiConnection owner;
internal ConnectionEventListener(TapetiConnection owner)
{
this.owner = owner;
}
public void Connected()
{
owner.OnConnected(new EventArgs());
}
public void Disconnected()
{
owner.OnDisconnected(new EventArgs());
}
public void Reconnected()
{
owner.OnReconnected(new EventArgs());
}
}
protected virtual void OnConnected(EventArgs e)
{
Connected?.Invoke(this, e);
}
protected virtual void OnReconnected(EventArgs e)
{
Reconnected?.Invoke(this, e);
}
protected virtual void OnDisconnected(EventArgs e)
{
Disconnected?.Invoke(this, e);
}
} }
} }

View File

@ -35,6 +35,16 @@ namespace Test
Params = new TapetiAppSettingsConnectionParams() Params = new TapetiAppSettingsConnectionParams()
}) })
{ {
connection.Connected += (sender, e) => {
Console.WriteLine("Event Connected");
};
connection.Disconnected += (sender, e) => {
Console.WriteLine("Event Disconnected");
};
connection.Reconnected += (sender, e) => {
Console.WriteLine("Event Reconnected");
};
Console.WriteLine("Subscribing..."); Console.WriteLine("Subscribing...");
var subscriber = connection.Subscribe(false).Result; var subscriber = connection.Subscribe(false).Result;