From be576a24090a3562a14112eef8fd492df04c7344 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 2 Sep 2021 16:16:11 +0200 Subject: [PATCH 1/4] Implemented #31: Include message details in exception logging (optionally) Refactored IControllerMessageContext into context payloads to get access to it in the exception handler --- Tapeti.Flow/ContextItems.cs | 20 ---- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 6 +- .../Default/FlowContinuationMiddleware.cs | 62 +++++++----- Tapeti.Flow/Default/FlowHandlerContext.cs | 13 ++- Tapeti.Flow/Default/FlowProvider.cs | 16 ++-- Tapeti.Flow/FlowMessageContextPayload.cs | 33 +++++++ Tapeti.Flow/IFlowHandlerContext.cs | 4 +- Tapeti.Serilog/TapetiSeriLogger.cs | 44 ++++++++- .../Config/ControllerMessageContextPayload.cs | 32 +++++++ Tapeti/Config/IControllerBindingContext.cs | 4 +- Tapeti/Config/IControllerCleanupMiddleware.cs | 2 +- Tapeti/Config/IControllerFilterMiddleware.cs | 2 +- Tapeti/Config/IControllerMessageContext.cs | 20 ---- Tapeti/Config/IControllerMessageMiddleware.cs | 2 +- Tapeti/Config/IMessageContext.cs | 50 ++++++++++ Tapeti/Connection/TapetiConsumer.cs | 4 + Tapeti/Default/ConsoleLogger.cs | 35 +++++-- Tapeti/Default/ControllerMessageContext.cs | 71 -------------- Tapeti/Default/ControllerMethodBinding.cs | 38 ++++---- Tapeti/Default/MessageContext.cs | 95 +++++++++++++++++-- 20 files changed, 356 insertions(+), 197 deletions(-) delete mode 100644 Tapeti.Flow/ContextItems.cs create mode 100644 Tapeti.Flow/FlowMessageContextPayload.cs create mode 100644 Tapeti/Config/ControllerMessageContextPayload.cs delete mode 100644 Tapeti/Config/IControllerMessageContext.cs delete mode 100644 Tapeti/Default/ControllerMessageContext.cs diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs deleted file mode 100644 index d82ae7d..0000000 --- a/Tapeti.Flow/ContextItems.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Tapeti.Flow -{ - /// - /// Key names as used in the message context store. For internal use. - /// - public static class ContextItems - { - /// - /// Key given to the FlowContext object as stored in the message context. - /// - public const string FlowContext = "Tapeti.Flow.FlowContext"; - - /// - /// Indicates if the current message handler is the last one to be called before a - /// parallel flow is done and the convergeMethod will be called. - /// Temporarily disables storing the flow state. - /// - public const string FlowIsConverging = "Tapeti.Flow.IsConverging"; - } -} diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 8adc5a5..1e21847 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -74,16 +74,16 @@ namespace Tapeti.Flow.Default } - private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint) + private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) { var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); } - private static Task HandleParallelResponse(IControllerMessageContext context) + private static Task HandleParallelResponse(IMessageContext context) { - if (context.Get(ContextItems.FlowIsConverging, out _)) + if (context.TryGet(out var flowPayload) && flowPayload.FlowIsConverging) return Task.CompletedTask; var flowHandler = context.Config.DependencyResolver.Resolve(); diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index b07c819..3f34b49 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -12,24 +12,31 @@ namespace Tapeti.Flow.Default /// internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware { - public async Task Filter(IControllerMessageContext context, Func next) + public async Task Filter(IMessageContext context, Func next) { + if (!context.TryGet(out var controllerPayload)) + return; + var flowContext = await EnrichWithFlowContext(context); if (flowContext?.ContinuationMetadata == null) return; - if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) + if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) return; await next(); } - public async Task Handle(IControllerMessageContext context, Func next) + public async Task Handle(IMessageContext context, Func next) { - if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) - { - Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); + if (!context.TryGet(out var controllerPayload)) + return; + + if (context.TryGet(out var flowPayload)) + { + var flowContext = flowPayload.FlowContext; + Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller); // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); @@ -38,28 +45,33 @@ namespace Tapeti.Flow.Default if (converge) // Indicate to the FlowBindingMiddleware that the state must not to be stored - context.Store(ContextItems.FlowIsConverging, null); + flowPayload.FlowIsConverging = true; await next(); if (converge) - await CallConvergeMethod(context, - flowContext.ContinuationMetadata.ConvergeMethodName, - flowContext.ContinuationMetadata.ConvergeMethodSync); + await CallConvergeMethod(context, controllerPayload, + flowContext.ContinuationMetadata.ConvergeMethodName, + flowContext.ContinuationMetadata.ConvergeMethodSync); } else await next(); } - public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next) + public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next) { await next(); - if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) + if (!context.TryGet(out var controllerPayload)) return; - if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) + if (!context.TryGet(out var flowPayload)) + return; + + var flowContext = flowPayload.FlowContext; + + if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) // Do not call when the controller method was filtered, if the same message has two methods return; @@ -76,10 +88,10 @@ namespace Tapeti.Flow.Default - private static async Task EnrichWithFlowContext(IControllerMessageContext context) + private static async Task EnrichWithFlowContext(IMessageContext context) { - if (context.Get(ContextItems.FlowContext, out FlowContext flowContext)) - return flowContext; + if (context.TryGet(out var flowPayload)) + return flowPayload.FlowContext; if (context.Properties.CorrelationId == null) @@ -100,7 +112,7 @@ namespace Tapeti.Flow.Default if (flowState == null) return null; - flowContext = new FlowContext + var flowContext = new FlowContext { HandlerContext = new FlowHandlerContext(context), @@ -112,26 +124,28 @@ namespace Tapeti.Flow.Default }; // IDisposable items in the IMessageContext are automatically disposed - context.Store(ContextItems.FlowContext, flowContext); + context.Store(new FlowMessageContextPayload(flowContext)); return flowContext; } - private static async Task CallConvergeMethod(IControllerMessageContext context, string methodName, bool sync) + private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync) { IYieldPoint yieldPoint; + + - var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); + var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance); if (method == null) - throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}"); + throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}"); if (sync) - yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {}); + yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {}); else - yieldPoint = await (Task)method.Invoke(context.Controller, new object[] { }); + yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { }); if (yieldPoint == null) - throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}"); + throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}"); var flowHandler = context.Config.DependencyResolver.Resolve(); await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); diff --git a/Tapeti.Flow/Default/FlowHandlerContext.cs b/Tapeti.Flow/Default/FlowHandlerContext.cs index 6bcbab9..c1e8c38 100644 --- a/Tapeti.Flow/Default/FlowHandlerContext.cs +++ b/Tapeti.Flow/Default/FlowHandlerContext.cs @@ -18,15 +18,18 @@ namespace Tapeti.Flow.Default /// /// - public FlowHandlerContext(IControllerMessageContext source) + public FlowHandlerContext(IMessageContext source) { if (source == null) return; + if (!source.TryGet(out var controllerPayload)) + return; + Config = source.Config; - Controller = source.Controller; - Method = source.Binding.Method; - ControllerMessageContext = source; + Controller = controllerPayload.Controller; + Method = controllerPayload.Binding.Method; + MessageContext = source; } @@ -45,6 +48,6 @@ namespace Tapeti.Flow.Default public MethodInfo Method { get; set; } /// - public IControllerMessageContext ControllerMessageContext { get; set; } + public IMessageContext MessageContext { get; set; } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index d7b28b4..821328e 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -162,16 +162,16 @@ namespace Tapeti.Flow.Default private static ReplyMetadata GetReply(IFlowHandlerContext context) { - var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute(); + var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute(); if (requestAttribute?.Response == null) return null; return new ReplyMetadata { - CorrelationId = context.ControllerMessageContext.Properties.CorrelationId, - ReplyTo = context.ControllerMessageContext.Properties.ReplyTo, + CorrelationId = context.MessageContext.Properties.CorrelationId, + ReplyTo = context.MessageContext.Properties.ReplyTo, ResponseTypeName = requestAttribute.Response.FullName, - Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true) + Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true) }; } @@ -206,8 +206,8 @@ namespace Tapeti.Flow.Default try { - var messageContext = context.ControllerMessageContext; - if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out flowContext)) + var messageContext = context.MessageContext; + if (messageContext == null || !messageContext.TryGet(out var flowPayload)) { flowContext = new FlowContext { @@ -218,7 +218,9 @@ namespace Tapeti.Flow.Default // in the messageContext as the yield point is the last to execute. disposeFlowContext = true; } - + else + flowContext = flowPayload.FlowContext; + try { await executableYieldPoint.Execute(flowContext); diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs new file mode 100644 index 0000000..6fb97ce --- /dev/null +++ b/Tapeti.Flow/FlowMessageContextPayload.cs @@ -0,0 +1,33 @@ +using System; +using Tapeti.Config; +using Tapeti.Flow.Default; + +namespace Tapeti.Flow +{ + /// + /// Contains information about the flow for the current message. For internal use. + /// + internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable + { + public FlowContext FlowContext { get; } + + /// + /// Indicates if the current message handler is the last one to be called before a + /// parallel flow is done and the convergeMethod will be called. + /// Temporarily disables storing the flow state. + /// + public bool FlowIsConverging { get; set; } + + + public FlowMessageContextPayload(FlowContext flowContext) + { + FlowContext = flowContext; + } + + + public void Dispose() + { + FlowContext?.Dispose(); + } + } +} diff --git a/Tapeti.Flow/IFlowHandlerContext.cs b/Tapeti.Flow/IFlowHandlerContext.cs index 08cce12..921dd4e 100644 --- a/Tapeti.Flow/IFlowHandlerContext.cs +++ b/Tapeti.Flow/IFlowHandlerContext.cs @@ -29,9 +29,9 @@ namespace Tapeti.Flow /// - /// Access to the controller message context if this is a continuated flow. + /// Access to the message context if this is a continuated flow. /// Will be null when in a starting flow. /// - IControllerMessageContext ControllerMessageContext { get; } + IMessageContext MessageContext { get; } } } diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index a05d11c..98c7864 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Text; using Tapeti.Config; using ISerilogLogger = Serilog.ILogger; @@ -12,6 +13,21 @@ namespace Tapeti.Serilog /// public class TapetiSeriLogger: IBindingLogger { + /// + /// Implements the Tapeti ILogger interface for Serilog output. This version + /// includes the message body and information if available when an error occurs. + /// + public class WithMessageLogging : TapetiSeriLogger + { + /// + public WithMessageLogging(ISerilogLogger seriLogger) : base(seriLogger) { } + + internal override bool IncludeMessageInfo() => true; + } + + + + private readonly ISerilogLogger seriLogger; @@ -69,20 +85,38 @@ namespace Tapeti.Serilog /// public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { + var message = new StringBuilder("Tapeti: exception in message handler"); + var messageParams = new List(); + var contextLogger = seriLogger .ForContext("consumeResult", consumeResult) .ForContext("exchange", messageContext.Exchange) .ForContext("queue", messageContext.Queue) .ForContext("routingKey", messageContext.RoutingKey); - if (messageContext is IControllerMessageContext controllerMessageContext) + if (messageContext.TryGet(out var controllerPayload)) { contextLogger = contextLogger - .ForContext("controller", controllerMessageContext.Binding.Controller.FullName) - .ForContext("method", controllerMessageContext.Binding.Method.Name); + .ForContext("controller", controllerPayload.Binding.Controller.FullName) + .ForContext("method", controllerPayload.Binding.Method.Name); + + message.Append(" {controller}.{method}"); + messageParams.Add(controllerPayload.Binding.Controller.FullName); + messageParams.Add(controllerPayload.Binding.Method.Name); + } + + if (IncludeMessageInfo()) + { + message.Append(" on exchange {exchange}, queue {queue}, routingKey {routingKey}, replyTo {replyTo}, correlationId {correlationId} with body {body}"); + messageParams.Add(messageContext.Exchange); + messageParams.Add(messageContext.Queue); + messageParams.Add(messageContext.RoutingKey); + messageParams.Add(messageContext.Properties.ReplyTo); + messageParams.Add(messageContext.Properties.CorrelationId); + messageParams.Add(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : null); } - contextLogger.Error(exception, "Tapeti: exception in message handler"); + contextLogger.Error(exception, message.ToString(), messageParams.ToArray()); } /// @@ -134,5 +168,7 @@ namespace Tapeti.Serilog else seriLogger.Information("Tapeti: obsolete queue {queue} has been unbound but not yet deleted, {messageCount} messages remaining", queueName, messageCount); } + + internal virtual bool IncludeMessageInfo() => false; } } diff --git a/Tapeti/Config/ControllerMessageContextPayload.cs b/Tapeti/Config/ControllerMessageContextPayload.cs new file mode 100644 index 0000000..700d8a3 --- /dev/null +++ b/Tapeti/Config/ControllerMessageContextPayload.cs @@ -0,0 +1,32 @@ +namespace Tapeti.Config +{ + /// + /// + /// Extends the message context with information about the controller. + /// + public class ControllerMessageContextPayload : IMessageContextPayload + { + /// + /// An instance of the controller referenced by the binding. Note: can be null during Cleanup. + /// + public object Controller { get; } + + + /// + /// Provides access to the binding which is currently processing the message. + /// + public IControllerMethodBinding Binding { get; } + + + /// + /// Constructs the payload to enrich the message context with information about the controller. + /// + /// An instance of the controller referenced by the binding + /// The binding which is currently processing the message + public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding) + { + Controller = controller; + Binding = binding; + } + } +} diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs index 1c21cd7..37fb4d4 100644 --- a/Tapeti/Config/IControllerBindingContext.cs +++ b/Tapeti/Config/IControllerBindingContext.cs @@ -11,7 +11,7 @@ namespace Tapeti.Config /// Injects a value for a controller method parameter. /// /// - public delegate object ValueFactory(IControllerMessageContext context); + public delegate object ValueFactory(IMessageContext context); /// @@ -19,7 +19,7 @@ namespace Tapeti.Config /// /// /// - public delegate Task ResultHandler(IControllerMessageContext context, object value); + public delegate Task ResultHandler(IMessageContext context, object value); /// diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs index 2f16269..86ef003 100644 --- a/Tapeti/Config/IControllerCleanupMiddleware.cs +++ b/Tapeti/Config/IControllerCleanupMiddleware.cs @@ -14,6 +14,6 @@ namespace Tapeti.Config /// /// /// Always call to allow the next in the chain to clean up - Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next); + Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next); } } diff --git a/Tapeti/Config/IControllerFilterMiddleware.cs b/Tapeti/Config/IControllerFilterMiddleware.cs index ec8391a..6a30e20 100644 --- a/Tapeti/Config/IControllerFilterMiddleware.cs +++ b/Tapeti/Config/IControllerFilterMiddleware.cs @@ -15,6 +15,6 @@ namespace Tapeti.Config /// /// /// - Task Filter(IControllerMessageContext context, Func next); + Task Filter(IMessageContext context, Func next); } } diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs deleted file mode 100644 index 16b650d..0000000 --- a/Tapeti/Config/IControllerMessageContext.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Tapeti.Config -{ - /// - /// - /// Extends the message context with information about the controller. - /// - public interface IControllerMessageContext : IMessageContext - { - /// - /// An instance of the controller referenced by the binding. Note: is null during Cleanup. - /// - object Controller { get; } - - - /// - /// Provides access to the binding which is currently processing the message. - /// - new IControllerMethodBinding Binding { get; } - } -} diff --git a/Tapeti/Config/IControllerMessageMiddleware.cs b/Tapeti/Config/IControllerMessageMiddleware.cs index 65e777f..c381270 100644 --- a/Tapeti/Config/IControllerMessageMiddleware.cs +++ b/Tapeti/Config/IControllerMessageMiddleware.cs @@ -14,6 +14,6 @@ namespace Tapeti.Config /// /// /// Call to pass the message to the next handler in the chain or call the controller method - Task Handle(IControllerMessageContext context, Func next); + Task Handle(IMessageContext context, Func next); } } diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index 7c23127..e3a0e05 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -1,5 +1,7 @@ using System; +// ReSharper disable UnusedMemberInSuper.Global - public API + namespace Tapeti.Config { /// @@ -27,6 +29,11 @@ namespace Tapeti.Config /// string RoutingKey { get; } + /// + /// Contains the raw body of the message. + /// + byte[] RawBody { get; } + /// /// Contains the decoded message instance. /// @@ -42,6 +49,36 @@ namespace Tapeti.Config /// IBinding Binding { get; } + /// + /// Stores additional properties in the message context which can be passed between middleware stages. + /// + /// + /// Only one instance of type T is stored, if Enrich was called before for this type an InvalidOperationException will be thrown. + /// + /// A class implementing IMessageContextPayload + void Store(T payload) where T : IMessageContextPayload; + + /// + /// Stored a new payload, or updates an existing one. + /// + /// A method returning the new payload to be stored + /// A method called when the payload exists + /// The payload type as passed to Enrich + void StoreOrUpdate(Func onAdd, Action onUpdate) where T : IMessageContextPayload; + + /// + /// Returns the properties as previously stored with Enrich. Throws a KeyNotFoundException + /// if the payload is not stored in this message context. + /// + /// The payload type as passed to Enrich + T Get() where T : IMessageContextPayload; + + + /// + /// Returns true and the payload value if this message context was previously enriched with the payload T. + /// + /// The payload type as passed to Enrich + bool TryGet(out T payload) where T : IMessageContextPayload; /// /// Stores a key-value pair in the context for passing information between the various @@ -49,6 +86,7 @@ namespace Tapeti.Config /// /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts /// Will be disposed if the value implements IDisposable or IAsyncDisposable + [Obsolete("For backwards compatibility only. Use Store payload for typed properties instead")] void Store(string key, object value); /// @@ -57,6 +95,18 @@ namespace Tapeti.Config /// /// /// True if the value was found, False otherwise + [Obsolete("For backwards compatibility only. Use Get payload overload for typed properties instead")] bool Get(string key, out T value) where T : class; } + + + /// + /// Base interface for additional properties added to the message context. + /// + /// + /// Descendants implementing IDisposable or IAsyncDisposable will be disposed along with the message context. + /// + public interface IMessageContextPayload + { + } } diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 014fb20..58fddeb 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -57,6 +57,7 @@ namespace Tapeti.Connection return await DispatchMessage(message, new MessageContextData { + RawBody = body, Exchange = exchange, RoutingKey = routingKey, Properties = properties @@ -70,6 +71,7 @@ namespace Tapeti.Connection Queue = queueName, Exchange = exchange, RoutingKey = routingKey, + RawBody = body, Message = message, Properties = properties, Binding = null @@ -112,6 +114,7 @@ namespace Tapeti.Connection Queue = queueName, Exchange = messageContextData.Exchange, RoutingKey = messageContextData.RoutingKey, + RawBody = messageContextData.RawBody, Message = message, Properties = messageContextData.Properties, Binding = binding @@ -174,6 +177,7 @@ namespace Tapeti.Connection private struct MessageContextData { + public byte[] RawBody; public string Exchange; public string RoutingKey; public IMessageProperties Properties; diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index 41a8c08..d2787f3 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -11,6 +11,19 @@ namespace Tapeti.Default /// public class ConsoleLogger : IBindingLogger { + /// + /// Default ILogger implementation for console applications. This version + /// includes the message body if available when an error occurs. + /// + public class WithMessageLogging : ConsoleLogger + { + /// + public WithMessageLogging() : base() { } + + internal override bool IncludeMessageBody() => true; + } + + /// public void Connect(IConnectContext connectContext) { @@ -39,17 +52,23 @@ namespace Tapeti.Default public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { Console.WriteLine("[Tapeti] Exception while handling message"); - Console.WriteLine($" Result : {consumeResult}"); - Console.WriteLine($" Exchange : {messageContext.Exchange}"); - Console.WriteLine($" Queue : {messageContext.Queue}"); - Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}"); + Console.WriteLine($" Result : {consumeResult}"); + Console.WriteLine($" Exchange : {messageContext.Exchange}"); + Console.WriteLine($" Queue : {messageContext.Queue}"); + Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}"); + Console.WriteLine($" ReplyTo : {messageContext.Properties.ReplyTo}"); + Console.WriteLine($" CorrelationId : {messageContext.Properties.CorrelationId}"); - if (messageContext is IControllerMessageContext controllerMessageContext) + if (messageContext.TryGet(out var controllerPayload)) { - Console.WriteLine($" Controller : {controllerMessageContext.Binding.Controller.FullName}"); - Console.WriteLine($" Method : {controllerMessageContext.Binding.Method.Name}"); + Console.WriteLine($" Controller : {controllerPayload.Binding.Controller.FullName}"); + Console.WriteLine($" Method : {controllerPayload.Binding.Method.Name}"); } + if (IncludeMessageBody()) + Console.WriteLine($" Body : {(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : "")}"); + + Console.WriteLine(); Console.WriteLine(exception); } @@ -102,5 +121,7 @@ namespace Tapeti.Default ? $"[Tapeti] Obsolete queue was deleted: {queueName}" : $"[Tapeti] Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining"); } + + internal virtual bool IncludeMessageBody() => false; } } diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs deleted file mode 100644 index 3d8fb55..0000000 --- a/Tapeti/Default/ControllerMessageContext.cs +++ /dev/null @@ -1,71 +0,0 @@ -using System.Threading.Tasks; -using Tapeti.Config; - -namespace Tapeti.Default -{ - internal class ControllerMessageContext : IControllerMessageContext - { - private readonly IMessageContext decoratedContext; - - /// - public object Controller { get; set; } - - /// - public ITapetiConfig Config => decoratedContext.Config; - - /// - public string Queue => decoratedContext.Queue; - - /// - public string Exchange => decoratedContext.Exchange; - - /// - public string RoutingKey => decoratedContext.RoutingKey; - - /// - public object Message => decoratedContext.Message; - - /// - public IMessageProperties Properties => decoratedContext.Properties; - - - IBinding IMessageContext.Binding => decoratedContext.Binding; - IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding; - - - public ControllerMessageContext(IMessageContext decoratedContext) - { - this.decoratedContext = decoratedContext; - } - - - /// - public void Dispose() - { - // Do not call decoratedContext.Dispose - by design - } - - - /// - public ValueTask DisposeAsync() - { - // Do not call decoratedContext.DisposeAsync - by design - return default; - } - - - - /// - public void Store(string key, object value) - { - decoratedContext.Store(key, value); - } - - - /// - public bool Get(string key, out T value) where T : class - { - return decoratedContext.Get(key, out value); - } - } -} diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index bedb7cd..03b44da 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -160,39 +160,30 @@ namespace Tapeti.Default public async Task Invoke(IMessageContext context) { var controller = dependencyResolver.Resolve(bindingInfo.ControllerType); - - await using var controllerContext = new ControllerMessageContext(context) - { - Controller = controller - }; + context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding)); - if (!await FilterAllowed(controllerContext)) + if (!await FilterAllowed(context)) return; await MiddlewareHelper.GoAsync( bindingInfo.MessageMiddleware, - async (handler, next) => await handler.Handle(controllerContext, next), - async () => await messageHandler(controllerContext)); + async (handler, next) => await handler.Handle(context, next), + async () => await messageHandler(context)); } /// public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult) { - await using var controllerContext = new ControllerMessageContext(context) - { - Controller = null - }; - await MiddlewareHelper.GoAsync( bindingInfo.CleanupMiddleware, - async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next), + async (handler, next) => await handler.Cleanup(context, consumeResult, next), () => Task.CompletedTask); } - private async Task FilterAllowed(IControllerMessageContext context) + private async Task FilterAllowed(IMessageContext context) { var allowed = false; await MiddlewareHelper.GoAsync( @@ -208,7 +199,7 @@ namespace Tapeti.Default } - private delegate Task MessageHandlerFunc(IControllerMessageContext context); + private delegate Task MessageHandlerFunc(IMessageContext context); private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler resultHandler) @@ -233,9 +224,10 @@ namespace Tapeti.Default { return context => { + var controllerPayload = context.Get(); try { - var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + var result = method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); return resultHandler(context, result); } catch (Exception e) @@ -250,9 +242,10 @@ namespace Tapeti.Default { return context => { + var controllerPayload = context.Get(); try { - method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); return Task.CompletedTask; } catch (Exception e) @@ -268,9 +261,10 @@ namespace Tapeti.Default { return context => { + var controllerPayload = context.Get(); try { - return (Task) method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + return (Task) method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); } catch (Exception e) { @@ -285,9 +279,10 @@ namespace Tapeti.Default { return context => { + var controllerPayload = context.Get(); try { - return (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()); + return (Task)method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); } catch (Exception e) { @@ -302,9 +297,10 @@ namespace Tapeti.Default { return context => { + var controllerPayload = context.Get(); try { - return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray())); + return Task.FromResult(method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray())); } catch (Exception e) { diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 1729dc3..6a001af 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -7,7 +7,7 @@ namespace Tapeti.Default { internal class MessageContext : IMessageContext { - private readonly Dictionary items = new(); + private readonly Dictionary payloads = new(); /// @@ -22,6 +22,9 @@ namespace Tapeti.Default /// public string RoutingKey { get; set; } + /// + public byte[] RawBody { get; set; } + /// public object Message { get; set; } @@ -31,21 +34,52 @@ namespace Tapeti.Default /// public IBinding Binding { get; set; } + + public void Store(T payload) where T : IMessageContextPayload + { + payloads.Add(typeof(T), payload); + } + + public void StoreOrUpdate(Func onAdd, Action onUpdate) where T : IMessageContextPayload + { + if (payloads.TryGetValue(typeof(T), out var payload)) + onUpdate((T)payload); + else + payloads.Add(typeof(T), onAdd()); + } + + public T Get() where T : IMessageContextPayload + { + return (T)payloads[typeof(T)]; + } + + public bool TryGet(out T payload) where T : IMessageContextPayload + { + if (payloads.TryGetValue(typeof(T), out var payloadValue)) + { + payload = (T)payloadValue; + return true; + } + + payload = default; + return false; + } + /// public void Dispose() { - foreach (var item in items.Values) - (item as IDisposable)?.Dispose(); + foreach (var payload in payloads.Values) + (payload as IDisposable)?.Dispose(); } /// public async ValueTask DisposeAsync() { - foreach (var item in items.Values) + foreach (var payload in payloads.Values) { - if (item is IAsyncDisposable asyncDisposable) + if (payload is IAsyncDisposable asyncDisposable) await asyncDisposable.DisposeAsync(); } } @@ -55,21 +89,66 @@ namespace Tapeti.Default /// public void Store(string key, object value) { - items.Add(key, value); + StoreOrUpdate( + () => new KeyValuePayload(key, value), + payload => payload.Add(key, value)); } /// public bool Get(string key, out T value) where T : class { - if (!items.TryGetValue(key, out var objectValue)) + if (!TryGet(out var payload) || + !payload.TryGetValue(key, out var objectValue)) { - value = default(T); + value = null; return false; } value = (T)objectValue; return true; } + + + // ReSharper disable once InconsistentNaming + public class KeyValuePayload : IMessageContextPayload, IDisposable, IAsyncDisposable + { + private readonly Dictionary items = new(); + + + public KeyValuePayload(string key, object value) + { + Add(key, value); + } + + + public void Add(string key, object value) + { + items.Add(key, value); + } + + + public bool TryGetValue(string key, out object value) + { + return items.TryGetValue(key, out value); + } + + + public void Dispose() + { + foreach (var item in items.Values) + (item as IDisposable)?.Dispose(); + } + + + public async ValueTask DisposeAsync() + { + foreach (var item in items.Values) + { + if (item is IAsyncDisposable asyncDisposable) + await asyncDisposable.DisposeAsync(); + } + } + } } } From 0bed6a8f92aaa130fead627522030e0aca9bdfca Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sat, 4 Sep 2021 11:33:59 +0200 Subject: [PATCH 2/4] Added progress indicators to Tapeti.Cmd Refactored Tapeti.Cmd internals --- Tapeti.Cmd/ASCII/ProgressBar.cs | 103 +++ Tapeti.Cmd/Commands/ExportCommand.cs | 46 -- Tapeti.Cmd/Commands/ImportCommand.cs | 33 - Tapeti.Cmd/Commands/ShovelCommand.cs | 43 -- Tapeti.Cmd/Parser/BindingParser.cs | 23 + Tapeti.Cmd/Program.cs | 629 ++---------------- .../EasyNetQMessageSerializer.cs | 19 +- .../Serialization/IMessageSerializer.cs | 2 + .../SingleFileJSONMessageSerializer.cs | 54 +- Tapeti.Cmd/Tapeti.Cmd.csproj | 1 + Tapeti.Cmd/Verbs/BaseConnectionOptions.cs | 22 + .../Verbs/BaseMessageSerializerOptions.cs | 17 + Tapeti.Cmd/Verbs/BindQueueVerb.cs | 54 ++ Tapeti.Cmd/Verbs/DeclareQueueVerb.cs | 57 ++ Tapeti.Cmd/Verbs/ExampleVerb.cs | 53 ++ Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs | 30 + Tapeti.Cmd/Verbs/ExportVerb.cs | 121 ++++ Tapeti.Cmd/Verbs/ImportVerb.cs | 149 +++++ Tapeti.Cmd/Verbs/PurgeVerb.cs | 58 ++ Tapeti.Cmd/Verbs/RemoveQueueVerb.cs | 90 +++ Tapeti.Cmd/Verbs/ShovelVerb.cs | 181 +++++ Tapeti.Cmd/Verbs/UnbindQueueVerb.cs | 54 ++ 22 files changed, 1111 insertions(+), 728 deletions(-) create mode 100644 Tapeti.Cmd/ASCII/ProgressBar.cs delete mode 100644 Tapeti.Cmd/Commands/ExportCommand.cs delete mode 100644 Tapeti.Cmd/Commands/ImportCommand.cs delete mode 100644 Tapeti.Cmd/Commands/ShovelCommand.cs create mode 100644 Tapeti.Cmd/Parser/BindingParser.cs create mode 100644 Tapeti.Cmd/Verbs/BaseConnectionOptions.cs create mode 100644 Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs create mode 100644 Tapeti.Cmd/Verbs/BindQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/DeclareQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ExampleVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs create mode 100644 Tapeti.Cmd/Verbs/ExportVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ImportVerb.cs create mode 100644 Tapeti.Cmd/Verbs/PurgeVerb.cs create mode 100644 Tapeti.Cmd/Verbs/RemoveQueueVerb.cs create mode 100644 Tapeti.Cmd/Verbs/ShovelVerb.cs create mode 100644 Tapeti.Cmd/Verbs/UnbindQueueVerb.cs diff --git a/Tapeti.Cmd/ASCII/ProgressBar.cs b/Tapeti.Cmd/ASCII/ProgressBar.cs new file mode 100644 index 0000000..f357457 --- /dev/null +++ b/Tapeti.Cmd/ASCII/ProgressBar.cs @@ -0,0 +1,103 @@ +using System; +using System.Text; + +namespace Tapeti.Cmd.ASCII +{ + public class ProgressBar : IDisposable, IProgress + { + private static readonly TimeSpan UpdateInterval = TimeSpan.FromMilliseconds(20); + + private readonly int max; + private readonly int width; + private readonly bool showPosition; + private int position; + + private readonly bool enabled; + private DateTime lastUpdate = DateTime.MinValue; + private int lastOutputLength; + + + public ProgressBar(int max, int width = 10, bool showPosition = true) + { + if (width <= 0) + throw new ArgumentOutOfRangeException(nameof(width), "Width must be greater than zero"); + + if (max <= 0) + throw new ArgumentOutOfRangeException(nameof(max), "Max must be greater than zero"); + + this.max = max; + this.width = width; + this.showPosition = showPosition; + + enabled = !Console.IsOutputRedirected; + if (!enabled) + return; + + Console.CursorVisible = false; + Redraw(); + } + + + public void Dispose() + { + if (!enabled || lastOutputLength <= 0) + return; + + Console.CursorLeft = 0; + Console.Write(new string(' ', lastOutputLength)); + Console.CursorLeft = 0; + Console.CursorVisible = true; + } + + + public void Report(int value) + { + if (!enabled) + return; + + value = Math.Max(0, Math.Min(max, value)); + position = value; + + var now = DateTime.Now; + if (now - lastUpdate < UpdateInterval) + return; + + lastUpdate = now; + Redraw(); + } + + + private void Redraw() + { + var output = new StringBuilder("["); + + var blockCount = (int)Math.Truncate((decimal)position / max * width); + if (blockCount > 0) + output.Append(new string('#', blockCount)); + + if (blockCount < width) + output.Append(new string('.', width - blockCount)); + + output.Append("] "); + + if (showPosition) + { + output + .Append(position.ToString("N0")).Append(" / ").Append(max.ToString("N0")) + .Append(" (").Append((int) Math.Truncate((decimal) position / max * 100)).Append("%)"); + } + else + output.Append(" ").Append((int)Math.Truncate((decimal)position / max * 100)).Append("%"); + + + var newLength = output.Length; + if (newLength < lastOutputLength) + output.Append(new string(' ', lastOutputLength - output.Length)); + + Console.CursorLeft = 0; + Console.Write(output); + + lastOutputLength = newLength; + } + } +} diff --git a/Tapeti.Cmd/Commands/ExportCommand.cs b/Tapeti.Cmd/Commands/ExportCommand.cs deleted file mode 100644 index 013a9bb..0000000 --- a/Tapeti.Cmd/Commands/ExportCommand.cs +++ /dev/null @@ -1,46 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.Serialization; - -namespace Tapeti.Cmd.Commands -{ - public class ExportCommand - { - public IMessageSerializer MessageSerializer { get; set; } - - public string QueueName { get; set; } - public bool RemoveMessages { get; set; } - public int? MaxCount { get; set; } - - - public int Execute(IModel channel) - { - var messageCount = 0; - - while (!MaxCount.HasValue || messageCount < MaxCount.Value) - { - var result = channel.BasicGet(QueueName, false); - if (result == null) - // No more messages on the queue - break; - - messageCount++; - - MessageSerializer.Serialize(new Message - { - DeliveryTag = result.DeliveryTag, - Redelivered = result.Redelivered, - Exchange = result.Exchange, - RoutingKey = result.RoutingKey, - Queue = QueueName, - Properties = result.BasicProperties, - Body = result.Body.ToArray() - }); - - if (RemoveMessages) - channel.BasicAck(result.DeliveryTag, false); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Commands/ImportCommand.cs b/Tapeti.Cmd/Commands/ImportCommand.cs deleted file mode 100644 index ecb3eb1..0000000 --- a/Tapeti.Cmd/Commands/ImportCommand.cs +++ /dev/null @@ -1,33 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.RateLimiter; -using Tapeti.Cmd.Serialization; - -namespace Tapeti.Cmd.Commands -{ - public class ImportCommand - { - public IMessageSerializer MessageSerializer { get; set; } - - public bool DirectToQueue { get; set; } - - - public int Execute(IModel channel, IRateLimiter rateLimiter) - { - var messageCount = 0; - - foreach (var message in MessageSerializer.Deserialize(channel)) - { - rateLimiter.Execute(() => - { - var exchange = DirectToQueue ? "" : message.Exchange; - var routingKey = DirectToQueue ? message.Queue : message.RoutingKey; - - channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); - messageCount++; - }); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Commands/ShovelCommand.cs b/Tapeti.Cmd/Commands/ShovelCommand.cs deleted file mode 100644 index 3de6aca..0000000 --- a/Tapeti.Cmd/Commands/ShovelCommand.cs +++ /dev/null @@ -1,43 +0,0 @@ -using RabbitMQ.Client; -using Tapeti.Cmd.RateLimiter; - -namespace Tapeti.Cmd.Commands -{ - public class ShovelCommand - { - public string QueueName { get; set; } - public string TargetQueueName { get; set; } - public bool RemoveMessages { get; set; } - public int? MaxCount { get; set; } - - - public int Execute(IModel sourceChannel, IModel targetChannel, IRateLimiter rateLimiter) - { - var messageCount = 0; - - while (!MaxCount.HasValue || messageCount < MaxCount.Value) - { - var result = sourceChannel.BasicGet(QueueName, false); - if (result == null) - // No more messages on the queue - break; - - // Since RabbitMQ client 6 we need to copy the body before calling another channel method - // like BasicPublish, or the published body will be corrupted - var bodyCopy = result.Body.ToArray(); - - - rateLimiter.Execute(() => - { - targetChannel.BasicPublish("", TargetQueueName, result.BasicProperties, bodyCopy); - messageCount++; - - if (RemoveMessages) - sourceChannel.BasicAck(result.DeliveryTag, false); - }); - } - - return messageCount; - } - } -} diff --git a/Tapeti.Cmd/Parser/BindingParser.cs b/Tapeti.Cmd/Parser/BindingParser.cs new file mode 100644 index 0000000..b839c73 --- /dev/null +++ b/Tapeti.Cmd/Parser/BindingParser.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Tapeti.Cmd.Parser +{ + public static class BindingParser + { + public static Tuple[] Parse(IEnumerable bindings) + { + return bindings + .Select(b => + { + var parts = b.Split(':'); + if (parts.Length != 2) + throw new InvalidOperationException($"Invalid binding format: {b}"); + + return new Tuple(parts[0], parts[1]); + }) + .ToArray(); + } + } +} diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index 1171015..b1f563a 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -1,602 +1,57 @@ using System; -using System.Collections.Generic; using System.Diagnostics; -using System.IO; using System.Linq; -using System.Text; +using System.Reflection; using CommandLine; -using RabbitMQ.Client; -using RabbitMQ.Client.Exceptions; -using Tapeti.Cmd.Commands; -using Tapeti.Cmd.Mock; -using Tapeti.Cmd.RateLimiter; -using Tapeti.Cmd.Serialization; +using Tapeti.Cmd.Verbs; namespace Tapeti.Cmd { public class Program { - public class CommonOptions - { - [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] - public string Host { get; set; } - - [Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] - public int Port { get; set; } - - [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] - public string VirtualHost { get; set; } - - [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")] - public string Username { get; set; } - - [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")] - public string Password { get; set; } - } - - - public enum SerializationMethod - { - SingleFileJSON, - EasyNetQHosepipe - } - - - public class MessageSerializerOptions : CommonOptions - { - [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)] - public SerializationMethod SerializationMethod { get; set; } - } - - - - [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")] - public class ExportOptions : MessageSerializerOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] - public string QueueName { get; set; } - - [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] - public string OutputPath { get; set; } - - [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] - public bool RemoveMessages { get; set; } - - [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] - public int? MaxCount { get; set; } - } - - - [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] - public class ImportOptions : MessageSerializerOptions - { - [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] - public string InputFile { get; set; } - - [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] - public string InputMessage { get; set; } - - [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] - public bool InputPipe { get; set; } - - [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] - public bool PublishToExchange { get; set; } - - [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] - public int? MaxRate { get; set; } - } - - - [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] - public class ExampleOptions - { - } - - - [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] - public class ShovelOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] - public string QueueName { get; set; } - - [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")] - public string TargetQueueName { get; set; } - - [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")] - public bool RemoveMessages { get; set; } - - [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] - public int? MaxCount { get; set; } - - [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")] - public string TargetHost { get; set; } - - [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")] - public int? TargetPort { get; set; } - - [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")] - public string TargetVirtualHost { get; set; } - - [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")] - public string TargetUsername { get; set; } - - [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] - public string TargetPassword { get; set; } - - [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] - public int? MaxRate { get; set; } - } - - - [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] - public class PurgeOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] - public string QueueName { get; set; } - - [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool Confirm { get; set; } - } - - - [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] - public class DeclareQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - - [Verb("removequeue", HelpText = "Removes a durable queue.")] - public class RemoveQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")] - public string QueueName { get; set; } - - [Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool Confirm { get; set; } - - [Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] - public bool ConfirmPurge { get; set; } - } - - - [Verb("bindqueue", HelpText = "Add a binding to a queue.")] - public class BindQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - - [Verb("unbindqueue", HelpText = "Remove a binding from a queue.")] - public class UnbindQueueOptions : CommonOptions - { - [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")] - public string QueueName { get; set; } - - [Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: :")] - public IEnumerable Bindings { get; set; } - } - - public static int Main(string[] args) { - return Parser.Default.ParseArguments(args) - .MapResult( - (ExportOptions o) => ExecuteVerb(o, RunExport), - (ImportOptions o) => ExecuteVerb(o, RunImport), - (ExampleOptions o) => ExecuteVerb(o, RunExample), - (ShovelOptions o) => ExecuteVerb(o, RunShovel), - (PurgeOptions o) => ExecuteVerb(o, RunPurge), - (DeclareQueueOptions o) => ExecuteVerb(o, RunDeclareQueue), - (RemoveQueueOptions o) => ExecuteVerb(o, RunRemoveQueue), - (BindQueueOptions o) => ExecuteVerb(o, RunBindQueue), - (UnbindQueueOptions o) => ExecuteVerb(o, RunUnbindQueue), - errs => - { - if (!Debugger.IsAttached) - return 1; - - Console.WriteLine("Press any Enter key to continue..."); - Console.ReadLine(); - return 1; - } - ); - } - - - private static int ExecuteVerb(T options, Action execute) where T : class - { - try - { - execute(options); - return 0; - } - catch (Exception e) - { - Console.WriteLine(e.Message); - return 1; - } - } - - - private static IConnection GetConnection(CommonOptions options) - { - var factory = new ConnectionFactory - { - HostName = options.Host, - Port = options.Port, - VirtualHost = options.VirtualHost, - UserName = options.Username, - Password = options.Password - }; - - return factory.CreateConnection(); - } - - - private static IMessageSerializer GetMessageSerializer(ImportOptions options) - { - switch (options.SerializationMethod) - { - case SerializationMethod.SingleFileJSON: - return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); - - case SerializationMethod.EasyNetQHosepipe: - if (string.IsNullOrEmpty(options.InputFile)) - throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization"); - - return new EasyNetQMessageSerializer(options.InputFile); - - default: - throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); - } - } - - - private static Stream GetInputStream(ImportOptions options, out bool disposeStream) - { - if (options.InputPipe) - { - disposeStream = false; - return Console.OpenStandardInput(); - } - - if (!string.IsNullOrEmpty(options.InputMessage)) - { - disposeStream = true; - return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage)); - } - - disposeStream = true; - return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); - } - - - private static IMessageSerializer GetMessageSerializer(ExportOptions options) - { - switch (options.SerializationMethod) - { - case SerializationMethod.SingleFileJSON: - return new SingleFileJSONMessageSerializer(GetOutputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); - - case SerializationMethod.EasyNetQHosepipe: - if (string.IsNullOrEmpty(options.OutputPath)) - throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); - - return new EasyNetQMessageSerializer(options.OutputPath); - - default: - throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); - } - } - - - private static Stream GetOutputStream(ExportOptions options, out bool disposeStream) - { - disposeStream = true; - return new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read); - } - - - private static IRateLimiter GetRateLimiter(int? maxRate) - { - if (!maxRate.HasValue || maxRate.Value <= 0) - return new NoRateLimiter(); - - return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); - } - - - private static void RunExport(ExportOptions options) - { - int messageCount; - - using (var messageSerializer = GetMessageSerializer(options)) - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = new ExportCommand - { - MessageSerializer = messageSerializer, - - QueueName = options.QueueName, - RemoveMessages = options.RemoveMessages, - MaxCount = options.MaxCount - }.Execute(channel); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); - } - - - private static void RunImport(ImportOptions options) - { - int messageCount; - - using (var messageSerializer = GetMessageSerializer(options)) - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = new ImportCommand - { - MessageSerializer = messageSerializer, - - DirectToQueue = !options.PublishToExchange - }.Execute(channel, GetRateLimiter(options.MaxRate)); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); - } - - - private static void RunExample(ExampleOptions options) - { - using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false))) - { - messageSerializer.Serialize(new Message - { - Exchange = "example", - Queue = "example.queue", - RoutingKey = "example.routing.key", - DeliveryTag = 42, - Properties = new MockBasicProperties - { - ContentType = "application/json", - DeliveryMode = 2, - Headers = new Dictionary - { - { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } - }, - ReplyTo = "reply.queue", - Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) - }, - Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") - }); - } - } - - - private static void RunShovel(ShovelOptions options) - { - int messageCount; - - using (var sourceConnection = GetConnection(options)) - using (var sourceChannel = sourceConnection.CreateModel()) - { - var shovelCommand = new ShovelCommand - { - QueueName = options.QueueName, - TargetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName, - RemoveMessages = options.RemoveMessages, - MaxCount = options.MaxCount - }; - - - if (RequiresSecondConnection(options)) - { - using (var targetConnection = GetTargetConnection(options)) - using (var targetChannel = targetConnection.CreateModel()) - { - messageCount = shovelCommand.Execute(sourceChannel, targetChannel, GetRateLimiter(options.MaxRate)); - } - } - else - messageCount = shovelCommand.Execute(sourceChannel, sourceChannel, GetRateLimiter(options.MaxRate)); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); - } - - - private static bool RequiresSecondConnection(ShovelOptions options) - { - if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host) - return true; - - if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port) - return true; - - if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost) - return true; - - - // All relevant target host parameters are either omitted or the same. This means the queue must be different - // to prevent an infinite loop. - if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName) - throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host"); - - - if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username) - return true; - - // ReSharper disable once ConvertIfStatementToReturnStatement - if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password) - return true; - - - // Everything's the same, we can use the same channel - return false; - } - - - private static IConnection GetTargetConnection(ShovelOptions options) - { - var factory = new ConnectionFactory - { - HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host, - Port = options.TargetPort ?? options.Port, - VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost, - UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username, - Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password - }; - - return factory.CreateConnection(); - } - - - private static void RunPurge(PurgeOptions options) - { - if (!options.Confirm) - { - Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - uint messageCount; - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueuePurge(options.QueueName); - } - - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); - } - - - private static void RunDeclareQueue(DeclareQueueOptions options) - { - // Parse early to fail early - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - channel.QueueDeclare(options.QueueName, true, false, false); - - foreach (var (exchange, routingKey) in bindings) - channel.QueueBind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); - } - - - private static void RunRemoveQueue(RemoveQueueOptions options) - { - if (!options.Confirm) - { - Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - uint messageCount; - - try - { - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueueDelete(options.QueueName, true, true); - } - } - catch (OperationInterruptedException e) - { - if (e.ShutdownReason.ReplyCode == 406) - { - if (!options.ConfirmPurge) - { - Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) - return; - } - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - messageCount = channel.QueueDelete(options.QueueName, true, false); - } - } - else - throw; - } - - Console.WriteLine(messageCount == 0 - ? $"Empty or non-existent queue '{options.QueueName}' removed." - : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); - } - - - private static void RunBindQueue(BindQueueOptions options) - { - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - foreach (var (exchange, routingKey) in bindings) - channel.QueueBind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); - } - - - private static void RunUnbindQueue(UnbindQueueOptions options) - { - var bindings = ParseBindings(options.Bindings); - - using (var connection = GetConnection(options)) - using (var channel = connection.CreateModel()) - { - foreach (var (exchange, routingKey) in bindings) - channel.QueueUnbind(options.QueueName, exchange, routingKey); - } - - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); - } - - - - private static Tuple[] ParseBindings(IEnumerable bindings) - { - return bindings - .Select(b => - { - var parts = b.Split(':'); - if (parts.Length != 2) - throw new InvalidOperationException($"Invalid binding format: {b}"); - - return new Tuple(parts[0], parts[1]); - }) + var exitCode = 1; + var verbTypes = Assembly.GetExecutingAssembly().GetTypes() + .Where(t => t.GetCustomAttribute() != null) .ToArray(); + + CommandLine.Parser.Default.ParseArguments(args, verbTypes.ToArray()) + .WithParsed(o => + { + try + { + var executableVerbAttribute = o.GetType().GetCustomAttribute(); + var executer = Activator.CreateInstance(executableVerbAttribute.VerbExecuter, o) as IVerbExecuter; + + // Should have been validated by the ExecutableVerbAttribute + Debug.Assert(executer != null, nameof(executer) + " != null"); + + executer.Execute(); + exitCode = 0; + } + catch (Exception e) + { + Console.WriteLine(e.Message); + DebugConfirmClose(); + } + }) + .WithNotParsed(_ => + { + DebugConfirmClose(); + }); + + return exitCode; + } + + + private static void DebugConfirmClose() + { + if (!Debugger.IsAttached) + return; + + Console.WriteLine("Press any Enter key to continue..."); + Console.ReadLine(); } } } diff --git a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs index d0b99ee..570fb02 100644 --- a/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/EasyNetQMessageSerializer.cs @@ -11,22 +11,23 @@ namespace Tapeti.Cmd.Serialization { public class EasyNetQMessageSerializer : IMessageSerializer { - private static readonly Regex InvalidCharRegex = new Regex(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled); + private static readonly Regex InvalidCharRegex = new(@"[\\\/:\*\?\""\<\>|]", RegexOptions.Compiled); - private readonly string path; private readonly Lazy writablePath; private int messageCount; + private readonly Lazy files; + public EasyNetQMessageSerializer(string path) { - this.path = path; - writablePath = new Lazy(() => { Directory.CreateDirectory(path); return path; }); + + files = new Lazy(() => Directory.GetFiles(path, "*.*.message.txt")); } @@ -60,9 +61,15 @@ namespace Tapeti.Cmd.Serialization } + public int GetMessageCount() + { + return files.Value.Length; + } + + public IEnumerable Deserialize(IModel channel) { - foreach (var file in Directory.GetFiles(path, "*.*.message.txt")) + foreach (var file in files.Value) { const string messageTag = ".message."; @@ -303,7 +310,7 @@ namespace Tapeti.Cmd.Serialization public Message ToMessage() { - return new Message + return new() { //ConsumerTag = DeliveryTag = DeliverTag, diff --git a/Tapeti.Cmd/Serialization/IMessageSerializer.cs b/Tapeti.Cmd/Serialization/IMessageSerializer.cs index 3aae717..831b4df 100644 --- a/Tapeti.Cmd/Serialization/IMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/IMessageSerializer.cs @@ -19,6 +19,8 @@ namespace Tapeti.Cmd.Serialization public interface IMessageSerializer : IDisposable { void Serialize(Message message); + + int GetMessageCount(); IEnumerable Deserialize(IModel channel); } } diff --git a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs index 05f2e88..2452157 100644 --- a/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs +++ b/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerializer.cs @@ -14,8 +14,11 @@ namespace Tapeti.Cmd.Serialization private readonly bool disposeStream; private readonly Encoding encoding; + // StreamReader.DefaultBufferSize is private :-/ + private const int DefaultBufferSize = 1024; - private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings + + private static readonly JsonSerializerSettings SerializerSettings = new() { NullValueHandling = NullValueHandling.Ignore }; @@ -40,23 +43,48 @@ namespace Tapeti.Cmd.Serialization exportFile.Value.WriteLine(serialized); } + + public int GetMessageCount() + { + if (!stream.CanSeek) + return 0; + + var position = stream.Position; + try + { + var lineCount = 0; + using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true); + + while (!reader.EndOfStream) + { + if (!string.IsNullOrEmpty(reader.ReadLine())) + lineCount++; + } + + return lineCount; + } + finally + { + stream.Position = position; + } + } + public IEnumerable Deserialize(IModel channel) { - using (var reader = new StreamReader(stream, encoding)) + using var reader = new StreamReader(stream, encoding, true, DefaultBufferSize, true); + + while (!reader.EndOfStream) { - while (!reader.EndOfStream) - { - var serialized = reader.ReadLine(); - if (string.IsNullOrEmpty(serialized)) - continue; + var serialized = reader.ReadLine(); + if (string.IsNullOrEmpty(serialized)) + continue; - var serializableMessage = JsonConvert.DeserializeObject(serialized); - if (serializableMessage == null) - continue; + var serializableMessage = JsonConvert.DeserializeObject(serialized); + if (serializableMessage == null) + continue; - yield return serializableMessage.ToMessage(channel); - } + yield return serializableMessage.ToMessage(channel); } } @@ -135,7 +163,7 @@ namespace Tapeti.Cmd.Serialization public Message ToMessage(IModel channel) { - return new Message + return new() { DeliveryTag = DeliveryTag, Redelivered = Redelivered, diff --git a/Tapeti.Cmd/Tapeti.Cmd.csproj b/Tapeti.Cmd/Tapeti.Cmd.csproj index 388750a..d5eda89 100644 --- a/Tapeti.Cmd/Tapeti.Cmd.csproj +++ b/Tapeti.Cmd/Tapeti.Cmd.csproj @@ -12,6 +12,7 @@ https://github.com/MvRens/Tapeti 2.0.0 Tapeti Command-line Utility + latest diff --git a/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs b/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs new file mode 100644 index 0000000..2f09302 --- /dev/null +++ b/Tapeti.Cmd/Verbs/BaseConnectionOptions.cs @@ -0,0 +1,22 @@ +using CommandLine; + +namespace Tapeti.Cmd.Verbs +{ + public class BaseConnectionOptions + { + [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] + public string Host { get; set; } + + [Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] + public int Port { get; set; } + + [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] + public string VirtualHost { get; set; } + + [Option('u', "username", HelpText = "Username used to connect to the RabbitMQ server.", Default = "guest")] + public string Username { get; set; } + + [Option('p', "password", HelpText = "Password used to connect to the RabbitMQ server.", Default = "guest")] + public string Password { get; set; } + } +} diff --git a/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs b/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs new file mode 100644 index 0000000..89727ed --- /dev/null +++ b/Tapeti.Cmd/Verbs/BaseMessageSerializerOptions.cs @@ -0,0 +1,17 @@ +using CommandLine; + +namespace Tapeti.Cmd.Verbs +{ + public enum SerializationMethod + { + SingleFileJSON, + EasyNetQHosepipe + } + + + public class BaseMessageSerializerOptions : BaseConnectionOptions + { + [Option('s', "serialization", HelpText = "The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe.", Default = SerializationMethod.SingleFileJSON)] + public SerializationMethod SerializationMethod { get; set; } + } +} diff --git a/Tapeti.Cmd/Verbs/BindQueueVerb.cs b/Tapeti.Cmd/Verbs/BindQueueVerb.cs new file mode 100644 index 0000000..368b7df --- /dev/null +++ b/Tapeti.Cmd/Verbs/BindQueueVerb.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("bindqueue", HelpText = "Add a binding to a queue.")] + [ExecutableVerb(typeof(BindQueueVerb))] + public class BindQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to add the binding(s) to.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class BindQueueVerb : IVerbExecuter + { + private readonly BindQueueOptions options; + + + public BindQueueVerb(BindQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueBind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs new file mode 100644 index 0000000..5ebd72f --- /dev/null +++ b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] + [ExecutableVerb(typeof(DeclareQueueVerb))] + public class DeclareQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to declare.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to add to the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class DeclareQueueVerb : IVerbExecuter + { + private readonly DeclareQueueOptions options; + + + public DeclareQueueVerb(DeclareQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + // Parse early to fail early + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + channel.QueueDeclare(options.QueueName, true, false, false); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueBind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExampleVerb.cs b/Tapeti.Cmd/Verbs/ExampleVerb.cs new file mode 100644 index 0000000..6998e70 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExampleVerb.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Mock; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")] + [ExecutableVerb(typeof(ExampleVerb))] + public class ExampleOptions + { + } + + + public class ExampleVerb : IVerbExecuter + { + public ExampleVerb(ExampleOptions options) + { + // Prevent compiler warnings, the parameter is expected by the Activator + Debug.Assert(options != null); + } + + + public void Execute() + { + using var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)); + + messageSerializer.Serialize(new Message + { + Exchange = "example", + Queue = "example.queue", + RoutingKey = "example.routing.key", + DeliveryTag = 42, + Properties = new MockBasicProperties + { + ContentType = "application/json", + DeliveryMode = 2, + Headers = new Dictionary + { + { "classType", Encoding.UTF8.GetBytes("Tapeti.Cmd.Example:Tapeti.Cmd") } + }, + ReplyTo = "reply.queue", + Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()) + }, + Body = Encoding.UTF8.GetBytes("{ \"Hello\": \"world!\" }") + }); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs new file mode 100644 index 0000000..aaeec9e --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs @@ -0,0 +1,30 @@ +using System; + +namespace Tapeti.Cmd.Verbs +{ + /// + /// Implementations are expected to have a constructor which accepts the options class + /// associated with the ExecutableVerb attribute. + /// + public interface IVerbExecuter + { + void Execute(); + } + + + + [AttributeUsage(AttributeTargets.Class)] + public class ExecutableVerbAttribute : Attribute + { + public Type VerbExecuter { get; } + + + public ExecutableVerbAttribute(Type verbExecuter) + { + if (!typeof(IVerbExecuter).IsAssignableFrom(verbExecuter)) + throw new InvalidCastException("Type must support IVerbExecuter"); + + VerbExecuter = verbExecuter; + } + } +} diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs new file mode 100644 index 0000000..33af936 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -0,0 +1,121 @@ +using System; +using System.IO; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("export", HelpText = "Fetch messages from a queue and write it to disk.")] + [ExecutableVerb(typeof(ExportVerb))] + public class ExportOptions : BaseMessageSerializerOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('o', "output", Required = true, HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be output to.")] + public string OutputPath { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + } + + + public class ExportVerb : IVerbExecuter + { + private readonly ExportOptions options; + + + public ExportVerb(ExportOptions options) + { + this.options = options; + } + + + public void Execute() + { + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var messageSerializer = GetMessageSerializer(options); + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + var totalCount = (int)channel.MessageCount(options.QueueName); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) + totalCount = options.MaxCount.Value; + + Console.WriteLine($"Exporting {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + using (var progressBar = new ProgressBar(totalCount)) + { + while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + { + var result = channel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + break; + + messageCount++; + + messageSerializer.Serialize(new Message + { + DeliveryTag = result.DeliveryTag, + Redelivered = result.Redelivered, + Exchange = result.Exchange, + RoutingKey = result.RoutingKey, + Queue = options.QueueName, + Properties = result.BasicProperties, + Body = result.Body.ToArray() + }); + + if (options.RemoveMessages) + channel.BasicAck(result.DeliveryTag, false); + + + progressBar.Report(messageCount); + } + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); + } + + + private static IMessageSerializer GetMessageSerializer(ExportOptions options) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(new FileStream(options.OutputPath, FileMode.Create, FileAccess.Write, FileShare.Read), true, Encoding.UTF8); + + case SerializationMethod.EasyNetQHosepipe: + if (string.IsNullOrEmpty(options.OutputPath)) + throw new ArgumentException("An output path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.OutputPath); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + } +} diff --git a/Tapeti.Cmd/Verbs/ImportVerb.cs b/Tapeti.Cmd/Verbs/ImportVerb.cs new file mode 100644 index 0000000..1040220 --- /dev/null +++ b/Tapeti.Cmd/Verbs/ImportVerb.cs @@ -0,0 +1,149 @@ +using System; +using System.IO; +using System.Text; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.RateLimiter; +using Tapeti.Cmd.Serialization; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("import", HelpText = "Read messages from disk as previously exported and publish them to a queue.")] + [ExecutableVerb(typeof(ImportVerb))] + public class ImportOptions : BaseMessageSerializerOptions + { + [Option('i', "input", Group = "Input", HelpText = "Path or filename (depending on the chosen serialization method) where the messages will be read from.")] + public string InputFile { get; set; } + + [Option('m', "message", Group = "Input", HelpText = "Single message to be sent, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public string InputMessage { get; set; } + + [Option('c', "pipe", Group = "Input", HelpText = "Messages are read from the standard input pipe, in the same format as used for SingleFileJSON. Serialization argument has no effect when using this input.")] + public bool InputPipe { get; set; } + + [Option('e', "exchange", HelpText = "If specified publishes to the originating exchange using the original routing key. By default these are ignored and the message is published directly to the originating queue.")] + public bool PublishToExchange { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] + public int? MaxRate { get; set; } + } + + + public class ImportVerb : IVerbExecuter + { + private readonly ImportOptions options; + + + public ImportVerb(ImportOptions options) + { + this.options = options; + } + + + public void Execute() + { + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var messageSerializer = GetMessageSerializer(options); + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + var rateLimiter = GetRateLimiter(options.MaxRate); + + var totalCount = messageSerializer.GetMessageCount(); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + ProgressBar progress = null; + if (totalCount > 0) + progress = new ProgressBar(totalCount); + try + { + foreach (var message in messageSerializer.Deserialize(channel)) + { + if (cancelled) + break; + + rateLimiter.Execute(() => + { + var exchange = options.PublishToExchange ? message.Exchange : ""; + var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; + + // ReSharper disable AccessToDisposedClosure + channel.BasicPublish(exchange, routingKey, message.Properties, message.Body); + messageCount++; + + progress?.Report(messageCount); + // ReSharper restore AccessToDisposedClosure + }); + } + } + finally + { + progress?.Dispose(); + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); + } + + + private static IMessageSerializer GetMessageSerializer(ImportOptions options) + { + switch (options.SerializationMethod) + { + case SerializationMethod.SingleFileJSON: + return new SingleFileJSONMessageSerializer(GetInputStream(options, out var disposeStream), disposeStream, Encoding.UTF8); + + case SerializationMethod.EasyNetQHosepipe: + if (string.IsNullOrEmpty(options.InputFile)) + throw new ArgumentException("An input path must be provided when using EasyNetQHosepipe serialization"); + + return new EasyNetQMessageSerializer(options.InputFile); + + default: + throw new ArgumentOutOfRangeException(nameof(options.SerializationMethod), options.SerializationMethod, "Invalid SerializationMethod"); + } + } + + + private static Stream GetInputStream(ImportOptions options, out bool disposeStream) + { + if (options.InputPipe) + { + disposeStream = false; + return Console.OpenStandardInput(); + } + + if (!string.IsNullOrEmpty(options.InputMessage)) + { + disposeStream = true; + return new MemoryStream(Encoding.UTF8.GetBytes(options.InputMessage)); + } + + disposeStream = true; + return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + } + + + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (!maxRate.HasValue || maxRate.Value <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + } +} diff --git a/Tapeti.Cmd/Verbs/PurgeVerb.cs b/Tapeti.Cmd/Verbs/PurgeVerb.cs new file mode 100644 index 0000000..30c6d00 --- /dev/null +++ b/Tapeti.Cmd/Verbs/PurgeVerb.cs @@ -0,0 +1,58 @@ +using System; +using CommandLine; +using RabbitMQ.Client; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("purge", HelpText = "Removes all messages from a queue destructively.")] + [ExecutableVerb(typeof(PurgeVerb))] + public class PurgeOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to purge.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + } + + + public class PurgeVerb : IVerbExecuter + { + private readonly PurgeOptions options; + + + public PurgeVerb(PurgeOptions options) + { + this.options = options; + } + + + public void Execute() + { + if (!options.Confirm) + { + Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + var messageCount = channel.QueuePurge(options.QueueName); + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs new file mode 100644 index 0000000..f83b4d1 --- /dev/null +++ b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs @@ -0,0 +1,90 @@ +using System; +using CommandLine; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("removequeue", HelpText = "Removes a durable queue.")] + [ExecutableVerb(typeof(RemoveQueueVerb))] + public class RemoveQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove.")] + public string QueueName { get; set; } + + [Option("confirm", HelpText = "Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool Confirm { get; set; } + + [Option("confirmpurge", HelpText = "Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)] + public bool ConfirmPurge { get; set; } + } + + + public class RemoveQueueVerb : IVerbExecuter + { + private readonly RemoveQueueOptions options; + + + public RemoveQueueVerb(RemoveQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + if (!options.Confirm) + { + Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + uint messageCount; + + try + { + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + messageCount = channel.QueueDelete(options.QueueName, true, true); + } + catch (OperationInterruptedException e) + { + if (e.ShutdownReason.ReplyCode == 406) + { + if (!options.ConfirmPurge) + { + Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); + var answer = Console.ReadLine(); + + if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + return; + } + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + messageCount = channel.QueueDelete(options.QueueName, true, false); + } + else + throw; + } + + Console.WriteLine(messageCount == 0 + ? $"Empty or non-existent queue '{options.QueueName}' removed." + : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); + } + } +} diff --git a/Tapeti.Cmd/Verbs/ShovelVerb.cs b/Tapeti.Cmd/Verbs/ShovelVerb.cs new file mode 100644 index 0000000..c601b5a --- /dev/null +++ b/Tapeti.Cmd/Verbs/ShovelVerb.cs @@ -0,0 +1,181 @@ +using System; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.RateLimiter; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("shovel", HelpText = "Reads messages from a queue and publishes them to another queue, optionally to another RabbitMQ server.")] + [ExecutableVerb(typeof(ShovelVerb))] + public class ShovelOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The queue to read the messages from.")] + public string QueueName { get; set; } + + [Option('t', "targetqueue", HelpText = "The target queue to publish the messages to. Defaults to the source queue if a different target host, port or virtualhost is specified. Otherwise it must be different from the source queue.")] + public string TargetQueueName { get; set; } + + [Option('r', "remove", HelpText = "If specified messages are acknowledged and removed from the source queue. If not messages are kept.")] + public bool RemoveMessages { get; set; } + + [Option('n', "maxcount", HelpText = "(Default: all) Maximum number of messages to retrieve from the queue.")] + public int? MaxCount { get; set; } + + [Option("targethost", HelpText = "Hostname of the target RabbitMQ server. Defaults to the source host. Note that you may still specify a different targetusername for example.")] + public string TargetHost { get; set; } + + [Option("targetport", HelpText = "AMQP port of the target RabbitMQ server. Defaults to the source port.")] + public int? TargetPort { get; set; } + + [Option("targetvirtualhost", HelpText = "Virtual host used for the target RabbitMQ connection. Defaults to the source virtualhost.")] + public string TargetVirtualHost { get; set; } + + [Option("targetusername", HelpText = "Username used to connect to the target RabbitMQ server. Defaults to the source username.")] + public string TargetUsername { get; set; } + + [Option("targetpassword", HelpText = "Password used to connect to the target RabbitMQ server. Defaults to the source password.")] + public string TargetPassword { get; set; } + + [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] + public int? MaxRate { get; set; } + } + + + public class ShovelVerb : IVerbExecuter + { + private readonly ShovelOptions options; + + + public ShovelVerb(ShovelOptions options) + { + this.options = options; + } + + + public void Execute() + { + var sourceFactory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var sourceConnection = sourceFactory.CreateConnection(); + using var sourceChannel = sourceConnection.CreateModel(); + + if (RequiresSecondConnection(options)) + { + var targetFactory = new ConnectionFactory + { + HostName = !string.IsNullOrEmpty(options.TargetHost) ? options.TargetHost : options.Host, + Port = options.TargetPort ?? options.Port, + VirtualHost = !string.IsNullOrEmpty(options.TargetVirtualHost) ? options.TargetVirtualHost : options.VirtualHost, + UserName = !string.IsNullOrEmpty(options.TargetUsername) ? options.TargetUsername : options.Username, + Password = !string.IsNullOrEmpty(options.TargetPassword) ? options.TargetPassword : options.Password + }; + + using var targetConnection = targetFactory.CreateConnection(); + using var targetChannel = targetConnection.CreateModel(); + + Shovel(options, sourceChannel, targetChannel); + } + else + Shovel(options, sourceChannel, sourceChannel); + } + + + private static void Shovel(ShovelOptions options, IModel sourceChannel, IModel targetChannel) + { + var rateLimiter = GetRateLimiter(options.MaxRate); + var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; + + var totalCount = (int)sourceChannel.MessageCount(options.QueueName); + if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) + totalCount = options.MaxCount.Value; + + Console.WriteLine($"Shoveling {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + var messageCount = 0; + var cancelled = false; + + Console.CancelKeyPress += (_, args) => + { + args.Cancel = true; + cancelled = true; + }; + + using (var progressBar = new ProgressBar(totalCount)) + { + while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + { + var result = sourceChannel.BasicGet(options.QueueName, false); + if (result == null) + // No more messages on the queue + break; + + // Since RabbitMQ client 6 we need to copy the body before calling another channel method + // like BasicPublish, or the published body will be corrupted if sourceChannel and targetChannel are the same + var bodyCopy = result.Body.ToArray(); + + + rateLimiter.Execute(() => + { + targetChannel.BasicPublish("", targetQueueName, result.BasicProperties, bodyCopy); + messageCount++; + + if (options.RemoveMessages) + sourceChannel.BasicAck(result.DeliveryTag, false); + + // ReSharper disable once AccessToDisposedClosure + progressBar.Report(messageCount); + }); + } + } + + Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); + } + + + private static bool RequiresSecondConnection(ShovelOptions options) + { + if (!string.IsNullOrEmpty(options.TargetHost) && options.TargetHost != options.Host) + return true; + + if (options.TargetPort.HasValue && options.TargetPort.Value != options.Port) + return true; + + if (!string.IsNullOrEmpty(options.TargetVirtualHost) && options.TargetVirtualHost != options.VirtualHost) + return true; + + + // All relevant target host parameters are either omitted or the same. This means the queue must be different + // to prevent an infinite loop. + if (string.IsNullOrEmpty(options.TargetQueueName) || options.TargetQueueName == options.QueueName) + throw new ArgumentException("Target queue must be different from the source queue when shoveling within the same (virtual) host"); + + + if (!string.IsNullOrEmpty(options.TargetUsername) && options.TargetUsername != options.Username) + return true; + + // ReSharper disable once ConvertIfStatementToReturnStatement + if (!string.IsNullOrEmpty(options.TargetPassword) && options.TargetPassword != options.Password) + return true; + + + // Everything's the same, we can use the same channel + return false; + } + + + private static IRateLimiter GetRateLimiter(int? maxRate) + { + if (!maxRate.HasValue || maxRate.Value <= 0) + return new NoRateLimiter(); + + return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + } + } +} diff --git a/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs new file mode 100644 index 0000000..12c91f2 --- /dev/null +++ b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using CommandLine; +using RabbitMQ.Client; +using Tapeti.Cmd.Parser; + +namespace Tapeti.Cmd.Verbs +{ + [Verb("unbindqueue", HelpText = "Remove a binding from a queue.")] + [ExecutableVerb(typeof(UnbindQueueVerb))] + public class UnbindQueueOptions : BaseConnectionOptions + { + [Option('q', "queue", Required = true, HelpText = "The name of the queue to remove the binding(s) from.")] + public string QueueName { get; set; } + + [Option('b', "bindings", Required = false, HelpText = "One or more bindings to remove from the queue. Format: :")] + public IEnumerable Bindings { get; set; } + } + + + public class UnbindQueueVerb : IVerbExecuter + { + private readonly UnbindQueueOptions options; + + + public UnbindQueueVerb(UnbindQueueOptions options) + { + this.options = options; + } + + + public void Execute() + { + var bindings = BindingParser.Parse(options.Bindings); + + var factory = new ConnectionFactory + { + HostName = options.Host, + Port = options.Port, + VirtualHost = options.VirtualHost, + UserName = options.Username, + Password = options.Password + }; + + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + + foreach (var (exchange, routingKey) in bindings) + channel.QueueUnbind(options.QueueName, exchange, routingKey); + + Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); + } + } +} From e157598fa77566c363a9efd2b40fce13b3e40803 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sat, 4 Sep 2021 14:01:03 +0200 Subject: [PATCH 3/4] Implemented #32: Progress and batching for Tapeti.Cmd import Refactored console interaction to support this feature Updated documentation with recently added verbs --- Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs | 259 ++++++++++++++++++ Tapeti.Cmd/ConsoleHelper/IConsole.cs | 44 +++ .../{ASCII => ConsoleHelper}/ProgressBar.cs | 31 +-- Tapeti.Cmd/Program.cs | 7 +- .../RateLimiter/BatchSizeRateLimiter.cs | 80 ++++++ Tapeti.Cmd/RateLimiter/RateLimiterFactory.cs | 29 ++ Tapeti.Cmd/Verbs/BindQueueVerb.cs | 6 +- Tapeti.Cmd/Verbs/DeclareQueueVerb.cs | 9 +- Tapeti.Cmd/Verbs/ExampleVerb.cs | 3 +- Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs | 3 +- Tapeti.Cmd/Verbs/ExportVerb.cs | 20 +- Tapeti.Cmd/Verbs/ImportVerb.cs | 37 ++- Tapeti.Cmd/Verbs/PurgeVerb.cs | 12 +- Tapeti.Cmd/Verbs/RemoveQueueVerb.cs | 17 +- Tapeti.Cmd/Verbs/ShovelVerb.cs | 68 +++-- Tapeti.Cmd/Verbs/UnbindQueueVerb.cs | 6 +- docs/tapeticmd.rst | 109 +++++++- 17 files changed, 621 insertions(+), 119 deletions(-) create mode 100644 Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs create mode 100644 Tapeti.Cmd/ConsoleHelper/IConsole.cs rename Tapeti.Cmd/{ASCII => ConsoleHelper}/ProgressBar.cs (73%) create mode 100644 Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs create mode 100644 Tapeti.Cmd/RateLimiter/RateLimiterFactory.cs diff --git a/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs b/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs new file mode 100644 index 0000000..05f8541 --- /dev/null +++ b/Tapeti.Cmd/ConsoleHelper/ConsoleWrapper.cs @@ -0,0 +1,259 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Tapeti.Cmd.ConsoleHelper +{ + public class ConsoleWrapper : IConsole + { + private readonly List temporaryWriters = new(); + private bool temporaryActive; + + private int temporaryCursorTop; + + + public ConsoleWrapper() + { + temporaryCursorTop = Console.CursorTop; + + Console.CancelKeyPress += (_, args) => + { + if (Cancelled) + return; + + using var consoleWriter = GetPermanentWriter(); + consoleWriter.WriteLine("Cancelling..."); + + args.Cancel = true; + Cancelled = true; + }; + } + + + public void Dispose() + { + foreach (var writer in temporaryWriters) + writer.Dispose(); + + Console.CursorVisible = true; + GC.SuppressFinalize(this); + } + + + public bool Cancelled { get; private set; } + + public IConsoleWriter GetPermanentWriter() + { + return new PermanentWriter(this); + } + + + public IConsoleWriter GetTemporaryWriter() + { + var writer = new TemporaryWriter(this, temporaryWriters.Count); + temporaryWriters.Add(writer); + + return writer; + } + + + private void AcquirePermanent() + { + if (!temporaryActive) + return; + + foreach (var writer in temporaryWriters) + { + Console.SetCursorPosition(0, temporaryCursorTop + writer.RelativePosition); + writer.Clear(); + } + + Console.SetCursorPosition(0, temporaryCursorTop); + Console.CursorVisible = true; + temporaryActive = false; + } + + + private void ReleasePermanent() + { + if (temporaryWriters.Count == 0) + { + temporaryCursorTop = Console.CursorTop; + return; + } + + foreach (var writer in temporaryWriters) + { + writer.Restore(); + Console.WriteLine(); + } + + // Store the cursor position afterwards to account for buffer scrolling + temporaryCursorTop = Console.CursorTop - temporaryWriters.Count; + Console.CursorVisible = false; + temporaryActive = true; + } + + + private void AcquireTemporary(TemporaryWriter writer) + { + Console.SetCursorPosition(0, temporaryCursorTop + writer.RelativePosition); + + if (temporaryActive) + return; + + Console.CursorVisible = false; + temporaryActive = true; + } + + + private void DisposeWriter(BaseWriter writer) + { + if (writer is not TemporaryWriter temporaryWriter) + return; + + Console.SetCursorPosition(0, temporaryCursorTop + temporaryWriter.RelativePosition); + temporaryWriter.Clear(); + + temporaryWriters.Remove(temporaryWriter); + } + + + private abstract class BaseWriter : IConsoleWriter + { + protected readonly ConsoleWrapper Owner; + + + protected BaseWriter(ConsoleWrapper owner) + { + Owner = owner; + } + + + public virtual void Dispose() + { + Owner.DisposeWriter(this); + GC.SuppressFinalize(this); + } + + public abstract bool Enabled { get; } + + public abstract void WriteLine(string value); + + + public void Confirm(string message) + { + WriteLine(message); + TryReadKey(false, out var _); + } + + + public bool ConfirmYesNo(string message) + { + WriteLine($"{message} (Y/N) "); + if (!TryReadKey(true, out var key)) + return false; + + return key.KeyChar == 'y' || key.KeyChar == 'Y'; + } + + + private bool TryReadKey(bool showKeyOutput, out ConsoleKeyInfo keyInfo) + { + while (!Owner.Cancelled && !Console.KeyAvailable) + Thread.Sleep(50); + + if (Owner.Cancelled) + { + keyInfo = default; + return false; + } + + keyInfo = Console.ReadKey(!showKeyOutput); + return true; + } + } + + + private class PermanentWriter : BaseWriter + { + public PermanentWriter(ConsoleWrapper owner) : base(owner) + { + } + + + public override bool Enabled => true; + + + public override void WriteLine(string value) + { + Owner.AcquirePermanent(); + try + { + Console.WriteLine(value); + } + finally + { + Owner.ReleasePermanent(); + } + } + } + + + private class TemporaryWriter : BaseWriter + { + public int RelativePosition { get; } + + private bool isActive; + private string storedValue; + + + public TemporaryWriter(ConsoleWrapper owner, int relativePosition) : base(owner) + { + RelativePosition = relativePosition; + } + + + public override bool Enabled => !Console.IsOutputRedirected; + + + public override void WriteLine(string value) + { + if (!Enabled) + return; + + Owner.AcquireTemporary(this); + Console.Write(value); + + if (!string.IsNullOrEmpty(storedValue) && storedValue.Length > value.Length) + // Clear characters remaining from the previous value + Console.Write(new string(' ', storedValue.Length - value.Length)); + + storedValue = value; + isActive = true; + } + + + public void Clear() + { + if (!isActive) + return; + + if (!string.IsNullOrEmpty(storedValue)) + Console.Write(new string(' ', storedValue.Length)); + + isActive = false; + } + + + public void Restore() + { + if (string.IsNullOrEmpty(storedValue)) + return; + + Console.Write(storedValue); + isActive = true; + } + } + } +} diff --git a/Tapeti.Cmd/ConsoleHelper/IConsole.cs b/Tapeti.Cmd/ConsoleHelper/IConsole.cs new file mode 100644 index 0000000..d563d1e --- /dev/null +++ b/Tapeti.Cmd/ConsoleHelper/IConsole.cs @@ -0,0 +1,44 @@ +using System; + +namespace Tapeti.Cmd.ConsoleHelper +{ + /// + /// Wraps access to the console to provide cooperation between temporary outputs like the + /// progress bar and batch confirmations. Temporary outputs hide the cursor and will be + /// automatically be erased and restored when a permanent writer is called. + /// + /// + /// Temporary outputs are automatically supressed when the console output is redirected. + /// The Enabled property will reflect this. + /// + public interface IConsole : IDisposable + { + bool Cancelled { get; } + + IConsoleWriter GetPermanentWriter(); + IConsoleWriter GetTemporaryWriter(); + } + + + /// + /// For simplicity outputs only support one line of text. + /// For temporary writers, each call to WriteLine will overwrite the previous and clear any + /// extra characters if the previous value was longer. + /// + public interface IConsoleWriter : IDisposable + { + bool Enabled { get; } + + void WriteLine(string value); + + /// + /// Waits for any user input. + /// + void Confirm(string message); + + /// + /// Waits for user confirmation (Y/N). + /// + bool ConfirmYesNo(string message); + } +} diff --git a/Tapeti.Cmd/ASCII/ProgressBar.cs b/Tapeti.Cmd/ConsoleHelper/ProgressBar.cs similarity index 73% rename from Tapeti.Cmd/ASCII/ProgressBar.cs rename to Tapeti.Cmd/ConsoleHelper/ProgressBar.cs index f357457..2d08be8 100644 --- a/Tapeti.Cmd/ASCII/ProgressBar.cs +++ b/Tapeti.Cmd/ConsoleHelper/ProgressBar.cs @@ -1,12 +1,13 @@ using System; using System.Text; -namespace Tapeti.Cmd.ASCII +namespace Tapeti.Cmd.ConsoleHelper { public class ProgressBar : IDisposable, IProgress { private static readonly TimeSpan UpdateInterval = TimeSpan.FromMilliseconds(20); + private readonly IConsoleWriter consoleWriter; private readonly int max; private readonly int width; private readonly bool showPosition; @@ -14,39 +15,33 @@ namespace Tapeti.Cmd.ASCII private readonly bool enabled; private DateTime lastUpdate = DateTime.MinValue; - private int lastOutputLength; - public ProgressBar(int max, int width = 10, bool showPosition = true) + public ProgressBar(IConsole console, int max, int width = 10, bool showPosition = true) { if (width <= 0) throw new ArgumentOutOfRangeException(nameof(width), "Width must be greater than zero"); if (max <= 0) throw new ArgumentOutOfRangeException(nameof(max), "Max must be greater than zero"); - + + consoleWriter = console.GetTemporaryWriter(); + this.max = max; this.width = width; this.showPosition = showPosition; - enabled = !Console.IsOutputRedirected; + enabled = consoleWriter.Enabled; if (!enabled) return; - Console.CursorVisible = false; Redraw(); } public void Dispose() { - if (!enabled || lastOutputLength <= 0) - return; - - Console.CursorLeft = 0; - Console.Write(new string(' ', lastOutputLength)); - Console.CursorLeft = 0; - Console.CursorVisible = true; + consoleWriter.Dispose(); } @@ -89,15 +84,7 @@ namespace Tapeti.Cmd.ASCII else output.Append(" ").Append((int)Math.Truncate((decimal)position / max * 100)).Append("%"); - - var newLength = output.Length; - if (newLength < lastOutputLength) - output.Append(new string(' ', lastOutputLength - output.Length)); - - Console.CursorLeft = 0; - Console.Write(output); - - lastOutputLength = newLength; + consoleWriter.WriteLine(output.ToString()); } } } diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index b1f563a..787d326 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Linq; using System.Reflection; using CommandLine; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Verbs; namespace Tapeti.Cmd @@ -26,8 +27,10 @@ namespace Tapeti.Cmd // Should have been validated by the ExecutableVerbAttribute Debug.Assert(executer != null, nameof(executer) + " != null"); - - executer.Execute(); + + using var consoleWrapper = new ConsoleWrapper(); + + executer.Execute(consoleWrapper); exitCode = 0; } catch (Exception e) diff --git a/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs b/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs new file mode 100644 index 0000000..c9e8ed9 --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/BatchSizeRateLimiter.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading; +using Tapeti.Cmd.ConsoleHelper; + +namespace Tapeti.Cmd.RateLimiter +{ + public abstract class BaseBatchSizeRateLimiter : IRateLimiter + { + private readonly IConsole console; + private readonly IRateLimiter decoratedRateLimiter; + private readonly int batchSize; + private int batchCount; + + + protected BaseBatchSizeRateLimiter(IConsole console, IRateLimiter decoratedRateLimiter, int batchSize) + { + this.console = console; + this.decoratedRateLimiter = decoratedRateLimiter; + this.batchSize = batchSize; + } + + + public void Execute(Action action) + { + batchCount++; + if (batchCount > batchSize) + { + Pause(console); + batchCount = 0; + } + + decoratedRateLimiter.Execute(action); + } + + + protected abstract void Pause(IConsole console); + } + + + public class ManualBatchSizeRateLimiter : BaseBatchSizeRateLimiter + { + public ManualBatchSizeRateLimiter(IConsole console, IRateLimiter decoratedRateLimiter, int batchSize) : base(console, decoratedRateLimiter, batchSize) + { + } + + + protected override void Pause(IConsole console) + { + using var consoleWriter = console.GetTemporaryWriter(); + consoleWriter.Confirm("Press any key to continue with the next batch..."); + } + } + + + public class TimedBatchSizeRateLimiter : BaseBatchSizeRateLimiter + { + private readonly int batchPauseTime; + + + public TimedBatchSizeRateLimiter(IConsole console, IRateLimiter decoratedRateLimiter, int batchSize, int batchPauseTime) : base(console, decoratedRateLimiter, batchSize) + { + this.batchPauseTime = batchPauseTime; + } + + + protected override void Pause(IConsole console) + { + using var consoleWriter = console.GetTemporaryWriter(); + + var remaining = batchPauseTime; + while (remaining > 0 && !console.Cancelled) + { + consoleWriter.WriteLine($"Next batch in {remaining} second{(remaining != 1 ? "s" : "")}..."); + + Thread.Sleep(1000); + remaining--; + } + } + } +} diff --git a/Tapeti.Cmd/RateLimiter/RateLimiterFactory.cs b/Tapeti.Cmd/RateLimiter/RateLimiterFactory.cs new file mode 100644 index 0000000..45ed0e2 --- /dev/null +++ b/Tapeti.Cmd/RateLimiter/RateLimiterFactory.cs @@ -0,0 +1,29 @@ +using System; +using Tapeti.Cmd.ConsoleHelper; + +namespace Tapeti.Cmd.RateLimiter +{ + public static class RateLimiterFactory + { + public static IRateLimiter Create(IConsole console, int? maxRate, int? batchSize, int? batchPauseTime) + { + IRateLimiter rateLimiter; + + if (maxRate > 0) + rateLimiter = new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); + else + rateLimiter = new NoRateLimiter(); + + // ReSharper disable once InvertIf - I don't like the readability of that flow + if (batchSize > 0) + { + if (batchPauseTime > 0) + rateLimiter = new TimedBatchSizeRateLimiter(console, rateLimiter, batchSize.Value, batchPauseTime.Value); + else + rateLimiter = new ManualBatchSizeRateLimiter(console, rateLimiter, batchSize.Value); + } + + return rateLimiter; + } + } +} diff --git a/Tapeti.Cmd/Verbs/BindQueueVerb.cs b/Tapeti.Cmd/Verbs/BindQueueVerb.cs index 368b7df..fc4416d 100644 --- a/Tapeti.Cmd/Verbs/BindQueueVerb.cs +++ b/Tapeti.Cmd/Verbs/BindQueueVerb.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using CommandLine; using RabbitMQ.Client; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Parser; namespace Tapeti.Cmd.Verbs @@ -29,8 +30,9 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); var bindings = BindingParser.Parse(options.Bindings); var factory = new ConnectionFactory @@ -48,7 +50,7 @@ namespace Tapeti.Cmd.Verbs foreach (var (exchange, routingKey) in bindings) channel.QueueBind(options.QueueName, exchange, routingKey); - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); + consoleWriter.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} added to queue {options.QueueName}."); } } } diff --git a/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs index 5ebd72f..a119233 100644 --- a/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs +++ b/Tapeti.Cmd/Verbs/DeclareQueueVerb.cs @@ -2,11 +2,12 @@ using System.Collections.Generic; using CommandLine; using RabbitMQ.Client; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Parser; namespace Tapeti.Cmd.Verbs { - [Verb("declarequeue", HelpText = "Declares a durable queue without arguments, compatible with Tapeti.")] + [Verb("declarequeue", HelpText = "Declares a durable queue without arguments.")] [ExecutableVerb(typeof(DeclareQueueVerb))] public class DeclareQueueOptions : BaseConnectionOptions { @@ -29,8 +30,10 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); + // Parse early to fail early var bindings = BindingParser.Parse(options.Bindings); @@ -51,7 +54,7 @@ namespace Tapeti.Cmd.Verbs foreach (var (exchange, routingKey) in bindings) channel.QueueBind(options.QueueName, exchange, routingKey); - Console.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); + consoleWriter.WriteLine($"Queue {options.QueueName} declared with {bindings.Length} binding{(bindings.Length != 1 ? "s" : "")}."); } } } diff --git a/Tapeti.Cmd/Verbs/ExampleVerb.cs b/Tapeti.Cmd/Verbs/ExampleVerb.cs index 6998e70..b7fcb6e 100644 --- a/Tapeti.Cmd/Verbs/ExampleVerb.cs +++ b/Tapeti.Cmd/Verbs/ExampleVerb.cs @@ -4,6 +4,7 @@ using System.Diagnostics; using System.Text; using CommandLine; using RabbitMQ.Client; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Mock; using Tapeti.Cmd.Serialization; @@ -25,7 +26,7 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { using var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)); diff --git a/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs index aaeec9e..b172dd2 100644 --- a/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs +++ b/Tapeti.Cmd/Verbs/ExecutableVerbAttribute.cs @@ -1,4 +1,5 @@ using System; +using Tapeti.Cmd.ConsoleHelper; namespace Tapeti.Cmd.Verbs { @@ -8,7 +9,7 @@ namespace Tapeti.Cmd.Verbs /// public interface IVerbExecuter { - void Execute(); + void Execute(IConsole console); } diff --git a/Tapeti.Cmd/Verbs/ExportVerb.cs b/Tapeti.Cmd/Verbs/ExportVerb.cs index 33af936..ac687cd 100644 --- a/Tapeti.Cmd/Verbs/ExportVerb.cs +++ b/Tapeti.Cmd/Verbs/ExportVerb.cs @@ -3,7 +3,7 @@ using System.IO; using System.Text; using CommandLine; using RabbitMQ.Client; -using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Serialization; namespace Tapeti.Cmd.Verbs @@ -37,8 +37,9 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); var factory = new ConnectionFactory { HostName = options.Host, @@ -56,19 +57,12 @@ namespace Tapeti.Cmd.Verbs if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; - Console.WriteLine($"Exporting {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + consoleWriter.WriteLine($"Exporting {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); var messageCount = 0; - var cancelled = false; - - Console.CancelKeyPress += (_, args) => - { - args.Cancel = true; - cancelled = true; - }; - using (var progressBar = new ProgressBar(totalCount)) + using (var progressBar = new ProgressBar(console, totalCount)) { - while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + while (!console.Cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) { var result = channel.BasicGet(options.QueueName, false); if (result == null) @@ -96,7 +90,7 @@ namespace Tapeti.Cmd.Verbs } } - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); + consoleWriter.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} exported."); } diff --git a/Tapeti.Cmd/Verbs/ImportVerb.cs b/Tapeti.Cmd/Verbs/ImportVerb.cs index 1040220..cb2eb73 100644 --- a/Tapeti.Cmd/Verbs/ImportVerb.cs +++ b/Tapeti.Cmd/Verbs/ImportVerb.cs @@ -3,7 +3,7 @@ using System.IO; using System.Text; using CommandLine; using RabbitMQ.Client; -using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.RateLimiter; using Tapeti.Cmd.Serialization; @@ -27,6 +27,12 @@ namespace Tapeti.Cmd.Verbs [Option("maxrate", HelpText = "The maximum amount of messages per second to import.")] public int? MaxRate { get; set; } + + [Option("batchsize", HelpText = "How many messages to import before pausing. Will wait for manual confirmation unless batchpausetime is specified.")] + public int? BatchSize { get; set; } + + [Option("batchpausetime", HelpText = "How many seconds to wait before starting the next batch if batchsize is specified.")] + public int? BatchPauseTime { get; set; } } @@ -41,8 +47,9 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); var factory = new ConnectionFactory { HostName = options.Host, @@ -55,30 +62,27 @@ namespace Tapeti.Cmd.Verbs using var messageSerializer = GetMessageSerializer(options); using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); - var rateLimiter = GetRateLimiter(options.MaxRate); + var rateLimiter = RateLimiterFactory.Create(console, options.MaxRate, options.BatchSize, options.BatchPauseTime); var totalCount = messageSerializer.GetMessageCount(); var messageCount = 0; - var cancelled = false; - Console.CancelKeyPress += (_, args) => - { - args.Cancel = true; - cancelled = true; - }; ProgressBar progress = null; if (totalCount > 0) - progress = new ProgressBar(totalCount); + progress = new ProgressBar(console, totalCount); try { foreach (var message in messageSerializer.Deserialize(channel)) { - if (cancelled) + if (console.Cancelled) break; rateLimiter.Execute(() => { + if (console.Cancelled) + return; + var exchange = options.PublishToExchange ? message.Exchange : ""; var routingKey = options.PublishToExchange ? message.RoutingKey : message.Queue; @@ -96,7 +100,7 @@ namespace Tapeti.Cmd.Verbs progress?.Dispose(); } - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); + consoleWriter.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} published."); } @@ -136,14 +140,5 @@ namespace Tapeti.Cmd.Verbs disposeStream = true; return new FileStream(options.InputFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); } - - - private static IRateLimiter GetRateLimiter(int? maxRate) - { - if (!maxRate.HasValue || maxRate.Value <= 0) - return new NoRateLimiter(); - - return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); - } } } diff --git a/Tapeti.Cmd/Verbs/PurgeVerb.cs b/Tapeti.Cmd/Verbs/PurgeVerb.cs index 30c6d00..bd99cd2 100644 --- a/Tapeti.Cmd/Verbs/PurgeVerb.cs +++ b/Tapeti.Cmd/Verbs/PurgeVerb.cs @@ -1,6 +1,7 @@ using System; using CommandLine; using RabbitMQ.Client; +using Tapeti.Cmd.ConsoleHelper; namespace Tapeti.Cmd.Verbs { @@ -27,14 +28,13 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); + if (!options.Confirm) { - Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + if (!consoleWriter.ConfirmYesNo($"Do you want to purge the queue '{options.QueueName}'?")) return; } @@ -52,7 +52,7 @@ namespace Tapeti.Cmd.Verbs var messageCount = channel.QueuePurge(options.QueueName); - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); + consoleWriter.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'."); } } } diff --git a/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs index f83b4d1..59066c3 100644 --- a/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs +++ b/Tapeti.Cmd/Verbs/RemoveQueueVerb.cs @@ -2,6 +2,7 @@ using CommandLine; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; +using Tapeti.Cmd.ConsoleHelper; namespace Tapeti.Cmd.Verbs { @@ -31,14 +32,13 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); + if (!options.Confirm) { - Console.Write($"Do you want to remove the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + if (!consoleWriter.ConfirmYesNo($"Do you want to remove the queue '{options.QueueName}'?")) return; } @@ -66,10 +66,7 @@ namespace Tapeti.Cmd.Verbs { if (!options.ConfirmPurge) { - Console.Write($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'? (Y/N) "); - var answer = Console.ReadLine(); - - if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase)) + if (!consoleWriter.ConfirmYesNo($"There are messages remaining. Do you want to purge the queue '{options.QueueName}'?")) return; } @@ -82,7 +79,7 @@ namespace Tapeti.Cmd.Verbs throw; } - Console.WriteLine(messageCount == 0 + consoleWriter.WriteLine(messageCount == 0 ? $"Empty or non-existent queue '{options.QueueName}' removed." : $"{messageCount} message{(messageCount != 1 ? "s" : "")} purged while removing '{options.QueueName}'."); } diff --git a/Tapeti.Cmd/Verbs/ShovelVerb.cs b/Tapeti.Cmd/Verbs/ShovelVerb.cs index c601b5a..08b96a3 100644 --- a/Tapeti.Cmd/Verbs/ShovelVerb.cs +++ b/Tapeti.Cmd/Verbs/ShovelVerb.cs @@ -1,7 +1,7 @@ using System; using CommandLine; using RabbitMQ.Client; -using Tapeti.Cmd.ASCII; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.RateLimiter; namespace Tapeti.Cmd.Verbs @@ -39,6 +39,12 @@ namespace Tapeti.Cmd.Verbs [Option("maxrate", HelpText = "The maximum amount of messages per second to shovel.")] public int? MaxRate { get; set; } + + [Option("batchsize", HelpText = "How many messages to shovel before pausing. Will wait for manual confirmation unless batchpausetime is specified.")] + public int? BatchSize { get; set; } + + [Option("batchpausetime", HelpText = "How many seconds to wait before starting the next batch if batchsize is specified.")] + public int? BatchPauseTime { get; set; } } @@ -53,7 +59,7 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { var sourceFactory = new ConnectionFactory { @@ -81,48 +87,49 @@ namespace Tapeti.Cmd.Verbs using var targetConnection = targetFactory.CreateConnection(); using var targetChannel = targetConnection.CreateModel(); - Shovel(options, sourceChannel, targetChannel); + Shovel(console, options, sourceChannel, targetChannel); } else - Shovel(options, sourceChannel, sourceChannel); + Shovel(console, options, sourceChannel, sourceChannel); } - private static void Shovel(ShovelOptions options, IModel sourceChannel, IModel targetChannel) + private static void Shovel(IConsole console, ShovelOptions options, IModel sourceChannel, IModel targetChannel) { - var rateLimiter = GetRateLimiter(options.MaxRate); + var consoleWriter = console.GetPermanentWriter(); + var rateLimiter = RateLimiterFactory.Create(console, options.MaxRate, options.BatchSize, options.BatchPauseTime); var targetQueueName = !string.IsNullOrEmpty(options.TargetQueueName) ? options.TargetQueueName : options.QueueName; var totalCount = (int)sourceChannel.MessageCount(options.QueueName); if (options.MaxCount.HasValue && options.MaxCount.Value < totalCount) totalCount = options.MaxCount.Value; - Console.WriteLine($"Shoveling {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); + consoleWriter.WriteLine($"Shoveling {totalCount} message{(totalCount != 1 ? "s" : "")} (actual number may differ if queue has active consumers or publishers)"); var messageCount = 0; - var cancelled = false; - Console.CancelKeyPress += (_, args) => + using (var progressBar = new ProgressBar(console, totalCount)) { - args.Cancel = true; - cancelled = true; - }; + var hasMessage = true; - using (var progressBar = new ProgressBar(totalCount)) - { - while (!cancelled && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) + while (!console.Cancelled && hasMessage && (!options.MaxCount.HasValue || messageCount < options.MaxCount.Value)) { - var result = sourceChannel.BasicGet(options.QueueName, false); - if (result == null) - // No more messages on the queue - break; - - // Since RabbitMQ client 6 we need to copy the body before calling another channel method - // like BasicPublish, or the published body will be corrupted if sourceChannel and targetChannel are the same - var bodyCopy = result.Body.ToArray(); - - rateLimiter.Execute(() => { + if (console.Cancelled) + return; + + var result = sourceChannel.BasicGet(options.QueueName, false); + if (result == null) + { + // No more messages on the queue + hasMessage = false; + return; + } + + // Since RabbitMQ client 6 we need to copy the body before calling another channel method + // like BasicPublish, or the published body will be corrupted if sourceChannel and targetChannel are the same + var bodyCopy = result.Body.ToArray(); + targetChannel.BasicPublish("", targetQueueName, result.BasicProperties, bodyCopy); messageCount++; @@ -135,7 +142,7 @@ namespace Tapeti.Cmd.Verbs } } - Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); + consoleWriter.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} shoveled."); } @@ -168,14 +175,5 @@ namespace Tapeti.Cmd.Verbs // Everything's the same, we can use the same channel return false; } - - - private static IRateLimiter GetRateLimiter(int? maxRate) - { - if (!maxRate.HasValue || maxRate.Value <= 0) - return new NoRateLimiter(); - - return new SpreadRateLimiter(maxRate.Value, TimeSpan.FromSeconds(1)); - } } } diff --git a/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs index 12c91f2..6dbcdeb 100644 --- a/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs +++ b/Tapeti.Cmd/Verbs/UnbindQueueVerb.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using CommandLine; using RabbitMQ.Client; +using Tapeti.Cmd.ConsoleHelper; using Tapeti.Cmd.Parser; namespace Tapeti.Cmd.Verbs @@ -29,8 +30,9 @@ namespace Tapeti.Cmd.Verbs } - public void Execute() + public void Execute(IConsole console) { + var consoleWriter = console.GetPermanentWriter(); var bindings = BindingParser.Parse(options.Bindings); var factory = new ConnectionFactory @@ -48,7 +50,7 @@ namespace Tapeti.Cmd.Verbs foreach (var (exchange, routingKey) in bindings) channel.QueueUnbind(options.QueueName, exchange, routingKey); - Console.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); + consoleWriter.WriteLine($"{bindings.Length} binding{(bindings.Length != 1 ? "s" : "")} removed from queue {options.QueueName}."); } } } diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst index af3c77b..a474baf 100644 --- a/docs/tapeticmd.rst +++ b/docs/tapeticmd.rst @@ -1,7 +1,9 @@ Tapeti.Cmd ========== -The Tapeti command-line tool provides various operations for managing messages. It tries to be compatible with all type of messages, but has been tested only against JSON messages, specifically those sent by Tapeti. +The Tapeti command-line tool provides various operations for managing messages and queues. + +Some operations, like shovel, are compatible with all types of messages. However, commands like import and export can assume JSON messages, specifically those sent by Tapeti, so your results may vary. Common parameters @@ -83,6 +85,12 @@ Read messages from disk as previously exported and publish them to a queue. --maxrate The maximum amount of messages per second to import. +--batchsize + How many messages to import before pausing. Will wait for manual confirmation unless batchpausetime is specified. + +--batchpausetime + How many seconds to wait before starting the next batch if batchsize is specified. + Either input, message or pipe is required. @@ -128,6 +136,12 @@ Reads messages from a queue and publishes them to another queue, optionally to a --maxrate The maximum amount of messages per second to shovel. +--batchsize + How many messages to shovel before pausing. Will wait for manual confirmation unless batchpausetime is specified. + +--batchpausetime + How many seconds to wait before starting the next batch if batchsize is specified. + Example: :: @@ -135,6 +149,99 @@ Example: .\Tapeti.Cmd.exe shovel -q tapeti.example.01 -t tapeti.example.06 +Purge +----- + +Removes all messages from a queue destructively. + +-q , --queue + *Required*. The queue to purge. + +--confirm + Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation. + + +Example: +:: + + .\Tapeti.Cmd.exe purge -q tapeti.example.01 + + +Declare queue +------------- + +Declares a durable queue without arguments. + +-q , --queue + *Required*. The queue to declare. + +-b , --bindings + One or more bindings to add to the queue. Format: : + + +Example: +:: + + .\Tapeti.Cmd.exe declarequeue -q tapeti.cmd.example -b myexchange:example.message myexchange:another.message + + +Bind queue +---------- + +Add a binding to an existing queue. + +-q , --queue + *Required*. The name of the queue to add the binding(s) to. + +-b , --bindings + One or more bindings to add to the queue. Format: : + + +Example: +:: + + .\Tapeti.Cmd.exe bindqueue -q tapeti.cmd.example -b myexchange:example.message myexchange:another.message + + +Unbind queue +------------ + +Remove a binding from a queue. + +-q , --queue + *Required*. The name of the queue to remove the binding(s) from. + +-b , --bindings + One or more bindings to remove from the queue. Format: : + + +Example: +:: + + .\Tapeti.Cmd.exe unbindqueue -q tapeti.cmd.example -b myexchange:example.message myexchange:another.message + + +Remove queue +------------ + +Removes a durable queue. + +-q , --queue + *Required*. The name of the queue to remove. + +--confirm + Confirms the removal of the specified queue. If not provided, an interactive prompt will ask for confirmation. + +--confirmpurge + Confirms the removal of the specified queue even if there still are messages in the queue. If not provided, an interactive prompt will ask for confirmation. + + +Example: +:: + + .\Tapeti.Cmd.exe removequeue -q tapeti.cmd.example + + Serialization methods --------------------- From 3981a5c1475a1fdb1f739b71235a01221df4f82d Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sat, 4 Sep 2021 14:02:35 +0200 Subject: [PATCH 4/4] Slightly clarified bindqueue command --- Tapeti.Cmd/Verbs/BindQueueVerb.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tapeti.Cmd/Verbs/BindQueueVerb.cs b/Tapeti.Cmd/Verbs/BindQueueVerb.cs index fc4416d..32c9965 100644 --- a/Tapeti.Cmd/Verbs/BindQueueVerb.cs +++ b/Tapeti.Cmd/Verbs/BindQueueVerb.cs @@ -7,7 +7,7 @@ using Tapeti.Cmd.Parser; namespace Tapeti.Cmd.Verbs { - [Verb("bindqueue", HelpText = "Add a binding to a queue.")] + [Verb("bindqueue", HelpText = "Add a binding to an existing queue.")] [ExecutableVerb(typeof(BindQueueVerb))] public class BindQueueOptions : BaseConnectionOptions {