Commit 27ec2e70 authored by Mark van Renswoude's avatar Mark van Renswoude

Implemented adding and updating exchanges, queues and bindings

- ToDo: removing
Refactored Arguments properties to Dictionary
Fixed Internal Server Error due to null arguments
parent b499b562
using RabbitMetaQueue.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Domain
{
......@@ -8,7 +11,8 @@ namespace RabbitMetaQueue.Domain
public bool AllowRecreate { get; set; }
public bool AllowUnbind { get; set; }
private ITopologyWriter topologyWriter;
private readonly ITopologyWriter topologyWriter;
private List<string> volatileExchanges = new List<string>();
public TopologyComparator()
......@@ -27,7 +31,133 @@ namespace RabbitMetaQueue.Domain
public void Compare(Topology existingTopology, Topology definedTopology)
{
// ToDo Compare implementation
volatileExchanges.Clear();
// Added or updated exchanges
foreach (var exchange in definedTopology.Exchanges)
{
var existingExchange = existingTopology.Exchanges.FirstOrDefault(e => e.Name.Equals(exchange.Name, StringComparison.InvariantCulture));
if (existingExchange != null)
UpdateExchange(exchange, existingExchange);
else
CreateExchange(exchange);
}
// ToDo removed exchanges
// Added or updated queues
foreach (var queue in definedTopology.Queues)
{
var existingQueue = existingTopology.Queues.FirstOrDefault(q => q.Name.Equals(queue.Name, StringComparison.InvariantCulture));
if (existingQueue != null)
UpdateQueue(queue, existingQueue);
else
CreateQueue(queue);
}
// ToDo removed queues
}
private void CreateExchange(Exchange exchange)
{
topologyWriter.CreateExchange(exchange);
}
private void UpdateExchange(Exchange exchange, Exchange existingExchange)
{
if (AllowRecreate && !SameExchange(exchange, existingExchange))
{
topologyWriter.DeleteExchange(existingExchange);
topologyWriter.CreateExchange(exchange);
// Bindings need to be recreated as well
volatileExchanges.Add(exchange.Name);
}
}
private void CreateQueue(Queue queue)
{
topologyWriter.CreateQueue(queue);
foreach (var binding in queue.Bindings)
CreateBinding(queue, binding);
}
private void UpdateQueue(Queue queue, Queue existingQueue)
{
if (AllowRecreate && !SameQueue(queue, existingQueue))
{
topologyWriter.DeleteQueue(existingQueue);
CreateQueue(queue);
}
else
{
foreach (var binding in queue.Bindings)
{
var existingBinding = existingQueue.Bindings.FirstOrDefault(b => b.Exchange.Equals(binding.Exchange, StringComparison.InvariantCulture) &&
b.RoutingKey.Equals(binding.RoutingKey, StringComparison.InvariantCulture));
if (existingBinding != null)
UpdateBinding(queue, binding, existingBinding);
else
CreateBinding(queue, binding);
}
// ToDo removed bindings
}
}
private void CreateBinding(Queue queue, Binding binding)
{
topologyWriter.CreateBinding(queue, binding);
}
private void UpdateBinding(Queue queue, Binding binding, Binding existingBinding)
{
if (volatileExchanges.Contains(binding.Exchange, StringComparer.InvariantCulture))
{
CreateBinding(queue, binding);
}
else if (AllowRecreate && !SameBinding(binding, existingBinding))
{
topologyWriter.DeleteBinding(queue, existingBinding);
CreateBinding(queue, binding);
}
}
private static bool SameExchange(Exchange exchange, Exchange existingExchange)
{
return (exchange.Durable == existingExchange.Durable) &&
SameArguments(exchange.Arguments, existingExchange.Arguments);
}
private static bool SameQueue(Queue queue, Queue existingQueue)
{
return (queue.Durable == existingQueue.Durable) &&
SameArguments(queue.Arguments, existingQueue.Arguments);
}
private static bool SameBinding(Binding binding, Binding existingBinding)
{
return SameArguments(binding.Arguments, existingBinding.Arguments);
}
private static bool SameArguments(Arguments arguments, Arguments existingArguments)
{
if (arguments.Count != existingArguments.Count)
return false;
string value;
return arguments.All(a => existingArguments.TryGetValue(a.Key, out value) &&
value.Equals(a.Value, StringComparison.InvariantCulture));
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using RabbitMetaQueue.Model;
......@@ -67,17 +66,14 @@ namespace RabbitMetaQueue.Infrastructure
}
private void MapArguments(Arguments arguments, List<Model.Argument> modelArguments)
private void MapArguments(EasyNetQ.Management.Client.Model.Arguments arguments, Model.Arguments modelArguments)
{
modelArguments.AddRange(arguments.Select(argument => new Argument
{
Key = argument.Key,
Value = argument.Value
}));
foreach (var argument in arguments)
modelArguments.Add(argument.Key, argument.Value);
}
private bool IsSystemExchange(string name)
private static bool IsSystemExchange(string name)
{
return (String.IsNullOrEmpty(name) ||
name.StartsWith("amq.", StringComparison.InvariantCulture));
......
......@@ -2,7 +2,6 @@
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Model;
namespace RabbitMetaQueue.Infrastructure
{
......@@ -73,11 +72,8 @@ namespace RabbitMetaQueue.Infrastructure
}
public Arguments MapArguments(List<Argument> arguments)
public Arguments MapArguments(Model.Arguments arguments)
{
if (arguments.Count == 0)
return null;
var mapping = new Arguments();
foreach (var argument in arguments)
mapping.Add(argument.Key, argument.Value);
......@@ -86,11 +82,8 @@ namespace RabbitMetaQueue.Infrastructure
}
public InputArguments MapInputArguments(List<Argument> arguments)
public InputArguments MapInputArguments(Model.Arguments arguments)
{
if (arguments.Count == 0)
return null;
var mapping = new InputArguments();
foreach (var argument in arguments)
mapping.Add(argument.Key, argument.Value);
......
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Xml.Serialization;
namespace RabbitMetaQueue.Infrastructure
......@@ -96,13 +95,10 @@ namespace RabbitMetaQueue.Infrastructure
}
private void MapArguments(IEnumerable<Schema.Argument> definition, List<Model.Argument> model)
private void MapArguments(IEnumerable<Schema.Argument> definition, Model.Arguments model)
{
model.AddRange(definition.Select(sourceArgument => new Model.Argument
{
Key = sourceArgument.name,
Value = sourceArgument.Value
}));
foreach (var argument in definition)
model.Add(argument.name, argument.Value);
}
}
}
namespace RabbitMetaQueue.Model
{
class Argument
{
public string Key { get; set; }
public string Value { get; set; }
}
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
{
class Arguments : Dictionary<string, string> { }
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
namespace RabbitMetaQueue.Model
{
class Binding
{
public string Exchange { get; set; }
public string RoutingKey { get; set; }
public List<Argument> Arguments { get; set; }
public Arguments Arguments { get; set; }
public Binding()
{
Arguments = new List<Argument>();
Arguments = new Arguments();
}
}
}
using System.Collections.Generic;
namespace RabbitMetaQueue.Model
namespace RabbitMetaQueue.Model
{
public enum ExchangeType
{
......@@ -15,12 +13,12 @@ namespace RabbitMetaQueue.Model
public string Name { get; set; }
public ExchangeType ExchangeType { get; set; }
public bool Durable { get; set; }
public List<Argument> Arguments { get; private set; }
public Arguments Arguments { get; private set; }
public Exchange()
{
Arguments = new List<Argument>();
Arguments = new Arguments();
}
}
}
\ No newline at end of file
......@@ -6,12 +6,12 @@ namespace RabbitMetaQueue.Model
{
public string Name { get; set; }
public bool Durable { get; set; }
public List<Argument> Arguments { get; private set; }
public Arguments Arguments { get; private set; }
public List<Binding> Bindings { get; private set; }
public Queue()
{
Arguments = new List<Argument>();
Arguments = new Arguments();
Bindings = new List<Binding>();
}
}
......
......@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using EasyNetQ.Management.Client;
using EasyNetQ.Management.Client.Model;
using NDesk.Options;
using RabbitMetaQueue.Domain;
using RabbitMetaQueue.Infrastructure;
......
......@@ -58,7 +58,7 @@
<Compile Include="Infrastructure\RabbitMQTopologyReader.cs" />
<Compile Include="Domain\TopologyComparator.cs" />
<Compile Include="Infrastructure\XmlTopologyReader.cs" />
<Compile Include="Model\Argument.cs" />
<Compile Include="Model\Arguments.cs" />
<Compile Include="Model\Binding.cs" />
<Compile Include="Model\Exchange.cs" />
<Compile Include="Model\ConnectionParams.cs" />
......
<?xml version="1.0" encoding="UTF-8"?>
<Topology xmlns="http://schema.x2software.net/RabbitMetaQueue" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://schema.x2software.net/RabbitMetaQueue ..\Schema\Topology.xsd">
<Exchanges>
<Exchange name="RMetaQ.test1" type="Topic" />
<Exchange name="RMetaQ.test2" type="Topic" />
</Exchanges>
<Queues>
<Queue name="RMetaQ.queue1">
<Arguments>
<Argument name="x-dead-letter-exchange">metatest2</Argument>
</Arguments>
<Bindings>
<Binding exchange="RMetaQ.test1" routingKey="some.routing.key" />
</Bindings>
</Queue>
<Queue name="RMetaQ.deadletter">
<Bindings>
<Binding exchange="RMetaQ.test2" routingKey="#" />
</Bindings>
</Queue>
</Queues>
</Topology>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment