diff --git a/Tapeti.DataAnnotations/ConfigExtensions.cs b/Tapeti.DataAnnotations/ConfigExtensions.cs new file mode 100644 index 0000000..3001fe9 --- /dev/null +++ b/Tapeti.DataAnnotations/ConfigExtensions.cs @@ -0,0 +1,11 @@ +namespace Tapeti.DataAnnotations +{ + public static class ConfigExtensions + { + public static TapetiConfig WithDataAnnotations(this TapetiConfig config) + { + config.Use(new DataAnnotationsMiddleware()); + return config; + } + } +} diff --git a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs new file mode 100644 index 0000000..5833630 --- /dev/null +++ b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs @@ -0,0 +1,18 @@ +using System; +using System.ComponentModel.DataAnnotations; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.DataAnnotations +{ + public class DataAnnotationsMessageMiddleware : IMessageMiddleware + { + public Task Handle(IMessageContext context, Func next) + { + var validationContext = new ValidationContext(context.Message); + Validator.ValidateObject(context.Message, validationContext); + + return next(); + } + } +} diff --git a/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs new file mode 100644 index 0000000..ffbaac4 --- /dev/null +++ b/Tapeti.DataAnnotations/DataAnnotationsMiddleware.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using Tapeti.Config; + +namespace Tapeti.DataAnnotations +{ + public class DataAnnotationsMiddleware : ITapetiExtension + { + public void RegisterDefaults(IDependencyContainer container) + { + } + + public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) + { + return new object[] + { + new DataAnnotationsMessageMiddleware(), + new DataAnnotationsPublishMiddleware() + }; + } + } +} diff --git a/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs new file mode 100644 index 0000000..9cf0408 --- /dev/null +++ b/Tapeti.DataAnnotations/DataAnnotationsPublishMiddleware.cs @@ -0,0 +1,18 @@ +using System; +using System.ComponentModel.DataAnnotations; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.DataAnnotations +{ + public class DataAnnotationsPublishMiddleware : IPublishMiddleware + { + public Task Handle(IPublishContext context, Func next) + { + var validationContext = new ValidationContext(context.Message); + Validator.ValidateObject(context.Message, validationContext); + + return next(); + } + } +} diff --git a/Tapeti.DataAnnotations/Properties/AssemblyInfo.cs b/Tapeti.DataAnnotations/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..acc2c1e --- /dev/null +++ b/Tapeti.DataAnnotations/Properties/AssemblyInfo.cs @@ -0,0 +1,23 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Tapeti.DataAnnotations")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Tapeti.DataAnnotations")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("db1c6e81-4545-4670-a167-46e3265a9743")] diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj new file mode 100644 index 0000000..22bd208 --- /dev/null +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -0,0 +1,64 @@ + + + + + Debug + AnyCPU + {DB1C6E81-4545-4670-A167-46E3265A9743} + Library + Properties + Tapeti.DataAnnotations + Tapeti.DataAnnotations + v4.6.1 + 512 + + + true + full + false + lib\net46\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + lib\net46\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {8ab4fd33-4aaa-465c-8579-9db3f3b23813} + Tapeti + + + + + \ No newline at end of file diff --git a/Tapeti.sln b/Tapeti.sln index d161beb..f6640d7 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -17,6 +17,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Annotations", "Tapet EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapet.Tests", "Tapet.Tests\Tapet.Tests.csproj", "{BA6BC046-5E60-410A-AE84-BE1D91561A96}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.DataAnnotations", "Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj", "{DB1C6E81-4545-4670-A167-46E3265A9743}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -51,6 +53,10 @@ Global {BA6BC046-5E60-410A-AE84-BE1D91561A96}.Debug|Any CPU.Build.0 = Debug|Any CPU {BA6BC046-5E60-410A-AE84-BE1D91561A96}.Release|Any CPU.ActiveCfg = Release|Any CPU {BA6BC046-5E60-410A-AE84-BE1D91561A96}.Release|Any CPU.Build.0 = Release|Any CPU + {DB1C6E81-4545-4670-A167-46E3265A9743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB1C6E81-4545-4670-A167-46E3265A9743}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB1C6E81-4545-4670-A167-46E3265A9743}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB1C6E81-4545-4670-A167-46E3265A9743}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index daad63e..16a8333 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -9,6 +9,7 @@ namespace Tapeti.Config { IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } + IReadOnlyList PublishMiddleware { get; } IEnumerable Queues { get; } IBinding GetBinding(Delegate method); diff --git a/Tapeti/Config/IPublishContext.cs b/Tapeti/Config/IPublishContext.cs new file mode 100644 index 0000000..943b9ae --- /dev/null +++ b/Tapeti/Config/IPublishContext.cs @@ -0,0 +1,14 @@ +using RabbitMQ.Client; + +namespace Tapeti.Config +{ + public interface IPublishContext + { + IDependencyResolver DependencyResolver { get; } + + string Exchange { get; } + string RoutingKey { get; } + object Message { get; } + IBasicProperties Properties { get; } + } +} diff --git a/Tapeti/Config/IPublishMiddleware.cs b/Tapeti/Config/IPublishMiddleware.cs new file mode 100644 index 0000000..9a0eccc --- /dev/null +++ b/Tapeti/Config/IPublishMiddleware.cs @@ -0,0 +1,10 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IPublishMiddleware + { + Task Handle(IPublishContext context, Func next); + } +} diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 9763332..3f651d8 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -5,16 +5,16 @@ using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; using Tapeti.Config; +using Tapeti.Helpers; using Tapeti.Tasks; namespace Tapeti.Connection { public class TapetiWorker { + private readonly IConfig config; public TapetiConnectionParams ConnectionParams { get; set; } - private readonly IDependencyResolver dependencyResolver; - private readonly IReadOnlyList messageMiddleware; private readonly IMessageSerializer messageSerializer; private readonly IRoutingKeyStrategy routingKeyStrategy; private readonly IExchangeStrategy exchangeStrategy; @@ -23,14 +23,13 @@ namespace Tapeti.Connection private IModel channelInstance; - public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware) + public TapetiWorker(IConfig config) { - this.dependencyResolver = dependencyResolver; - this.messageMiddleware = messageMiddleware; + this.config = config; - messageSerializer = dependencyResolver.Resolve(); - routingKeyStrategy = dependencyResolver.Resolve(); - exchangeStrategy = dependencyResolver.Resolve(); + messageSerializer = config.DependencyResolver.Resolve(); + routingKeyStrategy = config.DependencyResolver.Resolve(); + exchangeStrategy = config.DependencyResolver.Resolve(); } @@ -53,7 +52,7 @@ namespace Tapeti.Connection return taskQueue.Value.Add(async () => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware)); }).Unwrap(); } @@ -134,21 +133,33 @@ namespace Tapeti.Connection private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey) { - return taskQueue.Value.Add(async () => + var context = new PublishContext { - var messageProperties = properties ?? new BasicProperties(); - if (!messageProperties.IsTimestampPresent()) - messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); + DependencyResolver = config.DependencyResolver, + Exchange = exchange, + RoutingKey = routingKey, + Message = message, + Properties = properties ?? new BasicProperties() + }; - if (!messageProperties.IsDeliveryModePresent()) - messageProperties.DeliveryMode = 2; // Persistent + if (!context.Properties.IsTimestampPresent()) + context.Properties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds()); - var body = messageSerializer.Serialize(message, messageProperties); + if (!context.Properties.IsDeliveryModePresent()) + context.Properties.DeliveryMode = 2; // Persistent - (await GetChannel()) - .BasicPublish(exchange, routingKey, false, messageProperties, body); - }).Unwrap(); + // ReSharper disable ImplicitlyCapturedClosure - MiddlewareHelper will not keep a reference to the lambdas + return MiddlewareHelper.GoAsync( + config.PublishMiddleware, + async (handler, next) => await handler.Handle(context, next), + () => taskQueue.Value.Add(async () => + { + var body = messageSerializer.Serialize(context.Message, context.Properties); + (await GetChannel()).BasicPublish(context.Exchange, context.RoutingKey, false, + context.Properties, body); + }).Unwrap()); + // ReSharper restore ImplicitlyCapturedClosure } /// @@ -191,5 +202,15 @@ namespace Tapeti.Connection return channelInstance; } + + + private class PublishContext : IPublishContext + { + public IDependencyResolver DependencyResolver { get; set; } + public string Exchange { get; set; } + public string RoutingKey { get; set; } + public object Message { get; set; } + public IBasicProperties Properties { get; set; } + } } } diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 71c0ec5..2ed3f8f 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -50,7 +50,9 @@ + + diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index f0c3687..12f72a3 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -26,6 +26,7 @@ namespace Tapeti private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); + private readonly List publishMiddleware = new List(); private readonly IDependencyResolver dependencyResolver; @@ -61,7 +62,7 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(dependencyResolver, messageMiddleware, queues); + var config = new Config(dependencyResolver, messageMiddleware, publishMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -82,6 +83,13 @@ namespace Tapeti } + public TapetiConfig Use(IPublishMiddleware handler) + { + publishMiddleware.Add(handler); + return this; + } + + public TapetiConfig Use(ITapetiExtension extension) { var container = dependencyResolver as IDependencyContainer; @@ -100,6 +108,8 @@ namespace Tapeti Use((IBindingMiddleware)middleware); else if (middleware is IMessageMiddleware) Use((IMessageMiddleware)middleware); + else if (middleware is IPublishMiddleware) + Use((IPublishMiddleware)middleware); else throw new ArgumentException($"Unsupported middleware implementation: {middleware.GetType().Name}"); } @@ -319,15 +329,17 @@ namespace Tapeti { public IDependencyResolver DependencyResolver { get; } public IReadOnlyList MessageMiddleware { get; } + public IReadOnlyList PublishMiddleware { get; } public IEnumerable Queues { get; } private readonly Dictionary bindingMethodLookup; - public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IEnumerable queues) + public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) { DependencyResolver = dependencyResolver; MessageMiddleware = messageMiddleware; + PublishMiddleware = publishMiddleware; Queues = queues.ToList(); bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b); diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 0b0d0ae..848dbac 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -19,7 +19,7 @@ namespace Tapeti this.config = config; (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher); - worker = new Lazy(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware) + worker = new Lazy(() => new TapetiWorker(config) { ConnectionParams = Params ?? new TapetiConnectionParams() }); diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index c0fa8bb..8660526 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -1,4 +1,5 @@ using System; +using System.ComponentModel.DataAnnotations; using System.Threading.Tasks; using Tapeti; using Tapeti.Annotations; @@ -114,12 +115,14 @@ namespace Test [Request(Response = typeof(PoloConfirmationResponseMessage))] public class PoloConfirmationRequestMessage { + [Required] public Guid StoredInState { get; set; } } public class PoloConfirmationResponseMessage { + [Required] public Guid ShouldMatchState { get; set; } } } diff --git a/Test/Program.cs b/Test/Program.cs index 3a81a3d..734d792 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,8 +1,10 @@ using System; using SimpleInjector; using Tapeti; +using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.Flow.SQL; +using Tapeti.Helpers; using Tapeti.SimpleInjector; namespace Test @@ -24,6 +26,7 @@ namespace Test var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) .WithFlow() //.WithFlowSqlRepository("data source=localhost;initial catalog=lef;integrated security=True;multipleactiveresultsets=True", 1) + .WithDataAnnotations() .RegisterAllControllers() .Build(); diff --git a/Test/Test.csproj b/Test/Test.csproj index 62a3b1a..f0d07d7 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -39,6 +39,7 @@ True + @@ -63,6 +64,10 @@ {c4897d64-d04e-4ae9-bd98-d64295d1d13a} Tapeti.Annotations + + {db1c6e81-4545-4670-a167-46e3265a9743} + Tapeti.DataAnnotations + {6de7b122-eb6a-46b8-aeaf-f84dde18f9c7} Tapeti.Flow.SQL