diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs
index af5690b..1c2118a 100644
--- a/Tapeti.Serilog/TapetiSeriLogger.cs
+++ b/Tapeti.Serilog/TapetiSeriLogger.cs
@@ -6,16 +6,18 @@ using ISerilogLogger = Serilog.ILogger;
namespace Tapeti.Serilog
{
- ///
///
/// Implements the Tapeti ILogger interface for Serilog output.
///
- public class TapetiSeriLogger: ILogger
+ public class TapetiSeriLogger: IBindingLogger
{
private readonly ISerilogLogger seriLogger;
- ///
+ ///
+ /// Create a Tapeti ILogger implementation to output to the specified Serilog.ILogger interface
+ ///
+ /// The Serilog.ILogger implementation to output Tapeti log message to
public TapetiSeriLogger(ISerilogLogger seriLogger)
{
this.seriLogger = seriLogger;
@@ -82,6 +84,39 @@ namespace Tapeti.Serilog
contextLogger.Error(exception, "Tapeti: exception in message handler");
}
+ ///
+ public void QueueDeclare(string queueName, bool durable, bool passive)
+ {
+ if (passive)
+ seriLogger.Information("Tapeti: verifying durable queue {queueName}", queueName);
+ else
+ seriLogger.Information("Tapeti: declaring {queueType} queue {queueName}", durable ? "durable" : "dynamic", queueName);
+ }
+
+ ///
+ public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
+ {
+ seriLogger.Information("Tapeti: binding {queueName} to exchange {exchange} with routing key {routingKey}",
+ queueName,
+ exchange,
+ routingKey);
+ }
+
+ ///
+ public void QueueUnbind(string queueName, string exchange, string routingKey)
+ {
+ seriLogger.Information("Tapeti: removing binding for {queueName} to exchange {exchange} with routing key {routingKey}",
+ queueName,
+ exchange,
+ routingKey);
+ }
+
+ ///
+ public void ExchangeDeclare(string exchange)
+ {
+ seriLogger.Information("Tapeti: declaring exchange {exchange}", exchange);
+ }
+
///
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
{
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index 9a91ed0..eb04704 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -237,22 +237,29 @@ namespace Tapeti.Connection
{
var existingBindings = (await GetQueueBindings(queueName)).ToList();
var currentBindings = bindings.ToList();
+ var bindingLogger = logger as IBindingLogger;
await Queue(channel =>
{
if (cancellationToken.IsCancellationRequested)
return;
+ bindingLogger?.QueueDeclare(queueName, true, false);
channel.QueueDeclare(queueName, true, false, false);
+
foreach (var binding in currentBindings.Except(existingBindings))
{
DeclareExchange(channel, binding.Exchange);
+ bindingLogger?.QueueBind(queueName, true, binding.Exchange, binding.RoutingKey);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
}
foreach (var deletedBinding in existingBindings.Except(currentBindings))
+ {
+ bindingLogger?.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
+ }
});
}
@@ -264,6 +271,7 @@ namespace Tapeti.Connection
if (cancellationToken.IsCancellationRequested)
return;
+ (logger as IBindingLogger)?.QueueDeclare(queueName, true, true);
channel.QueueDeclarePassive(queueName);
});
}
@@ -285,7 +293,7 @@ namespace Tapeti.Connection
});
deletedQueues.Add(queueName);
- logger.QueueObsolete(queueName, true, deletedMessages);
+ (logger as IBindingLogger)?.QueueObsolete(queueName, true, deletedMessages);
return;
}
@@ -321,7 +329,7 @@ namespace Tapeti.Connection
channel.QueueDelete(queueName, false, true);
deletedQueues.Add(queueName);
- logger.QueueObsolete(queueName, true, 0);
+ (logger as IBindingLogger)?.QueueObsolete(queueName, true, 0);
}
catch (OperationInterruptedException e)
{
@@ -344,7 +352,7 @@ namespace Tapeti.Connection
channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey);
}
- logger.QueueObsolete(queueName, false, queueInfo.Messages);
+ (logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages);
}
} while (retry);
});
@@ -355,6 +363,7 @@ namespace Tapeti.Connection
public async Task DynamicQueueDeclare(CancellationToken cancellationToken, string queuePrefix = null)
{
string queueName = null;
+ var bindingLogger = logger as IBindingLogger;
await Queue(channel =>
{
@@ -364,10 +373,14 @@ namespace Tapeti.Connection
if (!string.IsNullOrEmpty(queuePrefix))
{
queueName = queuePrefix + "." + Guid.NewGuid().ToString("N");
+ bindingLogger?.QueueDeclare(queueName, false, false);
channel.QueueDeclare(queueName);
}
else
+ {
queueName = channel.QueueDeclare().QueueName;
+ bindingLogger?.QueueDeclare(queueName, false, false);
+ }
});
return queueName;
@@ -381,8 +394,9 @@ namespace Tapeti.Connection
if (cancellationToken.IsCancellationRequested)
return;
- DeclareExchange(channel, binding.Exchange);
- channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
+ DeclareExchange(channel, binding.Exchange);
+ (logger as IBindingLogger)?.QueueBind(queueName, false, binding.Exchange, binding.RoutingKey);
+ channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
});
}
@@ -554,6 +568,7 @@ namespace Tapeti.Connection
if (declaredExchanges.Contains(exchange))
return;
+ (logger as IBindingLogger)?.ExchangeDeclare(exchange);
channel.ExchangeDeclare(exchange, "topic", true);
declaredExchanges.Add(exchange);
}
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index e942ae0..024bbac 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -7,7 +7,7 @@ namespace Tapeti.Default
///
/// Default ILogger implementation for console applications.
///
- public class ConsoleLogger : ILogger
+ public class ConsoleLogger : IBindingLogger
{
///
public void Connect(IConnectContext connectContext)
@@ -52,6 +52,32 @@ namespace Tapeti.Default
Console.WriteLine(exception);
}
+ ///
+ public void QueueDeclare(string queueName, bool durable, bool passive)
+ {
+ Console.WriteLine(passive
+ ? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"
+ : $"[Tapeti] Verifying durable queue {queueName}");
+ }
+
+ ///
+ public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
+ {
+ Console.WriteLine($"[Tapeti] Binding {queueName} to exchange {exchange} with routing key {routingKey}");
+ }
+
+ ///
+ public void QueueUnbind(string queueName, string exchange, string routingKey)
+ {
+ Console.WriteLine($"[Tapeti] Removing binding for {queueName} to exchange {exchange} with routing key {routingKey}");
+ }
+
+ ///
+ public void ExchangeDeclare(string exchange)
+ {
+ Console.WriteLine($"[Tapeti] Declaring exchange {exchange}");
+ }
+
///
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
{
diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs
index bbaf911..9e712d7 100644
--- a/Tapeti/Default/DevNullLogger.cs
+++ b/Tapeti/Default/DevNullLogger.cs
@@ -33,10 +33,5 @@ namespace Tapeti.Default
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
}
-
- ///
- public void QueueObsolete(string queueName, bool deleted, uint messageCount)
- {
- }
}
}
diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs
index 0a16ba0..8fbabd8 100644
--- a/Tapeti/ILogger.cs
+++ b/Tapeti/ILogger.cs
@@ -110,6 +110,48 @@ namespace Tapeti
///
/// Indicates the action taken by the exception handler
void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult);
+ }
+
+
+ ///
+ /// Optional interface which can be implemented by an ILogger implementation to log all operations
+ /// related to declaring queues and bindings.
+ ///
+ public interface IBindingLogger : ILogger
+ {
+ ///
+ /// Called before a queue is declared for durable queues and dynamic queues with a prefix. Called after
+ /// a queue is declared for dynamic queues without a name with the queue name as determined by the RabbitMQ server.
+ /// Will always be called even if the queue already existed, as that information is not returned by the RabbitMQ server/client.
+ ///
+ /// The name of the queue that is declared
+ /// Indicates if the queue is durable or dynamic
+ /// Indicates whether the queue was declared as passive (to verify durable queues)
+ void QueueDeclare(string queueName, bool durable, bool passive);
+
+ ///
+ /// Called before a binding is added to a queue.
+ ///
+ /// The name of the queue the binding is created for
+ /// Indicates if the queue is durable or dynamic
+ /// The exchange for the binding
+ /// The routing key for the binding
+ void QueueBind(string queueName, bool durable, string exchange, string routingKey);
+
+ ///
+ /// Called before a binding is removed from a durable queue.
+ ///
+ /// The name of the queue the binding is removed from
+ /// The exchange of the binding
+ /// The routing key of the binding
+ void QueueUnbind(string queueName, string exchange, string routingKey);
+
+ ///
+ /// Called before an exchange is declared. Will always be called once for each exchange involved in a dynamic queue,
+ /// durable queue with auto-declare bindings enabled or published messages, even if the exchange already existed.
+ ///
+ /// The name of the exchange that is declared
+ void ExchangeDeclare(string exchange);
///
/// Called when a queue is determined to be obsolete.
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 0000000..ccea1cc
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,14 @@
+The documentation can be built locally using Sphinx. Install Python 3 (choco install python on Windows),
+then install sphinx and the ReadTheDocs theme:
+
+```pip install sphinx sphinx_rtd_theme```
+
+To build the HTML output, run:
+
+```.\make.bat html```
+
+
+
+To use the auto reloading server (rundev.bat), install the sphinx-autobuild package:
+
+```pip install sphinx-autobuild```
\ No newline at end of file
diff --git a/docs/index.rst b/docs/index.rst
index 0bc369d..cbc0898 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -10,4 +10,5 @@ Tapeti documentation
indepth
dataannotations
flow
- transient
\ No newline at end of file
+ transient
+ tapeticmd
\ No newline at end of file
diff --git a/docs/tapeticmd.rst b/docs/tapeticmd.rst
new file mode 100644
index 0000000..c7cc559
--- /dev/null
+++ b/docs/tapeticmd.rst
@@ -0,0 +1,177 @@
+Tapeti.Cmd
+==========
+
+The Tapeti command-line tool provides various operations for managing messages. It tries to be compatible with all type of messages, but has been tested only against JSON messages, specifically those sent by Tapeti.
+
+
+Common parameters
+-----------------
+
+All operations support the following parameters. All are optional.
+
+-h , --host
+ Specifies the hostname of the RabbitMQ server. Default is localhost.
+
+--port
+ Specifies the AMQP port of the RabbitMQ server. Default is 5672.
+
+-v , --virtualhost
+ Specifies the virtual host to use. Default is /.
+
+-u , --username
+ Specifies the username to authenticate the connection. Default is guest.
+
+-p , --password
+ Specifies the password to authenticate the connection. Default is guest.
+
+
+Example:
+::
+
+ .\Tapeti.Cmd.exe -h rabbitmq-server -u tapeti -p topsecret
+
+
+
+Export
+------
+
+Fetches messages from a queue and writes them to disk.
+
+-q , --queue
+ *Required*. The queue to read the messages from.
+
+-o , --output
+ *Required*. Path or filename (depending on the chosen serialization method) where the messages will be output to.
+
+-r, --remove
+ If specified messages are acknowledged and removed from the queue. If not messages are kept.
+
+-n , --maxcount
+ Maximum number of messages to retrieve from the queue. If not specified all messages are exported.
+
+-s , --serialization
+ The method used to serialize the message for import or export. Valid options: SingleFileJSON, EasyNetQHosepipe. Defaults to SingleFileJSON. See Serialization methods below for more information.
+
+
+Example:
+::
+
+ .\Tapeti.Cmd.exe export -q tapeti.example.01 -o dump.json
+
+
+
+Import
+------
+
+Read messages from disk as previously exported and publish them to a queue.
+
+-i