diff --git a/PettingZoo.Core/Connection/ConnectionParams.cs b/PettingZoo.Core/Connection/ConnectionParams.cs index fb63f5e..44e55e1 100644 --- a/PettingZoo.Core/Connection/ConnectionParams.cs +++ b/PettingZoo.Core/Connection/ConnectionParams.cs @@ -17,5 +17,11 @@ Username = username; Password = password; } + + + public override string ToString() + { + return $"{Host}:{Port}{VirtualHost}"; + } } } diff --git a/PettingZoo.Core/Connection/DynamicConnection.cs b/PettingZoo.Core/Connection/DynamicConnection.cs new file mode 100644 index 0000000..ba9043b --- /dev/null +++ b/PettingZoo.Core/Connection/DynamicConnection.cs @@ -0,0 +1,100 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace PettingZoo.Core.Connection +{ + public class DynamicConnection : IConnection + { + public Guid ConnectionId => currentConnection?.ConnectionId ?? Guid.Empty; + public ConnectionParams? ConnectionParams { get; private set; } + public ConnectionStatus Status { get; private set; } = ConnectionStatus.Disconnected; + public event EventHandler? StatusChanged; + + + private IConnection? currentConnection; + + + public async ValueTask DisposeAsync() + { + if (currentConnection != null) + await currentConnection.DisposeAsync(); + + GC.SuppressFinalize(this); + } + + + public void Connect() + { + CheckConnection(); + currentConnection.Connect(); + + } + + public async ValueTask Disconnect() + { + if (currentConnection == null) + return; + + var disconnectedConnectionId = currentConnection.ConnectionId; + await currentConnection.DisposeAsync(); + currentConnection = null; + + ConnectionStatusChanged(this, new StatusChangedEventArgs(disconnectedConnectionId, ConnectionStatus.Disconnected)); + } + + + public void SetConnection(IConnection connection) + { + if (currentConnection != null) + { + currentConnection.StatusChanged -= ConnectionStatusChanged; + ConnectionStatusChanged(this, new StatusChangedEventArgs(currentConnection.ConnectionId, ConnectionStatus.Disconnected)); + } + + currentConnection = connection; + + // Assume we get the new connection before Connect is called, thus before the status changes + if (currentConnection != null) + currentConnection.StatusChanged += ConnectionStatusChanged; + } + + + public ISubscriber Subscribe(string exchange, string routingKey) + { + CheckConnection(); + return currentConnection.Subscribe(exchange, routingKey); + } + + + public ISubscriber Subscribe() + { + CheckConnection(); + return currentConnection.Subscribe(); + } + + + public Task Publish(PublishMessageInfo messageInfo) + { + CheckConnection(); + return currentConnection.Publish(messageInfo); + } + + + private void ConnectionStatusChanged(object? sender, StatusChangedEventArgs e) + { + ConnectionParams = e.ConnectionParams; + Status = e.Status; + + StatusChanged?.Invoke(sender, e); + } + + + [MemberNotNull(nameof(currentConnection))] + private void CheckConnection() + { + if (currentConnection == null) + throw new InvalidOperationException("No current connection"); + } + } +} diff --git a/PettingZoo.Core/Connection/IConnection.cs b/PettingZoo.Core/Connection/IConnection.cs index f4ac798..6f49fd1 100644 --- a/PettingZoo.Core/Connection/IConnection.cs +++ b/PettingZoo.Core/Connection/IConnection.cs @@ -5,8 +5,15 @@ namespace PettingZoo.Core.Connection { public interface IConnection : IAsyncDisposable { + Guid ConnectionId { get; } + ConnectionParams? ConnectionParams { get; } + ConnectionStatus Status { get; } + event EventHandler StatusChanged; + + void Connect(); + ISubscriber Subscribe(string exchange, string routingKey); ISubscriber Subscribe(); @@ -25,14 +32,18 @@ namespace PettingZoo.Core.Connection public class StatusChangedEventArgs : EventArgs { + public Guid ConnectionId { get; } public ConnectionStatus Status { get; } - public string? Context { get; } + public ConnectionParams? ConnectionParams { get; } + public Exception? Exception { get; } - public StatusChangedEventArgs(ConnectionStatus status, string? context) + public StatusChangedEventArgs(Guid connectionId, ConnectionStatus status, ConnectionParams? connectionParams = null, Exception? exception = null) { + ConnectionId = connectionId; Status = status; - Context = context; + ConnectionParams = connectionParams; + Exception = exception; } } } diff --git a/PettingZoo.RabbitMQ/RabbitMQClientConnection.cs b/PettingZoo.RabbitMQ/RabbitMQClientConnection.cs index 9f790c8..1db1574 100644 --- a/PettingZoo.RabbitMQ/RabbitMQClientConnection.cs +++ b/PettingZoo.RabbitMQ/RabbitMQClientConnection.cs @@ -3,51 +3,56 @@ using System.Threading; using System.Threading.Tasks; using PettingZoo.Core.Connection; using RabbitMQ.Client; +using IConnection = RabbitMQ.Client.IConnection; namespace PettingZoo.RabbitMQ { public class RabbitMQClientConnection : Core.Connection.IConnection { + public Guid ConnectionId { get; } = Guid.NewGuid(); + public ConnectionParams? ConnectionParams { get; } + public ConnectionStatus Status { get; set; } + public event EventHandler? StatusChanged; + + private const int ConnectRetryDelay = 5000; private readonly CancellationTokenSource connectionTaskToken = new(); - private readonly Task connectionTask; + private Task? connectionTask; private readonly object connectionLock = new(); - private global::RabbitMQ.Client.IConnection? connection; - private IModel? model; + private IConnection? connection; - public event EventHandler? StatusChanged; - - public RabbitMQClientConnection(ConnectionParams connectionParams) { - connectionTask = Task.Factory.StartNew(() => TryConnection(connectionParams, connectionTaskToken.Token), CancellationToken.None); + ConnectionParams = connectionParams; } public async ValueTask DisposeAsync() { + GC.SuppressFinalize(this); + if (connectionTask == null) + return; + connectionTaskToken.Cancel(); if (!connectionTask.IsCompleted) await connectionTask; lock (connectionLock) { - if (model != null) - { - model.Dispose(); - model = null; - } - if (connection != null) { connection.Dispose(); connection = null; } } + } - GC.SuppressFinalize(this); + + public void Connect() + { + connectionTask = Task.Factory.StartNew(() => TryConnection(ConnectionParams!, connectionTaskToken.Token), CancellationToken.None); } @@ -67,6 +72,7 @@ namespace PettingZoo.RabbitMQ { lock (connectionLock) { + var model = connection?.CreateModel(); var subscriber = new RabbitMQClientSubscriber(model, exchange, routingKey); if (model != null) return subscriber; @@ -79,10 +85,10 @@ namespace PettingZoo.RabbitMQ lock (connectionLock) { - if (model == null) + if (connection == null) return; - subscriber.Connected(model); + subscriber.Connected(connection.CreateModel()); } StatusChanged -= ConnectSubscriber; @@ -97,12 +103,30 @@ namespace PettingZoo.RabbitMQ public Task Publish(PublishMessageInfo messageInfo) { - if (model == null) + IConnection? lockedConnection; + + lock (connectionLock) + { + lockedConnection = connection; + } + + if (lockedConnection == null) throw new InvalidOperationException("Not connected"); - model.BasicPublish(messageInfo.Exchange, messageInfo.RoutingKey, false, - RabbitMQClientPropertiesConverter.Convert(messageInfo.Properties, model.CreateBasicProperties()), - messageInfo.Body); + using (var model = lockedConnection.CreateModel()) + { + try + { + model.BasicPublish(messageInfo.Exchange, messageInfo.RoutingKey, false, + RabbitMQClientPropertiesConverter.Convert(messageInfo.Properties, + model.CreateBasicProperties()), + messageInfo.Body); + } + finally + { + model.Close(); + } + } return Task.CompletedTask; } @@ -119,22 +143,22 @@ namespace PettingZoo.RabbitMQ Password = connectionParams.Password }; - var statusContext = $"{connectionParams.Host}:{connectionParams.Port}{connectionParams.VirtualHost}"; - while (!cancellationToken.IsCancellationRequested) { - DoStatusChanged(ConnectionStatus.Connecting, statusContext); + DoStatusChanged(ConnectionStatus.Connecting); try { - connection = factory.CreateConnection(); - model = connection.CreateModel(); - - DoStatusChanged(ConnectionStatus.Connected, statusContext); + lock (connectionLock) + { + connection = factory.CreateConnection(); + } + + DoStatusChanged(ConnectionStatus.Connected); break; } catch (Exception e) { - DoStatusChanged(ConnectionStatus.Error, e.Message); + DoStatusChanged(ConnectionStatus.Error, e); try { @@ -148,9 +172,10 @@ namespace PettingZoo.RabbitMQ } - private void DoStatusChanged(ConnectionStatus status, string? context = null) + private void DoStatusChanged(ConnectionStatus status, Exception? exception = null) { - StatusChanged?.Invoke(this, new StatusChangedEventArgs(status, context)); + Status = status; + StatusChanged?.Invoke(this, new StatusChangedEventArgs(ConnectionId, status, ConnectionParams, exception)); } } } diff --git a/PettingZoo/UI/Main/MainWindowStrings.Designer.cs b/PettingZoo/UI/Main/MainWindowStrings.Designer.cs index fb45c3f..09bb89b 100644 --- a/PettingZoo/UI/Main/MainWindowStrings.Designer.cs +++ b/PettingZoo/UI/Main/MainWindowStrings.Designer.cs @@ -160,7 +160,7 @@ namespace PettingZoo.UI.Main { } /// - /// Looks up a localized string similar to Connected. + /// Looks up a localized string similar to Connected to {0}. /// public static string StatusConnected { get { diff --git a/PettingZoo/UI/Main/MainWindowStrings.resx b/PettingZoo/UI/Main/MainWindowStrings.resx index 7825578..b567899 100644 --- a/PettingZoo/UI/Main/MainWindowStrings.resx +++ b/PettingZoo/UI/Main/MainWindowStrings.resx @@ -151,7 +151,7 @@ Importing messages... - Connected + Connected to {0} Connecting to {0}... diff --git a/PettingZoo/UI/Main/MainWindowViewModel.cs b/PettingZoo/UI/Main/MainWindowViewModel.cs index 9aaa4f4..ea6a13a 100644 --- a/PettingZoo/UI/Main/MainWindowViewModel.cs +++ b/PettingZoo/UI/Main/MainWindowViewModel.cs @@ -43,7 +43,7 @@ namespace PettingZoo.UI.Main private readonly IExportImportFormatProvider exportImportFormatProvider; private SubscribeDialogParams? subscribeDialogParams; - private IConnection? connection; + private readonly DynamicConnection connection = new(); private string connectionStatus; private ITab? activeTab; private readonly Dictionary undockedTabs = new(); @@ -141,15 +141,15 @@ namespace PettingZoo.UI.Main closeTabCommand = new DelegateCommand(CloseTabExecute, HasActiveTabCanExecute); undockTabCommand = new DelegateCommand(UndockTabExecute, HasActiveTabCanExecute); importCommand = new DelegateCommand(ImportExecute); + + connection.StatusChanged += ConnectionStatusChanged; } public async ValueTask DisposeAsync() { GC.SuppressFinalize(this); - - if (connection != null) - await connection.DisposeAsync(); + await connection.DisposeAsync(); } @@ -159,13 +159,9 @@ namespace PettingZoo.UI.Main if (connectionSettings == null) return; - if (connection != null) - await connection.DisposeAsync(); - - connection = connectionFactory.CreateConnection(new ConnectionParams( + connection.SetConnection(connectionFactory.CreateConnection(new ConnectionParams( connectionSettings.Host, connectionSettings.VirtualHost, connectionSettings.Port, - connectionSettings.Username, connectionSettings.Password)); - connection.StatusChanged += ConnectionStatusChanged; + connectionSettings.Username, connectionSettings.Password))); if (connectionSettings.Subscribe) { @@ -173,40 +169,22 @@ namespace PettingZoo.UI.Main tabFactory.CreateSubscriberTab(connection, subscriber); } + connection.Connect(); ConnectionChanged(); } private async void DisconnectExecute() { - Tabs.Clear(); - - var capturedUndockedTabs = undockedTabs.ToList(); - undockedTabs.Clear(); - - foreach (var undockedTab in capturedUndockedTabs) - undockedTab.Value.Close(); - - RaisePropertyChanged(nameof(NoTabsVisibility)); - undockTabCommand.RaiseCanExecuteChanged(); - - if (connection != null) - { - await connection.DisposeAsync(); - connection = null; - } - - ConnectionStatus = GetConnectionStatus(null); - ConnectionStatusType = ConnectionStatusType.Error; - ConnectionChanged(); + await connection.Disconnect(); } private void SubscribeExecute() { - if (connection == null) + if (connection.Status != Core.Connection.ConnectionStatus.Connected) return; - + var newParams = subscribeDialog.Show(subscribeDialogParams); if (newParams == null) return; @@ -220,16 +198,16 @@ namespace PettingZoo.UI.Main private void PublishExecute() { - if (connection == null) + if (connection.Status != Core.Connection.ConnectionStatus.Connected) return; - + tabFactory.CreatePublisherTab(connection); } private bool IsConnectedCanExecute() { - return connection != null; + return connection.Status == Core.Connection.ConnectionStatus.Connected; } @@ -419,6 +397,8 @@ namespace PettingZoo.UI.Main Core.Connection.ConnectionStatus.Connecting => ConnectionStatusType.Connecting, _ => ConnectionStatusType.Error }; + + Application.Current.Dispatcher.BeginInvoke(ConnectionChanged); } @@ -427,9 +407,9 @@ namespace PettingZoo.UI.Main { return args?.Status switch { - Core.Connection.ConnectionStatus.Connecting => string.Format(MainWindowStrings.StatusConnecting, args.Context), - Core.Connection.ConnectionStatus.Connected => string.Format(MainWindowStrings.StatusConnected, args.Context), - Core.Connection.ConnectionStatus.Error => string.Format(MainWindowStrings.StatusError, args.Context), + Core.Connection.ConnectionStatus.Connecting => string.Format(MainWindowStrings.StatusConnecting, args.ConnectionParams), + Core.Connection.ConnectionStatus.Connected => string.Format(MainWindowStrings.StatusConnected, args.ConnectionParams), + Core.Connection.ConnectionStatus.Error => string.Format(MainWindowStrings.StatusError, args.Exception?.Message), Core.Connection.ConnectionStatus.Disconnected => MainWindowStrings.StatusDisconnected, _ => MainWindowStrings.StatusDisconnected }; diff --git a/PettingZoo/UI/Tab/ITabFactory.cs b/PettingZoo/UI/Tab/ITabFactory.cs index 78f5b8a..23f105b 100644 --- a/PettingZoo/UI/Tab/ITabFactory.cs +++ b/PettingZoo/UI/Tab/ITabFactory.cs @@ -4,7 +4,7 @@ namespace PettingZoo.UI.Tab { public interface ITabFactory { - void CreateSubscriberTab(IConnection? connection, ISubscriber subscriber); + void CreateSubscriberTab(IConnection connection, ISubscriber subscriber); string CreateReplySubscriberTab(IConnection connection); void CreatePublisherTab(IConnection connection, ReceivedMessageInfo? fromReceivedMessage = null); } diff --git a/PettingZoo/UI/Tab/Publisher/PublisherView.xaml b/PettingZoo/UI/Tab/Publisher/PublisherView.xaml index a1956b2..0d2b603 100644 --- a/PettingZoo/UI/Tab/Publisher/PublisherView.xaml +++ b/PettingZoo/UI/Tab/Publisher/PublisherView.xaml @@ -8,7 +8,7 @@ xmlns:svgc="http://sharpvectors.codeplex.com/svgc/" xmlns:valueConverters="clr-namespace:PettingZoo.WPF.ValueConverters;assembly=PettingZoo.WPF" mc:Ignorable="d" - d:DesignHeight="450" + d:DesignHeight="1200" d:DesignWidth="800" d:DataContext="{d:DesignInstance res:DesignTimePublisherViewModel, IsDesignTimeCreatable=True}" Background="White"> @@ -34,6 +34,7 @@ + @@ -81,6 +82,7 @@