diff --git a/Connection/TapetiWorker.cs b/Connection/TapetiWorker.cs index 10ea446..c4b13ef 100644 --- a/Connection/TapetiWorker.cs +++ b/Connection/TapetiWorker.cs @@ -9,11 +9,7 @@ namespace Tapeti.Connection { public class TapetiWorker { - public string HostName { get; set; } - public int Port { get; set; } - public string VirtualHost { get; set; } - public string Username { get; set; } - public string Password { get; set; } + public TapetiConnectionParams ConnectionParams { get; set; } public string PublishExchange { get; set; } @@ -124,11 +120,11 @@ namespace Tapeti.Connection var connectionFactory = new ConnectionFactory { - HostName = HostName, - Port = Port, - VirtualHost = VirtualHost, - UserName = Username, - Password = Password, + HostName = ConnectionParams.HostName, + Port = ConnectionParams.Port, + VirtualHost = ConnectionParams.VirtualHost, + UserName = ConnectionParams.Username, + Password = ConnectionParams.Password, AutomaticRecoveryEnabled = true, RequestedHeartbeat = 30 }; diff --git a/Default/ConsoleLogger.cs b/Default/ConsoleLogger.cs new file mode 100644 index 0000000..2cb7caf --- /dev/null +++ b/Default/ConsoleLogger.cs @@ -0,0 +1,20 @@ +namespace Tapeti.Default +{ + public class ConsoleLogger : ILogger + { + public void Connect(TapetiConnectionParams connectionParams) + { + throw new System.NotImplementedException(); + } + + public void ConnectFailed(TapetiConnectionParams connectionParams) + { + throw new System.NotImplementedException(); + } + + public void ConnectSuccess(TapetiConnectionParams connectionParams) + { + throw new System.NotImplementedException(); + } + } +} diff --git a/Default/DefaultDependencyResolver.cs b/Default/DefaultDependencyResolver.cs index f9d987c..062bab7 100644 --- a/Default/DefaultDependencyResolver.cs +++ b/Default/DefaultDependencyResolver.cs @@ -14,12 +14,29 @@ namespace Tapeti.Default private readonly Lazy controllerFactory; private readonly Lazy routingKeyStrategy = new Lazy(); private readonly Lazy messageSerializer = new Lazy(); + private readonly Lazy logger; public DefaultDependencyResolver(Func publisherFactory) { controllerFactory = new Lazy(() => new DefaultControllerFactory(publisherFactory)); + + logger = new Lazy(() => + { + // http://stackoverflow.com/questions/6408588/how-to-tell-if-there-is-a-console + try + { + // ReSharper disable once UnusedVariable + var dummy = Console.WindowHeight; + + return new ConsoleLogger(); + } + catch + { + return new DevNullLogger(); + } + }); } @@ -34,6 +51,9 @@ namespace Tapeti.Default if (typeof(T) == typeof(IMessageSerializer)) return (T)(messageSerializer.Value as IMessageSerializer); + if (typeof(T) == typeof(ILogger)) + return (T)logger.Value; + return default(T); } diff --git a/Default/DevNullLogger.cs b/Default/DevNullLogger.cs new file mode 100644 index 0000000..24919fc --- /dev/null +++ b/Default/DevNullLogger.cs @@ -0,0 +1,17 @@ +namespace Tapeti.Default +{ + public class DevNullLogger : ILogger + { + public void Connect(TapetiConnectionParams connectionParams) + { + } + + public void ConnectFailed(TapetiConnectionParams connectionParams) + { + } + + public void ConnectSuccess(TapetiConnectionParams connectionParams) + { + } + } +} diff --git a/ILogger.cs b/ILogger.cs new file mode 100644 index 0000000..014f217 --- /dev/null +++ b/ILogger.cs @@ -0,0 +1,11 @@ +namespace Tapeti +{ + // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) + // instead of only string-based logging without control over the output. + public interface ILogger + { + void Connect(TapetiConnectionParams connectionParams); + void ConnectFailed(TapetiConnectionParams connectionParams); + void ConnectSuccess(TapetiConnectionParams connectionParams); + } +} diff --git a/Tapeti.csproj b/Tapeti.csproj index a32def8..ca4d60b 100644 --- a/Tapeti.csproj +++ b/Tapeti.csproj @@ -55,6 +55,11 @@ + + + + + diff --git a/TapetiConnection.cs b/TapetiConnection.cs index f0c1111..0a7a5cc 100644 --- a/TapetiConnection.cs +++ b/TapetiConnection.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; using System.Threading.Tasks; using Tapeti.Annotations; @@ -12,11 +11,8 @@ namespace Tapeti { public class TapetiConnection : IDisposable { - public string HostName { get; set; } = "localhost"; - public int Port { get; set; } = 5672; - public string VirtualHost { get; set; } = "/"; - public string Username { get; set; } = "guest"; - public string Password { get; set; } = "guest"; + public TapetiConnectionParams Params { get; set; } + public string PublishExchange { get; set; } = ""; public string SubscribeExchange { get; set; } = ""; @@ -29,9 +25,20 @@ namespace Tapeti private IDependencyResolver dependencyResolver; - private List registrations; - private TapetiWorker worker; - + private readonly Lazy> registrations = new Lazy>(); + private readonly Lazy worker; + + + public TapetiConnection() + { + worker = new Lazy(() => new TapetiWorker( + DependencyResolver.Resolve(), + DependencyResolver.Resolve()) + { + ConnectionParams = Params ?? new TapetiConnectionParams(), + PublishExchange = PublishExchange + }); + } public TapetiConnection WithDependencyResolver(IDependencyResolver resolver) @@ -52,8 +59,8 @@ namespace Tapeti if (!string.IsNullOrEmpty(queueAttribute.Name)) throw new ArgumentException("Dynamic queue attributes must not have a Name"); - GetRegistrations().Add(new ControllerDynamicQueueRegistration( - DependencyResolver.Resolve, + registrations.Value.Add(new ControllerDynamicQueueRegistration( + DependencyResolver.Resolve, DependencyResolver.Resolve, type, SubscribeExchange)); } @@ -62,8 +69,8 @@ namespace Tapeti if (string.IsNullOrEmpty(queueAttribute.Name)) throw new ArgumentException("Non-dynamic queue attribute must have a Name"); - GetRegistrations().Add(new ControllerQueueRegistration( - DependencyResolver.Resolve, + registrations.Value.Add(new ControllerQueueRegistration( + DependencyResolver.Resolve, type, SubscribeExchange, queueAttribute.Name)); } @@ -72,28 +79,13 @@ namespace Tapeti } - public TapetiConnection RegisterAllControllers(Assembly assembly) - { - foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) - RegisterController(type); - - return this; - } - - - public TapetiConnection RegisterAllControllers() - { - return RegisterAllControllers(Assembly.GetCallingAssembly()); - } - - public async Task Subscribe() { - if (registrations == null || registrations.Count == 0) + if (!registrations.IsValueCreated || registrations.Value.Count == 0) throw new ArgumentException("No controllers registered"); - var subscriber = new TapetiSubscriber(GetWorker()); - await subscriber.BindQueues(registrations); + var subscriber = new TapetiSubscriber(worker.Value); + await subscriber.BindQueues(registrations.Value); return subscriber; } @@ -101,17 +93,14 @@ namespace Tapeti public IPublisher GetPublisher() { - return new TapetiPublisher(GetWorker()); + return new TapetiPublisher(worker.Value); } public async Task Close() { - if (worker != null) - { - await worker.Close(); - worker = null; - } + if (worker.IsValueCreated) + await worker.Value.Close(); } @@ -119,27 +108,5 @@ namespace Tapeti { Close().Wait(); } - - - protected List GetRegistrations() - { - return registrations ?? (registrations = new List()); - } - - - protected TapetiWorker GetWorker() - { - return worker ?? (worker = new TapetiWorker( - DependencyResolver.Resolve(), - DependencyResolver.Resolve()) - { - HostName = HostName, - Port = Port, - VirtualHost = VirtualHost, - Username = Username, - Password = Password, - PublishExchange = PublishExchange - }); - } } } diff --git a/TapetiConnectionExtensions.cs b/TapetiConnectionExtensions.cs new file mode 100644 index 0000000..72a0fe2 --- /dev/null +++ b/TapetiConnectionExtensions.cs @@ -0,0 +1,23 @@ +using System.Linq; +using System.Reflection; +using Tapeti.Annotations; + +namespace Tapeti +{ + public static class TapetiConnectionExtensions + { + public static TapetiConnection RegisterAllControllers(this TapetiConnection connection, Assembly assembly) + { + foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(QueueAttribute)))) + connection.RegisterController(type); + + return connection; + } + + + public static TapetiConnection RegisterAllControllers(this TapetiConnection connection) + { + return RegisterAllControllers(connection, Assembly.GetCallingAssembly()); + } + } +} diff --git a/TapetiConnectionParams.cs b/TapetiConnectionParams.cs new file mode 100644 index 0000000..2f7162c --- /dev/null +++ b/TapetiConnectionParams.cs @@ -0,0 +1,35 @@ +using System; + +namespace Tapeti +{ + public class TapetiConnectionParams + { + public string HostName { get; set; } = "localhost"; + public int Port { get; set; } = 5672; + public string VirtualHost { get; set; } = "/"; + public string Username { get; set; } = "guest"; + public string Password { get; set; } = "guest"; + + + public TapetiConnectionParams() + { + } + + public TapetiConnectionParams(Uri uri) + { + HostName = uri.Host; + VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath; + + if (!uri.IsDefaultPort) + Port = uri.Port; + + var userInfo = uri.UserInfo.Split(':'); + if (userInfo.Length > 0) + { + Username = userInfo[0]; + if (userInfo.Length > 1) + Password = userInfo[1]; + } + } + } +} diff --git a/Test/Program.cs b/Test/Program.cs index 37858a9..6e6fe03 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -31,7 +31,7 @@ namespace Test while(true) publisher.Publish(new MarcoMessage()).Wait(); - Console.ReadLine(); + //Console.ReadLine(); } } }