From 0fb2c4808310de0b1d7e94050e103520546124bd Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Sep 2021 19:56:24 +0200 Subject: [PATCH 1/7] #33 Tapeti.Cmd confirm overwrite existing files when exporting Also fixed issue with input remaining in buffer which can cause accidental confirms --- Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs | 103 ++++++++++++++---- .../EasyNetQMessageSerializer.cs | 6 + Tapeti.Cmd/Verbs/ExportVerb.cs | 27 ++++- docs/tapeticmd.rst | 3 + 4 files changed, 115 insertions(+), 24 deletions(-) diff --git a/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs b/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs index 4f5aaee..6c2bcc8 100644 --- a/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs +++ b/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Text; using System.Threading; +using Console = System.Console; namespace Tapeti.Cmd.ConsoleHelper { @@ -138,39 +140,79 @@ namespace Tapeti.Cmd.ConsoleHelper public abstract bool Enabled { get; } + public abstract void WriteCaptured(string value, Action processInput); public abstract void WriteLine(string value); public void Confirm(string message) { WriteLine(message); - TryReadKey(false, out _); + + // Clear any previous key entered before this confirmation + while (!Owner.Cancelled && Console.KeyAvailable) + Console.ReadKey(true); + + while (!Owner.Cancelled && !Console.KeyAvailable) + Thread.Sleep(50); + + if (Owner.Cancelled) + return; + + Console.ReadKey(true); } public bool ConfirmYesNo(string message) { - WriteLine($"{message} (Y/N) "); - if (!TryReadKey(true, out var key)) - return false; + var confirmed = false; - return key.KeyChar == 'y' || key.KeyChar == 'Y'; - } - - - private bool TryReadKey(bool showKeyOutput, out ConsoleKeyInfo keyInfo) - { - while (!Owner.Cancelled && !Console.KeyAvailable) - Thread.Sleep(50); - - if (Owner.Cancelled) + WriteCaptured($"{message} (Y/N) ", () => { - keyInfo = default; - return false; - } + // Clear any previous key entered before this confirmation + while (!Owner.Cancelled && Console.KeyAvailable) + Console.ReadKey(true); - keyInfo = Console.ReadKey(!showKeyOutput); - return true; + var input = new StringBuilder(); + + while (!Owner.Cancelled) + { + if (!Console.KeyAvailable) + { + Thread.Sleep(50); + continue; + } + + var keyInfo = Console.ReadKey(false); + + // ReSharper disable once SwitchStatementHandlesSomeKnownEnumValuesWithDefault - by design + switch (keyInfo.Key) + { + case ConsoleKey.Enter: + Console.WriteLine(); + confirmed = input.ToString().Equals("Y", StringComparison.CurrentCultureIgnoreCase); + return; + + case ConsoleKey.Backspace: + if (input.Length > 0) + { + input.Remove(input.Length - 1, 1); + + // We need to handle erasing the character ourselves, as we want to use ReadKey so that we can monitor Cancelled + Console.Write(" \b"); + } + + break; + + default: + if (keyInfo.KeyChar != -1) + input.Append(keyInfo.KeyChar); + + break; + } + } + }); + + return confirmed; } } @@ -185,6 +227,22 @@ namespace Tapeti.Cmd.ConsoleHelper public override bool Enabled => true; + + public override void WriteCaptured(string value, Action waitForInput) + { + Owner.AcquirePermanent(); + try + { + Console.Write(value); + waitForInput(); + } + finally + { + Owner.ReleasePermanent(); + } + } + + public override void WriteLine(string value) { Owner.AcquirePermanent(); @@ -217,6 +275,13 @@ namespace Tapeti.Cmd.ConsoleHelper public override bool Enabled => !Console.IsOutputRedirected; + public override void WriteCaptured(string value, Action waitForInput) + { + WriteLine(value); + waitForInput(); + } + + public override void WriteLine(string value) { if (!Enabled) diff --git a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs index 570fb02..cabbcb3 100644 --- a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs @@ -31,6 +31,12 @@ namespace Tapeti.Cmd.Serialization } + public static bool OutputExists(string path) + { + return Directory.Exists(path) && Directory.GetFiles(path, "*.message.txt").Length > 0; + } + + public void Dispose() { } diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs index d6313ef..9e62c9d 100644 --- a/Tapeti.Cmd/Verbs/ExportVerb.cs +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -2,7 +2,6 @@ using System.IO; using System.Text; using CommandLine; -using RabbitMQ.Client; using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Serialization; @@ -17,6 +16,9 @@ namespace Tapeti.Cmd.Verbs [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] public string OutputPath { get; set; } + + [Option('y', "overwrite", HelpText = "If the output exists, do not ask to overwrite.")] + public bool Overwrite { get; set; } [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] public bool RemoveMessages { get; set; } @@ -40,8 +42,12 @@ namespace Tapeti.Cmd.Verbs public void Execute(IConsole console) { var consoleWriter = console.GetPermanentWriter(); + + using var messageSerializer = GetMessageSerializer(options, consoleWriter); + if (messageSerializer == null) + return; + var factory = options.CreateConnectionFactory(console); - using var messageSerializer = GetMessageSerializer(options); using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); @@ -86,16 +92,27 @@ namespace Tapeti.Cmd.Verbs } - private static IMessageSerializer GetMessageSerializer(ExportOptions options) + private static IMessageSerializer GetMessageSerializer(ExportOptions options, IConsoleWriter consoleWriter) { switch (options.SerializationMethod) { case SerializationMethod.SingleFileJSON: + // ReSharper disable once InvertIf - causes two lines of "new SingleFileJSONMessageSerializer". DRY ReSharper. + if (!options.Overwrite && File.Exists(options.OutputPath)) + { + if (!consoleWriter.ConfirmYesNo($"The output file '{options.OutputPath}' already exists, do you want to overwrite it?")) + return null; + } + return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8); case SerializationMethod.EasyNetQHosepipe: - if (string.IsNullOrEmpty(options.OutputPath)) - throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); + // ReSharper disable once InvertIf - causes two lines of "new SingleFileJSONMessageSerializer". DRY ReSharper. + if (!options.Overwrite && EasyNetQMessageSerializer.OutputExists(options.OutputPath)) + { + if (!consoleWriter.ConfirmYesNo($"The output path '{options.OutputPath}' already contains a previous export, do you want to overwrite it?")) + return null; + } return new EasyNetQMessageSerializer(options.OutputPath); diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index a474baf..5b9b876 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -45,6 +45,9 @@ Fetches messages from a queue and writes them to disk. -o , --output *Required*. Path or filename (depending on the chosen serialization method) where the messages will be output to. +-y, --overwrite + If the output exists, do not ask to overwrite. + -r, --remove If specified messages are acknowledged and removed from the queue. If not messages are kept. From 254af4187509cfd1dd7f8e16e42e8005791ba615 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Sep 2021 19:57:51 +0200 Subject: [PATCH 2/7] Fixed BatchSize sneaking in one more every time --- Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs b/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs index c9e8ed9..058682a 100644 --- a/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs +++ b/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs @@ -26,7 +26,7 @@ namespace Tapeti.Cmd.RateLimiter if (batchCount > batchSize) { Pause(console); - batchCount = 0; + batchCount = 1; } decoratedRateLimiter.Execute(action); From f887cd8b7899578295b684883beae42ca4f10f35 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Sep 2021 20:07:03 +0200 Subject: [PATCH 3/7] Implemented skip and maxcount parameters for Tapeti.Cmd import --- Tapeti.Cmd/Verbs/ImportVerb.cs | 38 +++++++++++++++++++++------------- docs/tapeticmd.rst | 6 ++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/Tapeti.Cmd/Verbs/ImportVerb.cs b/Tapeti.Cmd/Verbs/ImportVerb.cs index 87fa434..441d812 100644 --- a/Tapeti.Cmd/Verbs/ImportVerb.cs +++ b/Tapeti.Cmd/Verbs/ImportVerb.cs @@ -28,6 +28,12 @@ namespace Tapeti.Cmd.Verbs [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] public bool PublishToExchange { get; set; } + + [Option("skip", HelpText = "(Default: 0) Number of messages in the input to skip. Useful if a previous import was interrupted.", Default = 0)] + public int Skip { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to import.")] + public int? MaxCount { get; set; } [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] public int? MaxRate { get; set; } @@ -61,6 +67,7 @@ namespace Tapeti.Cmd.Verbs var totalCount = messageSerializer.GetMessageCount(); var messageCount = 0; + var skip = Math.Max(options.Skip, 0); ProgressBar progress = null; @@ -70,24 +77,27 @@ namespace Tapeti.Cmd.Verbs { foreach (var message in messageSerializer.Deserialize(channel)) { - if (console.Cancelled) + if (console.Cancelled || (options.MaxCount.HasValue && messageCount >= options.MaxCount.Value)) break; - rateLimiter.Execute(() => - { - if (console.Cancelled) - return; + if (skip > 0) + skip--; + else + rateLimiter.Execute(() => + { + if (console.Cancelled) + return; - var exchange = options.PublishToExchange ? message.Exchange : ""; - var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; + var exchange = options.PublishToExchange ? message.Exchange : ""; + var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; - // ReSharper disable AccessToDisposedClosure - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; - - progress?.Report(messageCount); - // ReSharper restore AccessToDisposedClosure - }); + // ReSharper disable AccessToDisposedClosure + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + + progress?.Report(messageCount); + // ReSharper restore AccessToDisposedClosure + }); } } finally diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index 5b9b876..adb2de5 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -82,6 +82,12 @@ Read messages from disk as previously exported and publish them to a queue. -e, --exchange If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue. +--skip + Number of messages in the input to skip. Useful if a previous import was interrupted. + +-n , --maxcount + Maximum number of messages to import. If not specified all messages are imported. + -s , --serialization The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information. From b3ea711c3b895d1a11def19f6c47049f9222f1a7 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 15 Sep 2021 20:16:15 +0200 Subject: [PATCH 4/7] Implemented skip parameter for export and shovel commands --- Tapeti.Cmd/Verbs/ExportVerb.cs | 40 ++++++++++++++++++++++------------ Tapeti.Cmd/Verbs/ShovelVerb.cs | 20 +++++++++++++++++ docs/tapeticmd.rst | 6 +++++ 3 files changed, 52 insertions(+), 14 deletions(-) diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs index 9e62c9d..db7b023 100644 --- a/Tapeti.Cmd/Verbs/ExportVerb.cs +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -23,6 +23,9 @@ namespace Tapeti.Cmd.Verbs [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] public bool RemoveMessages { get; set; } + [Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing export was interrupted.", Default = 0)] + public int Skip { get; set; } + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] public int? MaxCount { get; set; } } @@ -52,6 +55,11 @@ namespace Tapeti.Cmd.Verbs using var channel = connection.CreateModel(); var totalCount = (int)channel.MessageCount(options.QueueName); + + var skip = Math.Max(options.Skip, 0); + if (skip > 0) + totalCount -= Math.Min(skip, totalCount); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; @@ -67,24 +75,28 @@ namespace Tapeti.Cmd.Verbs // No more messages on the queue break; - messageCount++; - - messageSerializer.Serialize(new Message + if (skip > 0) + skip--; + else { - DeliveryTag = result.DeliveryTag, - Redelivered = result.Redelivered, - Exchange = result.Exchange, - RoutingKey = result.RoutingKey, - Queue = options.QueueName, - Properties = result.BasicProperties, - Body = result.Body.ToArray() - }); + messageCount++; - if (options.RemoveMessages) - channel.BasicAck(result.DeliveryTag, false); + messageSerializer.Serialize(new Message + { + DeliveryTag = result.DeliveryTag, + Redelivered = result.Redelivered, + Exchange = result.Exchange, + RoutingKey = result.RoutingKey, + Queue = options.QueueName, + Properties = result.BasicProperties, + Body = result.Body.ToArray() + }); + if (options.RemoveMessages) + channel.BasicAck(result.DeliveryTag, false); - progressBar.Report(messageCount); + progressBar.Report(messageCount); + } } } diff --git a/Tapeti.Cmd/Verbs/ShovelVerb.cs b/Tapeti.Cmd/Verbs/ShovelVerb.cs index 891abe8..18e4c4e 100644 --- a/Tapeti.Cmd/Verbs/ShovelVerb.cs +++ b/Tapeti.Cmd/Verbs/ShovelVerb.cs @@ -37,6 +37,9 @@ namespace Tapeti.Cmd.Verbs [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] public string TargetPassword { get; set; } + [Option("skip", HelpText = "(Default: 0) Number of messages in the queue to skip. Useful if a previous non-removing shovel was interrupted.", Default = 0)] + public int Skip { get; set; } + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] public int? MaxRate { get; set; } @@ -93,6 +96,11 @@ namespace Tapeti.Cmd.Verbs var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; var totalCount = (int)sourceChannel.MessageCount(options.QueueName); + + var skip = Math.Max(options.Skip, 0); + if (skip > 0) + totalCount -= Math.Min(skip, totalCount); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; @@ -101,6 +109,18 @@ namespace Tapeti.Cmd.Verbs using (var progressBar = new ProgressBar(console, totalCount)) { + // Perform the skips outside of the rate limiter + while (skip > 0 && !console.Cancelled) + { + var result = sourceChannel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + return; + + skip--; + } + + var hasMessage = true; while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index adb2de5..b271c4b 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -51,6 +51,9 @@ Fetches messages from a queue and writes them to disk. -r, --remove If specified messages are acknowledged and removed from the queue. If not messages are kept. +--skip + Number of messages in the input to skip. Useful if a previous non-removing export was interrupted. + -n , --maxcount Maximum number of messages to retrieve from the queue. If not specified all messages are exported. @@ -124,6 +127,9 @@ Reads messages from a queue and publishes them to another queue, optionally to a -r, --remove If specified messages are acknowledged and removed from the queue. If not messages are kept. +--skip + Number of messages in the input to skip. Useful if a previous non-removing shovel was interrupted. + -n , --maxcount Maximum number of messages to retrieve from the queue. If not specified all messages are exported. From c76bce30fa4afdfff9cc669a6ba8c5e5f46fcbb0 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 16 Sep 2021 11:41:03 +0200 Subject: [PATCH 5/7] Added SourceLink reference --- Tapeti.Annotations/Tapeti.Annotations.csproj | 3 +++ Tapeti.Autofac/Tapeti.Autofac.csproj | 3 +++ .../Tapeti.CastleWindsor.csproj | 3 +++ .../Tapeti.DataAnnotations.Extensions.csproj | 3 +++ .../Tapeti.DataAnnotations.csproj | 3 +++ Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 3 +++ Tapeti.Flow/Tapeti.Flow.csproj | 3 +++ Tapeti.Ninject/Tapeti.Ninject.csproj | 3 +++ Tapeti.Serilog/Tapeti.Serilog.csproj | 3 +++ .../Tapeti.SimpleInjector.csproj | 3 +++ Tapeti.Transient/Tapeti.Transient.csproj | 3 +++ .../Tapeti.UnityContainer.csproj | 3 +++ Tapeti/Tapeti.csproj | 3 +++ appveyor.yml | 26 +++++++++---------- 14 files changed, 52 insertions(+), 13 deletions(-) diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj index 11c27dc..7dc38fc 100644 --- a/Tapeti.Annotations/Tapeti.Annotations.csproj +++ b/Tapeti.Annotations/Tapeti.Annotations.csproj @@ -24,4 +24,7 @@ + + + diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj index dfc6146..b441b47 100644 --- a/Tapeti.Autofac/Tapeti.Autofac.csproj +++ b/Tapeti.Autofac/Tapeti.Autofac.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj index b2623cf..d8546d8 100644 --- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj +++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj index 3de7536..0e62b2b 100644 --- a/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj +++ b/Tapeti.DataAnnotations.Extensions/Tapeti.DataAnnotations.Extensions.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 27bb111..17f465c 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -32,4 +32,7 @@ + + + diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index c23f096..f3aaee5 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -41,4 +41,7 @@ + + + diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 40f2f79..b1d4c95 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -29,4 +29,7 @@ + + + diff --git a/Tapeti.Ninject/Tapeti.Ninject.csproj b/Tapeti.Ninject/Tapeti.Ninject.csproj index c16b08e..994a749 100644 --- a/Tapeti.Ninject/Tapeti.Ninject.csproj +++ b/Tapeti.Ninject/Tapeti.Ninject.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index 25d7d29..69efdee 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -32,4 +32,7 @@ + + + diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index 050dd5d..de35d46 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -33,4 +33,7 @@ + + + diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj index 02df581..90f345a 100644 --- a/Tapeti.Transient/Tapeti.Transient.csproj +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj b/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj index 42399ca..bbd37c2 100644 --- a/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj +++ b/Tapeti.UnityContainer/Tapeti.UnityContainer.csproj @@ -28,4 +28,7 @@ + + + diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 4751189..cb55780 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -36,4 +36,7 @@ + + + diff --git a/appveyor.yml b/appveyor.yml index 5680f86..9981577 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -11,19 +11,19 @@ before_build: after_build: # Create NuGet packages - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% # Create Tapeti.Cmd release - cmd: dotnet publish -c Release -r win-x64 --self-contained=true -o publish\x64\selfcontained Tapeti.Cmd\Tapeti.Cmd.csproj - cmd: dotnet publish -c Release -r win-x64 --self-contained=false -o publish\x64 Tapeti.Cmd\Tapeti.Cmd.csproj From 04182ad9bdc68edd04c28374a200ce6f795eb194 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 16 Sep 2021 11:44:46 +0200 Subject: [PATCH 6/7] Forget =true --- appveyor.yml | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 9981577..d666e25 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -11,19 +11,19 @@ before_build: after_build: # Create NuGet packages - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% - - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti\Tapeti.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Annotations\Tapeti.Annotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.DataAnnotations.Extensions\Tapeti.DataAnnotations.Extensions.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Flow\Tapeti.Flow.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Transient\Tapeti.Transient.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Serilog\Tapeti.Serilog.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Autofac\Tapeti.Autofac.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.CastleWindsor\Tapeti.CastleWindsor.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.Ninject\Tapeti.Ninject.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% + - cmd: dotnet pack -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true --output output Tapeti.UnityContainer\Tapeti.UnityContainer.csproj /p:Configuration=Release /p:Version=%GitVersion_NuGetVersion% # Create Tapeti.Cmd release - cmd: dotnet publish -c Release -r win-x64 --self-contained=true -o publish\x64\selfcontained Tapeti.Cmd\Tapeti.Cmd.csproj - cmd: dotnet publish -c Release -r win-x64 --self-contained=false -o publish\x64 Tapeti.Cmd\Tapeti.Cmd.csproj From a9a38f2497dac45a36de9bb2f77cc144f7987054 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 16 Sep 2021 12:26:11 +0200 Subject: [PATCH 7/7] Fixed Flow response messages blocking the consumer Caused by non-async disposable payload in MessageContext never being disposed --- Tapeti/Config/IMessageContext.cs | 2 +- Tapeti/Default/MessageContext.cs | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index ac6a0e5..7d2db6d 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -8,7 +8,7 @@ namespace Tapeti.Config /// /// Provides information about the message currently being handled. /// - public interface IMessageContext : IAsyncDisposable, IDisposable + public interface IMessageContext : IAsyncDisposable { /// /// Provides access to the Tapeti config. diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 6a001af..40a153c 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -66,21 +66,21 @@ namespace Tapeti.Default } - /// - public void Dispose() - { - foreach (var payload in payloads.Values) - (payload as IDisposable)?.Dispose(); - } - - /// public async ValueTask DisposeAsync() { foreach (var payload in payloads.Values) { - if (payload is IAsyncDisposable asyncDisposable) - await asyncDisposable.DisposeAsync(); + switch (payload) + { + case IAsyncDisposable asyncDisposable: + await asyncDisposable.DisposeAsync(); + break; + + case IDisposable disposable: + disposable.Dispose(); + break; + } } }