Fixed consumers not restarting after a reconnect
This commit is contained in:
parent
a9045eea7e
commit
99bc839814
@ -40,6 +40,20 @@ namespace Tapeti.Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Called after the connection is lost and regained. Reapplies the bindings and if Resume
|
||||||
|
/// has already been called, restarts the consumers. For internal use only.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public async Task Reconnect()
|
||||||
|
{
|
||||||
|
await ApplyBindings();
|
||||||
|
|
||||||
|
if (consuming)
|
||||||
|
await ConsumeQueues();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task Resume()
|
public async Task Resume()
|
||||||
{
|
{
|
||||||
@ -47,7 +61,12 @@ namespace Tapeti.Connection
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
consuming = true;
|
consuming = true;
|
||||||
|
await ConsumeQueues();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private async Task ConsumeQueues()
|
||||||
|
{
|
||||||
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
|
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
|
||||||
|
|
||||||
await Task.WhenAll(queues.Select(async group =>
|
await Task.WhenAll(queues.Select(async group =>
|
||||||
|
@ -145,15 +145,15 @@ namespace Tapeti
|
|||||||
protected virtual void OnReconnected(EventArgs e)
|
protected virtual void OnReconnected(EventArgs e)
|
||||||
{
|
{
|
||||||
var reconnectedEvent = Reconnected;
|
var reconnectedEvent = Reconnected;
|
||||||
if (reconnectedEvent == null)
|
if (reconnectedEvent == null && subscriber == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Task.Run(() =>
|
Task.Run(async () =>
|
||||||
{
|
{
|
||||||
subscriber?.ApplyBindings().ContinueWith(t =>
|
if (subscriber != null)
|
||||||
{
|
await subscriber.Reconnect();
|
||||||
reconnectedEvent.Invoke(this, e);
|
|
||||||
});
|
reconnectedEvent?.Invoke(this, e);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user