diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index f0d3ec0..e91ff34 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -40,6 +40,20 @@ namespace Tapeti.Connection } + /// + /// Called after the connection is lost and regained. Reapplies the bindings and if Resume + /// has already been called, restarts the consumers. For internal use only. + /// + /// + public async Task Reconnect() + { + await ApplyBindings(); + + if (consuming) + await ConsumeQueues(); + } + + /// public async Task Resume() { @@ -47,7 +61,12 @@ namespace Tapeti.Connection return; consuming = true; + await ConsumeQueues(); + } + + private async Task ConsumeQueues() + { var queues = config.Bindings.GroupBy(binding => binding.QueueName); await Task.WhenAll(queues.Select(async group => diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 3e167c5..75ef8c9 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -145,15 +145,15 @@ namespace Tapeti protected virtual void OnReconnected(EventArgs e) { var reconnectedEvent = Reconnected; - if (reconnectedEvent == null) + if (reconnectedEvent == null && subscriber == null) return; - Task.Run(() => + Task.Run(async () => { - subscriber?.ApplyBindings().ContinueWith(t => - { - reconnectedEvent.Invoke(this, e); - }); + if (subscriber != null) + await subscriber.Reconnect(); + + reconnectedEvent?.Invoke(this, e); }); }