Merge branch 'release/2.7.6'
This commit is contained in:
commit
5fb3afbc9c
@ -24,4 +24,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
|
using Console = System.Console;
|
||||||
|
|
||||||
namespace Tapeti.Cmd.ConsoleHelper
|
namespace Tapeti.Cmd.ConsoleHelper
|
||||||
{
|
{
|
||||||
@ -138,39 +140,79 @@ namespace Tapeti.Cmd.ConsoleHelper
|
|||||||
|
|
||||||
public abstract bool Enabled { get; }
|
public abstract bool Enabled { get; }
|
||||||
|
|
||||||
|
public abstract void WriteCaptured(string value, Action processInput);
|
||||||
public abstract void WriteLine(string value);
|
public abstract void WriteLine(string value);
|
||||||
|
|
||||||
|
|
||||||
public void Confirm(string message)
|
public void Confirm(string message)
|
||||||
{
|
{
|
||||||
WriteLine(message);
|
WriteLine(message);
|
||||||
TryReadKey(false, out _);
|
|
||||||
|
// Clear any previous key entered before this confirmation
|
||||||
|
while (!Owner.Cancelled && Console.KeyAvailable)
|
||||||
|
Console.ReadKey(true);
|
||||||
|
|
||||||
|
while (!Owner.Cancelled && !Console.KeyAvailable)
|
||||||
|
Thread.Sleep(50);
|
||||||
|
|
||||||
|
if (Owner.Cancelled)
|
||||||
|
return;
|
||||||
|
|
||||||
|
Console.ReadKey(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public bool ConfirmYesNo(string message)
|
public bool ConfirmYesNo(string message)
|
||||||
{
|
{
|
||||||
WriteLine($"{message} (Y/N) ");
|
var confirmed = false;
|
||||||
if (!TryReadKey(true, out var key))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
return key.KeyChar == 'y' || key.KeyChar == 'Y';
|
WriteCaptured($"{message} (Y/N) ", () =>
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private bool TryReadKey(bool showKeyOutput, out ConsoleKeyInfo keyInfo)
|
|
||||||
{
|
|
||||||
while (!Owner.Cancelled && !Console.KeyAvailable)
|
|
||||||
Thread.Sleep(50);
|
|
||||||
|
|
||||||
if (Owner.Cancelled)
|
|
||||||
{
|
{
|
||||||
keyInfo = default;
|
// Clear any previous key entered before this confirmation
|
||||||
return false;
|
while (!Owner.Cancelled && Console.KeyAvailable)
|
||||||
}
|
Console.ReadKey(true);
|
||||||
|
|
||||||
keyInfo = Console.ReadKey(!showKeyOutput);
|
var input = new StringBuilder();
|
||||||
return true;
|
|
||||||
|
while (!Owner.Cancelled)
|
||||||
|
{
|
||||||
|
if (!Console.KeyAvailable)
|
||||||
|
{
|
||||||
|
Thread.Sleep(50);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var keyInfo = Console.ReadKey(false);
|
||||||
|
|
||||||
|
// ReSharper disable once SwitchStatementHandlesSomeKnownEnumValuesWithDefault - by design
|
||||||
|
switch (keyInfo.Key)
|
||||||
|
{
|
||||||
|
case ConsoleKey.Enter:
|
||||||
|
Console.WriteLine();
|
||||||
|
confirmed = input.ToString().Equals("Y", StringComparison.CurrentCultureIgnoreCase);
|
||||||
|
return;
|
||||||
|
|
||||||
|
case ConsoleKey.Backspace:
|
||||||
|
if (input.Length > 0)
|
||||||
|
{
|
||||||
|
input.Remove(input.Length - 1, 1);
|
||||||
|
|
||||||
|
// We need to handle erasing the character ourselves, as we want to use ReadKey so that we can monitor Cancelled
|
||||||
|
Console.Write(" \b");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
if (keyInfo.KeyChar != -1)
|
||||||
|
input.Append(keyInfo.KeyChar);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return confirmed;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,6 +227,22 @@ namespace Tapeti.Cmd.ConsoleHelper
|
|||||||
public override bool Enabled => true;
|
public override bool Enabled => true;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public override void WriteCaptured(string value, Action waitForInput)
|
||||||
|
{
|
||||||
|
Owner.AcquirePermanent();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Console.Write(value);
|
||||||
|
waitForInput();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
Owner.ReleasePermanent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public override void WriteLine(string value)
|
public override void WriteLine(string value)
|
||||||
{
|
{
|
||||||
Owner.AcquirePermanent();
|
Owner.AcquirePermanent();
|
||||||
@ -217,6 +275,13 @@ namespace Tapeti.Cmd.ConsoleHelper
|
|||||||
public override bool Enabled => !Console.IsOutputRedirected;
|
public override bool Enabled => !Console.IsOutputRedirected;
|
||||||
|
|
||||||
|
|
||||||
|
public override void WriteCaptured(string value, Action waitForInput)
|
||||||
|
{
|
||||||
|
WriteLine(value);
|
||||||
|
waitForInput();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public override void WriteLine(string value)
|
public override void WriteLine(string value)
|
||||||
{
|
{
|
||||||
if (!Enabled)
|
if (!Enabled)
|
||||||
|
@ -26,7 +26,7 @@ namespace Tapeti.Cmd.RateLimiter
|
|||||||
if (batchCount > batchSize)
|
if (batchCount > batchSize)
|
||||||
{
|
{
|
||||||
Pause(console);
|
Pause(console);
|
||||||
batchCount = 0;
|
batchCount = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
decoratedRateLimiter.Execute(action);
|
decoratedRateLimiter.Execute(action);
|
||||||
|
@ -31,6 +31,12 @@ namespace Tapeti.Cmd.Serialization
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static bool OutputExists(string path)
|
||||||
|
{
|
||||||
|
return Directory.Exists(path) && Directory.GetFiles(path, "*.message.txt").Length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
using System.IO;
|
using System.IO;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using CommandLine;
|
using CommandLine;
|
||||||
using RabbitMQ.Client;
|
|
||||||
using Tapeti.Cmd.ConsoleHelper;
|
using Tapeti.Cmd.ConsoleHelper;
|
||||||
using Tapeti.Cmd.Serialization;
|
using Tapeti.Cmd.Serialization;
|
||||||
|
|
||||||
@ -18,9 +17,15 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
[Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
|
[Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")]
|
||||||
public string OutputPath { get; set; }
|
public string OutputPath { get; set; }
|
||||||
|
|
||||||
|
[Option('y', "overwrite", HelpText = "If the output exists, do not ask to overwrite.")]
|
||||||
|
public bool Overwrite { get; set; }
|
||||||
|
|
||||||
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
|
[Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")]
|
||||||
public bool RemoveMessages { get; set; }
|
public bool RemoveMessages { get; set; }
|
||||||
|
|
||||||
|
[Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing export was interrupted.", Default = 0)]
|
||||||
|
public int Skip { get; set; }
|
||||||
|
|
||||||
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
|
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")]
|
||||||
public int? MaxCount { get; set; }
|
public int? MaxCount { get; set; }
|
||||||
}
|
}
|
||||||
@ -40,12 +45,21 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
public void Execute(IConsole console)
|
public void Execute(IConsole console)
|
||||||
{
|
{
|
||||||
var consoleWriter = console.GetPermanentWriter();
|
var consoleWriter = console.GetPermanentWriter();
|
||||||
|
|
||||||
|
using var messageSerializer = GetMessageSerializer(options, consoleWriter);
|
||||||
|
if (messageSerializer == null)
|
||||||
|
return;
|
||||||
|
|
||||||
var factory = options.CreateConnectionFactory(console);
|
var factory = options.CreateConnectionFactory(console);
|
||||||
using var messageSerializer = GetMessageSerializer(options);
|
|
||||||
using var connection = factory.CreateConnection();
|
using var connection = factory.CreateConnection();
|
||||||
using var channel = connection.CreateModel();
|
using var channel = connection.CreateModel();
|
||||||
|
|
||||||
var totalCount = (int)channel.MessageCount(options.QueueName);
|
var totalCount = (int)channel.MessageCount(options.QueueName);
|
||||||
|
|
||||||
|
var skip = Math.Max(options.Skip, 0);
|
||||||
|
if (skip > 0)
|
||||||
|
totalCount -= Math.Min(skip, totalCount);
|
||||||
|
|
||||||
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
|
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
|
||||||
totalCount = options.MaxCount.Value;
|
totalCount = options.MaxCount.Value;
|
||||||
|
|
||||||
@ -61,24 +75,28 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
// No more messages on the queue
|
// No more messages on the queue
|
||||||
break;
|
break;
|
||||||
|
|
||||||
messageCount++;
|
if (skip > 0)
|
||||||
|
skip--;
|
||||||
messageSerializer.Serialize(new Message
|
else
|
||||||
{
|
{
|
||||||
DeliveryTag = result.DeliveryTag,
|
messageCount++;
|
||||||
Redelivered = result.Redelivered,
|
|
||||||
Exchange = result.Exchange,
|
|
||||||
RoutingKey = result.RoutingKey,
|
|
||||||
Queue = options.QueueName,
|
|
||||||
Properties = result.BasicProperties,
|
|
||||||
Body = result.Body.ToArray()
|
|
||||||
});
|
|
||||||
|
|
||||||
if (options.RemoveMessages)
|
messageSerializer.Serialize(new Message
|
||||||
channel.BasicAck(result.DeliveryTag, false);
|
{
|
||||||
|
DeliveryTag = result.DeliveryTag,
|
||||||
|
Redelivered = result.Redelivered,
|
||||||
|
Exchange = result.Exchange,
|
||||||
|
RoutingKey = result.RoutingKey,
|
||||||
|
Queue = options.QueueName,
|
||||||
|
Properties = result.BasicProperties,
|
||||||
|
Body = result.Body.ToArray()
|
||||||
|
});
|
||||||
|
|
||||||
|
if (options.RemoveMessages)
|
||||||
|
channel.BasicAck(result.DeliveryTag, false);
|
||||||
|
|
||||||
progressBar.Report(messageCount);
|
progressBar.Report(messageCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,16 +104,27 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static IMessageSerializer GetMessageSerializer(ExportOptions options)
|
private static IMessageSerializer GetMessageSerializer(ExportOptions options, IConsoleWriter consoleWriter)
|
||||||
{
|
{
|
||||||
switch (options.SerializationMethod)
|
switch (options.SerializationMethod)
|
||||||
{
|
{
|
||||||
case SerializationMethod.SingleFileJSON:
|
case SerializationMethod.SingleFileJSON:
|
||||||
|
// ReSharper disable once InvertIf - causes two lines of "new SingleFileJSONMessageSerializer". DRY ReSharper.
|
||||||
|
if (!options.Overwrite && File.Exists(options.OutputPath))
|
||||||
|
{
|
||||||
|
if (!consoleWriter.ConfirmYesNo($"The output file '{options.OutputPath}' already exists, do you want to overwrite it?"))
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8);
|
return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8);
|
||||||
|
|
||||||
case SerializationMethod.EasyNetQHosepipe:
|
case SerializationMethod.EasyNetQHosepipe:
|
||||||
if (string.IsNullOrEmpty(options.OutputPath))
|
// ReSharper disable once InvertIf - causes two lines of "new SingleFileJSONMessageSerializer". DRY ReSharper.
|
||||||
throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization");
|
if (!options.Overwrite && EasyNetQMessageSerializer.OutputExists(options.OutputPath))
|
||||||
|
{
|
||||||
|
if (!consoleWriter.ConfirmYesNo($"The output path '{options.OutputPath}' already contains a previous export, do you want to overwrite it?"))
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
return new EasyNetQMessageSerializer(options.OutputPath);
|
return new EasyNetQMessageSerializer(options.OutputPath);
|
||||||
|
|
||||||
|
@ -29,6 +29,12 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
|
[Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")]
|
||||||
public bool PublishToExchange { get; set; }
|
public bool PublishToExchange { get; set; }
|
||||||
|
|
||||||
|
[Option("skip", HelpText = "(Default: 0) Number of messages in the input to skip. Useful if a previous import was interrupted.", Default = 0)]
|
||||||
|
public int Skip { get; set; }
|
||||||
|
|
||||||
|
[Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to import.")]
|
||||||
|
public int? MaxCount { get; set; }
|
||||||
|
|
||||||
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
|
[Option("maxrate", HelpText = "The maximum amount of messages per second to import.")]
|
||||||
public int? MaxRate { get; set; }
|
public int? MaxRate { get; set; }
|
||||||
|
|
||||||
@ -61,6 +67,7 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
|
|
||||||
var totalCount = messageSerializer.GetMessageCount();
|
var totalCount = messageSerializer.GetMessageCount();
|
||||||
var messageCount = 0;
|
var messageCount = 0;
|
||||||
|
var skip = Math.Max(options.Skip, 0);
|
||||||
|
|
||||||
|
|
||||||
ProgressBar progress = null;
|
ProgressBar progress = null;
|
||||||
@ -70,24 +77,27 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
{
|
{
|
||||||
foreach (var message in messageSerializer.Deserialize(channel))
|
foreach (var message in messageSerializer.Deserialize(channel))
|
||||||
{
|
{
|
||||||
if (console.Cancelled)
|
if (console.Cancelled || (options.MaxCount.HasValue && messageCount >= options.MaxCount.Value))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
rateLimiter.Execute(() =>
|
if (skip > 0)
|
||||||
{
|
skip--;
|
||||||
if (console.Cancelled)
|
else
|
||||||
return;
|
rateLimiter.Execute(() =>
|
||||||
|
{
|
||||||
|
if (console.Cancelled)
|
||||||
|
return;
|
||||||
|
|
||||||
var exchange = options.PublishToExchange ? message.Exchange : "";
|
var exchange = options.PublishToExchange ? message.Exchange : "";
|
||||||
var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue;
|
var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue;
|
||||||
|
|
||||||
// ReSharper disable AccessToDisposedClosure
|
// ReSharper disable AccessToDisposedClosure
|
||||||
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
|
channel.BasicPublish(exchange, routingKey, message.Properties, message.Body);
|
||||||
messageCount++;
|
messageCount++;
|
||||||
|
|
||||||
progress?.Report(messageCount);
|
progress?.Report(messageCount);
|
||||||
// ReSharper restore AccessToDisposedClosure
|
// ReSharper restore AccessToDisposedClosure
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
@ -37,6 +37,9 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
|
[Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")]
|
||||||
public string TargetPassword { get; set; }
|
public string TargetPassword { get; set; }
|
||||||
|
|
||||||
|
[Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing shovel was interrupted.", Default = 0)]
|
||||||
|
public int Skip { get; set; }
|
||||||
|
|
||||||
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
|
[Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")]
|
||||||
public int? MaxRate { get; set; }
|
public int? MaxRate { get; set; }
|
||||||
|
|
||||||
@ -93,6 +96,11 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName;
|
var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName;
|
||||||
|
|
||||||
var totalCount = (int)sourceChannel.MessageCount(options.QueueName);
|
var totalCount = (int)sourceChannel.MessageCount(options.QueueName);
|
||||||
|
|
||||||
|
var skip = Math.Max(options.Skip, 0);
|
||||||
|
if (skip > 0)
|
||||||
|
totalCount -= Math.Min(skip, totalCount);
|
||||||
|
|
||||||
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
|
if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount)
|
||||||
totalCount = options.MaxCount.Value;
|
totalCount = options.MaxCount.Value;
|
||||||
|
|
||||||
@ -101,6 +109,18 @@ namespace Tapeti.Cmd.Verbs
|
|||||||
|
|
||||||
using (var progressBar = new ProgressBar(console, totalCount))
|
using (var progressBar = new ProgressBar(console, totalCount))
|
||||||
{
|
{
|
||||||
|
// Perform the skips outside of the rate limiter
|
||||||
|
while (skip > 0 && !console.Cancelled)
|
||||||
|
{
|
||||||
|
var result = sourceChannel.BasicGet(options.QueueName, false);
|
||||||
|
if (result == null)
|
||||||
|
// No more messages on the queue
|
||||||
|
return;
|
||||||
|
|
||||||
|
skip--;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
var hasMessage = true;
|
var hasMessage = true;
|
||||||
|
|
||||||
while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value))
|
while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value))
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -32,4 +32,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -41,4 +41,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -29,4 +29,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -32,4 +32,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -33,4 +33,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -28,4 +28,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -8,7 +8,7 @@ namespace Tapeti.Config
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Provides information about the message currently being handled.
|
/// Provides information about the message currently being handled.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public interface IMessageContext : IAsyncDisposable, IDisposable
|
public interface IMessageContext : IAsyncDisposable
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Provides access to the Tapeti config.
|
/// Provides access to the Tapeti config.
|
||||||
|
@ -66,21 +66,21 @@ namespace Tapeti.Default
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
foreach (var payload in payloads.Values)
|
|
||||||
(payload as IDisposable)?.Dispose();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
{
|
{
|
||||||
foreach (var payload in payloads.Values)
|
foreach (var payload in payloads.Values)
|
||||||
{
|
{
|
||||||
if (payload is IAsyncDisposable asyncDisposable)
|
switch (payload)
|
||||||
await asyncDisposable.DisposeAsync();
|
{
|
||||||
|
case IAsyncDisposable asyncDisposable:
|
||||||
|
await asyncDisposable.DisposeAsync();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case IDisposable disposable:
|
||||||
|
disposable.Dispose();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,4 +36,7 @@
|
|||||||
</None>
|
</None>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
|
||||||
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
26
appveyor.yml
26
appveyor.yml
@ -11,19 +11,19 @@ before_build:
|
|||||||
|
|
||||||
after_build:
|
after_build:
|
||||||
# Create NuGet packages
|
# Create NuGet packages
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
- cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion%
|
||||||
# Create Tapeti.Cmd release
|
# Create Tapeti.Cmd release
|
||||||
- cmd: dotnet publish -c Release -r win-x64 --self-contained=true -o publish\x64\selfcontained Tapeti.Cmd\Tapeti.Cmd.csproj
|
- cmd: dotnet publish -c Release -r win-x64 --self-contained=true -o publish\x64\selfcontained Tapeti.Cmd\Tapeti.Cmd.csproj
|
||||||
- cmd: dotnet publish -c Release -r win-x64 --self-contained=false -o publish\x64 Tapeti.Cmd\Tapeti.Cmd.csproj
|
- cmd: dotnet publish -c Release -r win-x64 --self-contained=false -o publish\x64 Tapeti.Cmd\Tapeti.Cmd.csproj
|
||||||
|
@ -45,9 +45,15 @@ Fetches messages from a queue and writes them to disk.
|
|||||||
-o <target>, --output <target>
|
-o <target>, --output <target>
|
||||||
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be output to.
|
*Required*. Path or filename (depending on the chosen serialization method) where the messages will be output to.
|
||||||
|
|
||||||
|
-y, --overwrite
|
||||||
|
If the output exists, do not ask to overwrite.
|
||||||
|
|
||||||
-r, --remove
|
-r, --remove
|
||||||
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
||||||
|
|
||||||
|
--skip <count>
|
||||||
|
Number of messages in the input to skip. Useful if a previous non-removing export was interrupted.
|
||||||
|
|
||||||
-n <count>, --maxcount <count>
|
-n <count>, --maxcount <count>
|
||||||
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
||||||
|
|
||||||
@ -79,6 +85,12 @@ Read messages from disk as previously exported and publish them to a queue.
|
|||||||
-e, --exchange
|
-e, --exchange
|
||||||
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
|
If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.
|
||||||
|
|
||||||
|
--skip <count>
|
||||||
|
Number of messages in the input to skip. Useful if a previous import was interrupted.
|
||||||
|
|
||||||
|
-n <count>, --maxcount <count>
|
||||||
|
Maximum number of messages to import. If not specified all messages are imported.
|
||||||
|
|
||||||
-s <method>, --serialization <method>
|
-s <method>, --serialization <method>
|
||||||
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
|
The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
|
||||||
|
|
||||||
@ -115,6 +127,9 @@ Reads messages from a queue and publishes them to another queue, optionally to a
|
|||||||
-r, --remove
|
-r, --remove
|
||||||
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
If specified messages are acknowledged and removed from the queue. If not messages are kept.
|
||||||
|
|
||||||
|
--skip <count>
|
||||||
|
Number of messages in the input to skip. Useful if a previous non-removing shovel was interrupted.
|
||||||
|
|
||||||
-n <count>, --maxcount <count>
|
-n <count>, --maxcount <count>
|
||||||
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user