1
0
mirror of synced 2024-07-05 17:50:35 +00:00
Tapeti/Tapeti.Cmd/Commands/ShovelCommand.cs

44 lines
1.3 KiB
C#

using RabbitMQ.Client;
using Tapeti.Cmd.RateLimiter;
namespace Tapeti.Cmd.Commands
{
    /// <summary>
    /// Copies or moves messages between queues: each message is fetched individually
    /// from a source channel and republished on a target channel.
    /// </summary>
    public class ShovelCommand
    {
        /// <summary>Name of the queue the messages are fetched from.</summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Routing key used when republishing. Messages are published with an empty
        /// exchange name, so this value is passed as the routing key.
        /// </summary>
        public string TargetQueueName { get; set; }

        /// <summary>
        /// When true, every message is acknowledged on the source channel after it has
        /// been published. When false, no acknowledgement is sent by this command.
        /// </summary>
        public bool RemoveMessages { get; set; }

        /// <summary>Maximum number of messages to process, or null for no limit.</summary>
        public int? MaxCount { get; set; }

        /// <summary>
        /// Fetches messages from <see cref="QueueName"/> one at a time and republishes them
        /// until the source returns no message or <see cref="MaxCount"/> has been reached.
        /// </summary>
        /// <param name="sourceChannel">Channel used to fetch (and optionally acknowledge) messages.</param>
        /// <param name="targetChannel">Channel used to publish the fetched messages.</param>
        /// <param name="rateLimiter">Wraps the publish/acknowledge step of every message.</param>
        /// <returns>The number of messages that were published.</returns>
        public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter)
        {
            var shoveled = 0;

            while (true)
            {
                // The limit is re-read on every pass, exactly like a loop condition would be.
                if (MaxCount.HasValue && shoveled >= MaxCount.Value)
                {
                    break;
                }

                // Second argument false: the fetch itself does not acknowledge the message.
                var fetched = sourceChannel.BasicGet(QueueName, false);
                if (fetched == null)
                {
                    // Nothing left to fetch from the source queue.
                    break;
                }

                // RabbitMQ client 6 and later: the body must be copied before any other
                // channel method (such as BasicPublish) is called, otherwise the published
                // body ends up corrupted.
                var payload = fetched.Body.ToArray();
                var properties = fetched.BasicProperties;
                var deliveryTag = fetched.DeliveryTag;

                rateLimiter.Execute(() =>
                {
                    targetChannel.BasicPublish("", TargetQueueName, properties, payload);

                    // Counted only once the publish call has returned.
                    shoveled++;

                    if (RemoveMessages)
                    {
                        // Acknowledge just this delivery (multiple = false).
                        sourceChannel.BasicAck(deliveryTag, false);
                    }
                });
            }

            return shoveled;
        }
    }
}