using System;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Connection;
// ReSharper disable UnusedMember.Global
// TODO more separation from the actual worker / RabbitMQ Client for unit testing purposes
namespace Tapeti
{
///
///
/// Creates a connection to RabbitMQ based on the provided Tapeti config.
///
public class TapetiConnection : IConnection
{
private readonly ITapetiConfig config;
///
/// Specifies the hostname and credentials to use when connecting to RabbitMQ.
/// Defaults to guest on localhost.
///
///
/// This property must be set before first subscribing or publishing, otherwise it
/// will use the default connection parameters.
///
public TapetiConnectionParams Params { get; set; }
private readonly Lazy client;
private TapetiSubscriber subscriber;
private bool disposed;
///
/// Creates a new instance of a TapetiConnection and registers a default IPublisher
/// in the IoC container as provided in the config.
///
///
public TapetiConnection(ITapetiConfig config)
{
this.config = config;
(config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher);
client = new Lazy(() => new TapetiClient(config, Params ?? new TapetiConnectionParams())
{
ConnectionEventListener = new ConnectionEventListener(this)
});
}
///
public event ConnectedEventHandler Connected;
///
public event DisconnectedEventHandler Disconnected;
///
public event ConnectedEventHandler Reconnected;
///
public async Task Subscribe(bool startConsuming = true)
{
if (subscriber == null)
{
subscriber = new TapetiSubscriber(() => client.Value, config);
await subscriber.ApplyBindings();
}
if (startConsuming)
await subscriber.Resume();
return subscriber;
}
///
public ISubscriber SubscribeSync(bool startConsuming = true)
{
return Subscribe(startConsuming).Result;
}
///
public IPublisher GetPublisher()
{
return new TapetiPublisher(config, () => client.Value);
}
///
public async Task Close()
{
if (client.IsValueCreated)
await client.Value.Close();
}
///
public void Dispose()
{
if (!disposed)
DisposeAsync().GetAwaiter().GetResult();
}
///
public async ValueTask DisposeAsync()
{
if (disposed)
return;
if (subscriber != null)
await subscriber.DisposeAsync();
await Close();
disposed = true;
}
private class ConnectionEventListener: IConnectionEventListener
{
private readonly TapetiConnection owner;
internal ConnectionEventListener(TapetiConnection owner)
{
this.owner = owner;
}
public void Connected(ConnectedEventArgs e)
{
owner.OnConnected(e);
}
public void Disconnected(DisconnectedEventArgs e)
{
owner.OnDisconnected(e);
}
public void Reconnected(ConnectedEventArgs e)
{
owner.OnReconnected(e);
}
}
///
/// Called when a connection to RabbitMQ has been established.
///
protected virtual void OnConnected(ConnectedEventArgs e)
{
var connectedEvent = Connected;
if (connectedEvent == null)
return;
Task.Run(() => connectedEvent.Invoke(this, e));
}
///
/// Called when the connection to RabbitMQ has been lost.
///
protected virtual void OnReconnected(ConnectedEventArgs e)
{
var reconnectedEvent = Reconnected;
if (reconnectedEvent == null && subscriber == null)
return;
subscriber?.Reconnect();
Task.Run(() => reconnectedEvent?.Invoke(this, e));
}
///
/// Called when the connection to RabbitMQ has been recovered after an unexpected disconnect.
///
protected virtual void OnDisconnected(DisconnectedEventArgs e)
{
var disconnectedEvent = Disconnected;
if (disconnectedEvent == null)
return;
subscriber?.Disconnect();
Task.Run(() => disconnectedEvent.Invoke(this, e));
}
}
}