2021-09-04 09:33:59 +00:00
using System ;
using System.IO ;
using System.Text ;
using CommandLine ;
using RabbitMQ.Client ;
2021-09-04 12:01:03 +00:00
using Tapeti.Cmd.ConsoleHelper ;
2021-09-04 09:33:59 +00:00
using Tapeti.Cmd.RateLimiter ;
using Tapeti.Cmd.Serialization ;
namespace Tapeti.Cmd.Verbs
{
/// <summary>
/// Command-line options for the "import" verb, handled by <see cref="ImportVerb"/>.
/// </summary>
[Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")]
[ExecutableVerb(typeof(ImportVerb))]
public class ImportOptions : BaseMessageSerializerOptions
{
    /// <summary>
    /// Path or filename the messages are read from (member of the "Input" option group).
    /// </summary>
    [Option('i', "input",
        Group = "Input",
        HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")]
    public string InputFile { get; set; }

    /// <summary>
    /// A single message passed directly on the command line (member of the "Input" option group).
    /// </summary>
    [Option('m', "message",
        Group = "Input",
        HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
    public string InputMessage { get; set; }

    /// <summary>
    /// When set, messages are read from standard input (member of the "Input" option group).
    /// </summary>
    [Option('c', "pipe",
        Group = "Input",
        HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")]
    public bool InputPipe { get; set; }

    /// <summary>
    /// When set, messages are published to their original exchange and routing key
    /// instead of directly to the originating queue.
    /// </summary>
    [Option('e', "exchange",
        HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
    public bool PublishToExchange { get; set; }

    /// <summary>
    /// Optional upper limit on the number of messages imported per second.
    /// </summary>
    [Option("maxrate",
        HelpText = "The maximum amount of messages per second to import.")]
    public int? MaxRate { get; set; }

    /// <summary>
    /// Optional number of messages to import before pausing.
    /// </summary>
    [Option("batchsize",
        HelpText = "How many messages to import before pausing. Will wait for manual confirmation unless batchpausetime is specified.")]
    public int? BatchSize { get; set; }

    /// <summary>
    /// Optional pause, in seconds, between batches when <see cref="BatchSize"/> is specified.
    /// </summary>
    [Option("batchpausetime",
        HelpText = "How many seconds to wait before starting the next batch if batchsize is specified.")]
    public int? BatchPauseTime { get; set; }
}
/// <summary>
/// Executes the "import" verb: reads previously exported messages from a file,
/// a command-line argument or standard input and publishes them to RabbitMQ.
/// </summary>
public class ImportVerb : IVerbExecuter
{
    private readonly ImportOptions options;


    /// <summary>
    /// Creates the verb executer for the given parsed command-line options.
    /// </summary>
    /// <param name="options">The parsed options for the import verb.</param>
    public ImportVerb(ImportOptions options)
    {
        this.options = options;
    }


    /// <summary>
    /// Connects to the broker described by the options, publishes every message produced
    /// by the selected serializer and writes the number of published messages to the console.
    /// </summary>
    /// <param name="console">Console abstraction used for output, progress reporting and cancellation.</param>
    public void Execute(IConsole console)
    {
        var consoleWriter = console.GetPermanentWriter();
        var factory = new ConnectionFactory
        {
            HostName = options.Host,
            Port = options.Port,
            VirtualHost = options.VirtualHost,
            UserName = options.Username,
            Password = options.Password
        };

        using var messageSerializer = GetMessageSerializer(options);
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        var rateLimiter = RateLimiterFactory.Create(console, options.MaxRate, options.BatchSize, options.BatchPauseTime);

        var totalCount = messageSerializer.GetMessageCount();
        var messageCount = 0;

        // A progress bar is only shown when the serializer reports a positive message count.
        ProgressBar progress = null;
        if (totalCount > 0)
            progress = new ProgressBar(console, totalCount);

        try
        {
            foreach (var message in messageSerializer.Deserialize(channel))
            {
                if (console.Cancelled)
                    break;

                rateLimiter.Execute(() =>
                {
                    // Checked again inside the callback: the console may have been cancelled
                    // between the check above and the moment the rate limiter invokes this.
                    if (console.Cancelled)
                        return;

                    // An empty exchange name together with the queue name as the routing key
                    // publishes directly to the originating queue.
                    var exchange = options.PublishToExchange ? message.Exchange : "";
                    var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue;

                    // ReSharper disable AccessToDisposedClosure
                    channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
                    messageCount++;

                    progress?.Report(messageCount);
                    // ReSharper restore AccessToDisposedClosure
                });
            }
        }
        finally
        {
            // Dispose the progress bar before the summary line is written.
            progress?.Dispose();
        }

        // Pluralize without stray whitespace: "1 message published." / "2 messages published."
        consoleWriter.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published.");
    }


    /// <summary>
    /// Creates the message serializer matching the serialization method in the options.
    /// </summary>
    /// <param name="options">The parsed options for the import verb.</param>
    /// <returns>The serializer to read messages from.</returns>
    /// <exception cref="ArgumentException">No usable input was provided for the selected serialization method.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The serialization method is not supported.</exception>
    private static IMessageSerializer GetMessageSerializer(ImportOptions options)
    {
        // NOTE(review): the help text for --message and --pipe states that the serialization
        // argument has no effect for those inputs, but EasyNetQHosepipe is still honored
        // here and requires an input path. Confirm which behavior is intended.
        switch (options.SerializationMethod)
        {
            case SerializationMethod.SingleFileJSON:
                return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8);

            case SerializationMethod.EasyNetQHosepipe:
                if (string.IsNullOrEmpty(options.InputFile))
                    throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization");

                return new EasyNetQMessageSerializer(options.InputFile);

            default:
                throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod");
        }
    }


    /// <summary>
    /// Opens the input stream for SingleFileJSON input. The pipe option takes precedence
    /// over an inline message, which takes precedence over the input file.
    /// </summary>
    /// <param name="options">The parsed options for the import verb.</param>
    /// <param name="disposeStream">
    /// Set to false for the standard input stream and to true for the streams created here;
    /// passed on to the serializer together with the stream.
    /// </param>
    /// <returns>A readable stream containing the messages.</returns>
    /// <exception cref="ArgumentException">Neither pipe, message nor input file was provided.</exception>
    private static Stream GetInputStream(ImportOptions options, out bool disposeStream)
    {
        if (options.InputPipe)
        {
            disposeStream = false;
            return Console.OpenStandardInput();
        }

        disposeStream = true;

        if (!string.IsNullOrEmpty(options.InputMessage))
            return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage));

        // Fail with a clear message instead of the framework's exception for a missing path.
        if (string.IsNullOrEmpty(options.InputFile))
            throw new ArgumentException("An input file must be provided when neither a message nor the pipe option is used");

        return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
    }
}
}