2019-10-16 12:49:23 +00:00
|
|
|
|
using RabbitMQ.Client;
|
2020-07-03 13:51:41 +00:00
|
|
|
|
using Tapeti.Cmd.RateLimiter;
|
|
|
|
|
|
2019-10-16 12:49:23 +00:00
|
|
|
|
namespace Tapeti.Cmd.Commands
|
|
|
|
|
{
|
|
|
|
|
/// <summary>
/// Copies or moves messages from one queue to another, one message at a time.
/// </summary>
public class ShovelCommand
{
    /// <summary>
    /// Name of the queue the messages are fetched from.
    /// </summary>
    public string QueueName { get; set; }

    /// <summary>
    /// Routing key used when republishing each message (published with an empty exchange name).
    /// </summary>
    public string TargetQueueName { get; set; }

    /// <summary>
    /// When true, each message is acknowledged on the source channel after it has been published.
    /// </summary>
    public bool RemoveMessages { get; set; }

    /// <summary>
    /// Optional upper bound on the number of messages to process; no limit is applied when null.
    /// </summary>
    public int? MaxCount { get; set; }


    /// <summary>
    /// Fetches messages from <see cref="QueueName"/> and republishes them until the queue
    /// returns no message or <see cref="MaxCount"/> has been reached.
    /// </summary>
    /// <param name="sourceChannel">Channel used to fetch and, optionally, acknowledge messages.</param>
    /// <param name="targetChannel">Channel used to publish the fetched messages.</param>
    /// <param name="rateLimiter">Wraps every publish (and optional acknowledge) call.</param>
    /// <returns>The number of messages that were published.</returns>
    public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
    {
        var shoveled = 0;

        for (;;)
        {
            // Stop before fetching another message once the optional limit has been reached
            var limitReached = MaxCount.HasValue && shoveled >= MaxCount.Value;
            if (limitReached)
            {
                break;
            }

            // Fetched without auto-acknowledge; the message is only acked below when RemoveMessages is set
            var message = sourceChannel.BasicGet(QueueName, false);
            if (message == null)
            {
                // The queue has no more messages to offer
                break;
            }

            // Since RabbitMQ client 6 the body must be copied before calling another channel
            // method like BasicPublish, or the published body will be corrupted
            var payload = message.Body.ToArray();

            rateLimiter.Execute(() =>
            {
                targetChannel.BasicPublish("", TargetQueueName, message.BasicProperties, payload);
                shoveled++;

                if (RemoveMessages)
                {
                    sourceChannel.BasicAck(message.DeliveryTag, false);
                }
            });
        }

        return shoveled;
    }
}
|
|
|
|
|
}
|