Commit ede845a5 authored by Mark van Renswoude's avatar Mark van Renswoude

Initial commit. Completely untested.

parent 241c440a
/obj/
/bin/
/packages/
*.suo
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
</startup>
</configuration>
\ No newline at end of file
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Domain
{
interface ITopologyOperations
{
void ExchangeDeclare(Exchange exchange);
void ExchangeDelete(Exchange exchange);
void QueueDeclare(Queue queue);
void QueueDelete(Queue queue);
void QueueBind(Queue queue, Binding binding);
void QueueUnbind(Queue queue, Binding binding);
}
}
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Domain
{
class TopologyComparator
{
public bool AllowDelete { get; set; }
public bool AllowRecreate { get; set; }
public bool AllowUnbind { get; set; }
private ITopologyOperations topologyOperations;
public TopologyComparator()
{
AllowDelete = false;
AllowRecreate = false;
AllowUnbind = false;
}
public TopologyComparator(ITopologyOperations topologyOperations)
{
this.topologyOperations = topologyOperations;
}
public void Compare(Topology existingTopology, Topology definedTopology)
{
// ToDo Compare implementation
}
}
}
using System;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Infrastructure
{
// ToDo arguments
class ConsoleTopologyOperations : ITopologyOperations
{
public void ExchangeDeclare(Exchange exchange)
{
Console.WriteLine("> Adding exchange: " + exchange.Name);
Console.WriteLine(" Type: " + exchange.ExchangeType);
Console.WriteLine(" Durable: " + exchange.Durable);
}
public void ExchangeDelete(Exchange exchange)
{
Console.WriteLine("> Deleting exchange: " + exchange.Name);
}
public void QueueDeclare(Queue queue)
{
Console.WriteLine("> Adding queue: " + queue.Name);
Console.WriteLine(" Durable: " + queue.Durable);
}
public void QueueDelete(Queue queue)
{
Console.WriteLine("> Deleting queue: " + queue.Name);
}
public void QueueBind(Queue queue, Binding binding)
{
Console.WriteLine("> Binding queue: " + queue.Name);
Console.WriteLine(" Exchange: " + binding.Exchange);
Console.WriteLine(" Routing key: " + binding.RoutingKey);
}
public void QueueUnbind(Queue queue, Binding binding)
{
Console.WriteLine("> Unbinding queue: " + queue.Name);
Console.WriteLine(" Exchange: " + binding.Exchange);
Console.WriteLine(" Routing key: " + binding.RoutingKey);
}
}
}
using System.Collections.Generic;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Infrastructure
{
class MulticastTopologyOperations : ITopologyOperations
{
private readonly List<ITopologyOperations> topologyOperationsList = new List<ITopologyOperations>();
public void Add(ITopologyOperations topologyOperations)
{
topologyOperationsList.Add(topologyOperations);
}
public void ExchangeDeclare(Exchange exchange)
{
topologyOperationsList.ForEach(to => to.ExchangeDeclare(exchange));
}
public void ExchangeDelete(Exchange exchange)
{
topologyOperationsList.ForEach(to => to.ExchangeDelete(exchange));
}
public void QueueDeclare(Queue queue)
{
topologyOperationsList.ForEach(to => to.QueueDeclare(queue));
}
public void QueueDelete(Queue queue)
{
topologyOperationsList.ForEach(to => to.QueueDelete(queue));
}
public void QueueBind(Queue queue, Binding binding)
{
topologyOperationsList.ForEach(to => to.QueueBind(queue, binding));
}
public void QueueUnbind(Queue queue, Binding binding)
{
topologyOperationsList.ForEach(to => to.QueueUnbind(queue, binding));
}
}
}
using System.Collections.Generic;
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Infrastructure
{
class RabbitMQTopologyOperations : ITopologyOperations
{
private readonly IManagementClient client;
private readonly Vhost virtualHost;
private static readonly Dictionary<Model.ExchangeType, string> ExchangeTypeMap = new Dictionary<Model.ExchangeType, string>
{
{ Model.ExchangeType.Fanout, RabbitMQ.Client.ExchangeType.Fanout },
{ Model.ExchangeType.Direct, RabbitMQ.Client.ExchangeType.Direct },
{ Model.ExchangeType.Topic, RabbitMQ.Client.ExchangeType.Topic },
{ Model.ExchangeType.Headers, RabbitMQ.Client.ExchangeType.Headers }
};
public RabbitMQTopologyOperations(IManagementClient client, Vhost virtualHost)
{
this.client = client;
this.virtualHost = virtualHost;
}
public void ExchangeDeclare(Model.Exchange exchange)
{
client.CreateExchange(new ExchangeInfo(exchange.Name, ExchangeTypeMap[exchange.ExchangeType], false, exchange.Durable, false,
MapArguments(exchange.Arguments)), virtualHost);
}
public void ExchangeDelete(Model.Exchange exchange)
{
client.DeleteExchange(client.GetExchange(exchange.Name, virtualHost));
}
public void QueueDeclare(Model.Queue queue)
{
client.CreateQueue(new QueueInfo(queue.Name, false, queue.Durable, MapInputArguments(queue.Arguments)), virtualHost);
}
public void QueueDelete(Model.Queue queue)
{
client.DeleteQueue(client.GetQueue(queue.Name, virtualHost));
}
public void QueueBind(Model.Queue queue, Model.Binding binding)
{
client.CreateBinding(client.GetExchange(binding.Exchange, virtualHost),
client.GetQueue(queue.Name, virtualHost),
new BindingInfo(binding.RoutingKey, MapInputArguments(binding.Arguments)));
}
public void QueueUnbind(Model.Queue queue, Model.Binding binding)
{
foreach (var clientBinding in client.GetBindingsForQueue(client.GetQueue(queue.Name, virtualHost)))
{
if (clientBinding.Source.Equals(binding.Exchange) &&
clientBinding.RoutingKey.Equals(binding.RoutingKey))
{
client.DeleteBinding(clientBinding);
}
}
}
public Arguments MapArguments(List<Argument> arguments)
{
if (arguments.Count == 0)
return null;
var mapping = new Arguments();
foreach (var argument in arguments)
mapping.Add(argument.Key, argument.Value);
return mapping;
}
public InputArguments MapInputArguments(List<Argument> arguments)
{
if (arguments.Count == 0)
return null;
var mapping = new InputArguments();
foreach (var argument in arguments)
mapping.Add(argument.Key, argument.Value);
return mapping;
}
}
}
using System.Collections.Generic;
using System.Linq;
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Infrastructure
{
class RabbitMQTopologyParser
{
private static readonly Dictionary<string, Model.ExchangeType> ExchangeTypeMap = new Dictionary<string, Model.ExchangeType>
{
{ RabbitMQ.Client.ExchangeType.Fanout, Model.ExchangeType.Fanout },
{ RabbitMQ.Client.ExchangeType.Direct, Model.ExchangeType.Direct },
{ RabbitMQ.Client.ExchangeType.Topic, Model.ExchangeType.Topic },
{ RabbitMQ.Client.ExchangeType.Headers, Model.ExchangeType.Headers }
};
public Topology Parse(IManagementClient client, Vhost virtualHost)
{
var topology = new Topology();
foreach (var exchange in client.GetExchanges())
{
var modelExchange = new Model.Exchange
{
Name = exchange.Name,
ExchangeType = ExchangeTypeMap[exchange.Type],
Durable = exchange.Durable,
};
MapArguments(exchange.Arguments, modelExchange.Arguments);
topology.Exchanges.Add(modelExchange);
}
foreach (var queue in client.GetQueues())
{
var modelQueue = new Model.Queue
{
Name = queue.Name,
Durable = queue.Durable
};
MapArguments(queue.Arguments, modelQueue.Arguments);
foreach (var binding in client.GetBindingsForQueue(queue))
{
var modelBinding = new Model.Binding
{
Exchange = binding.Source,
RoutingKey = binding.RoutingKey
};
MapArguments(binding.Arguments, modelBinding.Arguments);
modelQueue.Bindings.Add(modelBinding);
}
topology.Queues.Add(modelQueue);
}
return topology;
}
private void MapArguments(Arguments arguments, List<Model.Argument> modelArguments)
{
modelArguments.AddRange(arguments.Select(argument => new Argument
{
Key = argument.Key,
Value = argument.Value
}));
}
}
}
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Xml.Serialization;
namespace RabbitMetaQueue.Infrastructure
{
class XmlTopologyParser
{
public Model.Topology Parse(string filename)
{
Schema.Topology definition;
using (var stream = new FileStream(filename, FileMode.Open, FileAccess.Read))
{
var serializer = new XmlSerializer(typeof(Schema.Topology));
definition = (Schema.Topology)serializer.Deserialize(stream);
}
var model = new Model.Topology();
if (definition.Exchanges != null)
MapExchanges(definition.Exchanges, model.Exchanges);
if (definition.Queues != null)
MapQueues(definition.Queues, model.Queues);
return model;
}
private void MapExchanges(IEnumerable<Schema.Exchange> definition, List<Model.Exchange> model)
{
var exchangeTypeMap = new Dictionary<Schema.ExchangeType, Model.ExchangeType>
{
{ Schema.ExchangeType.Fanout, Model.ExchangeType.Fanout },
{ Schema.ExchangeType.Direct, Model.ExchangeType.Direct },
{ Schema.ExchangeType.Topic, Model.ExchangeType.Topic },
{ Schema.ExchangeType.Headers, Model.ExchangeType.Headers },
};
foreach (var sourceExchange in definition)
{
var destExchange = new Model.Exchange
{
Name = sourceExchange.name,
ExchangeType = exchangeTypeMap[sourceExchange.type],
Durable = sourceExchange.durable
};
if (sourceExchange.Arguments != null)
MapArguments(sourceExchange.Arguments, destExchange.Arguments);
model.Add(destExchange);
}
}
private void MapQueues(IEnumerable<Schema.Queue> definition, List<Model.Queue> model)
{
foreach (var sourceQueue in definition)
{
var destQueue = new Model.Queue
{
Name = sourceQueue.name,
Durable = sourceQueue.durable
};
if (sourceQueue.Arguments != null)
MapArguments(sourceQueue.Arguments, destQueue.Arguments);
if (sourceQueue.Bindings != null)
MapBindings(sourceQueue.Bindings, destQueue.Bindings);
model.Add(destQueue);
}
}
private void MapBindings(IEnumerable<Schema.Binding> definition, List<Model.Binding> model)
{
foreach (var sourceBinding in definition)
{
var destBinding = new Model.Binding
{
Exchange = sourceBinding.exchange,
RoutingKey = sourceBinding.routingKey
};
if (sourceBinding.Arguments != null)
MapArguments(sourceBinding.Arguments, destBinding.Arguments);
model.Add(destBinding);
}
}
private void MapArguments(IEnumerable<Schema.Argument> definition, List<Model.Argument> model)
{
model.AddRange(definition.Select(sourceArgument => new Model.Argument
{
Key = sourceArgument.name,
Value = sourceArgument.Value
}));
}
}
}
namespace RabbitMetaQueue.Model
{
class Argument
{
public string Key { get; set; }
public string Value { get; set; }
}
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
{
class Binding
{
public string Exchange { get; set; }
public string RoutingKey { get; set; }
public List<Argument> Arguments { get; set; }
public Binding()
{
Arguments = new List<Argument>();
}
}
}
namespace RabbitMetaQueue.Model
{
class ConnectionParams
{
public string Host { get; set; }
public string VirtualHost { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public ConnectionParams()
{
Host = "localhost";
VirtualHost = "/";
Username = "guest";
Password = "guest";
}
}
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
{
public enum ExchangeType
{
Fanout,
Direct,
Topic,
Headers
}
class Exchange
{
public string Name { get; set; }
public ExchangeType ExchangeType { get; set; }
public bool Durable { get; set; }
public List<Argument> Arguments { get; private set; }
public Exchange()
{
Arguments = new List<Argument>();
}
}
}
\ No newline at end of file
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
{
class Queue
{
public string Name { get; set; }
public bool Durable { get; set; }
public List<Argument> Arguments { get; private set; }
public List<Binding> Bindings { get; private set; }
public Queue()
{
Arguments = new List<Argument>();
Bindings = new List<Binding>();
}
}
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
{
class Topology
{
public List<Exchange> Exchanges { get; private set; }
public List<Queue> Queues { get; private set; }
public Topology()
{
Exchanges = new List<Exchange>();
Queues = new List<Queue>();
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using NDesk.Options;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Infrastructure;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue
{
class Program
{
private class Options
{
public string TopologyFilename { get; set; }
public bool DryRun { get; set; }
public ConnectionParams ConnectionParams { get; private set; }
public Options()
{
ConnectionParams = new ConnectionParams();
DryRun = false;
}
}
static int Main(string[] args)
{
var options = new Options();
if (!ParseOptions(args, options))
return 1;
try
{
Console.WriteLine("Parsing topology definition");
var definedTopology = new XmlTopologyParser().Parse(options.TopologyFilename);
Console.WriteLine("Connecting to RabbitMQ server [{0}{1}]", options.ConnectionParams.Host, options.ConnectionParams.VirtualHost);
var client = Connect(options.ConnectionParams);
var virtualHost = client.GetVhost(options.ConnectionParams.VirtualHost);
Console.WriteLine("Reading existing topology");
var existingTopology = new RabbitMQTopologyParser().Parse(client, virtualHost);
var operations = new MulticastTopologyOperations();
operations.Add(new ConsoleTopologyOperations());
if (!options.DryRun)
{
Console.WriteLine("Changes WILL be applied");
operations.Add(new RabbitMQTopologyOperations(client, virtualHost));
}
else
Console.WriteLine("Dry run - changes will not be applied");
var comparator = new TopologyComparator(operations)
{
AllowDelete = true,
AllowRecreate = true,
AllowUnbind = true
};
Console.WriteLine("Comparing topology");
comparator.Compare(existingTopology, definedTopology);
Console.WriteLine("Done!");
return 0;
}
catch(Exception e)
{
Console.Write("Error: ");
Console.WriteLine(e.Message);
return 1;
}
}
private static IManagementClient Connect(ConnectionParams connectionParams)
{
return new ManagementClient(String.Format("http://{0}", connectionParams.Host),
connectionParams.Username,
connectionParams.Password);
}
private static bool ParseOptions(IEnumerable<string> args, Options options)
{
string filename = null;
var optionSet = new OptionSet
{
{
"i|input=", "The {file name} of the topology definition. Required.",
v => filename = v
},
{
"h|host=", "The host {name} of the RabbitMQ server. Defaults to localhost.",
v => options.ConnectionParams.Host = v
},
{
"v|virtualhost=", "The virtual host {name} as configured in RabbitMQ. Defaults to /.",
v => options.ConnectionParams.VirtualHost = v
},
{
"u|username=", "The {username} used in the connection. Defaults to guest.",
v => options.ConnectionParams.Username = v
},
{
"p|password=", "The {password} used in the connection. Defaults to guest.",
v => options.ConnectionParams.Password = v
},
{
"d|dryrun", "When specified, changes are not applied to the RabbitMQ server.",
v => options.DryRun = (v != null)
}
};
try
{
optionSet.Parse(args);
if (String.IsNullOrEmpty(filename))
throw new OptionException("Topology file name is required", "i");
if (!File.Exists(filename))
throw new OptionException("Topology file not found", "i");
options.TopologyFilename = filename;
return true;
}
catch(OptionException e)
{
Console.Write("Invalid arguments: ");
Console.WriteLine(e.Message);
return false;
}
}
}
}
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("RabbitMetaQueue")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")]
[assembly: AssemblyProduct("RabbitMetaQueue")]