Fixed #5: Implement message validation

This commit is contained in:
Mark van Renswoude 2017-02-12 21:43:30 +01:00
parent eb017e7b63
commit e881ed94c1
17 changed files with 254 additions and 22 deletions

View File

@ -0,0 +1,11 @@
namespace Tapeti.DataAnnotations
{
public static class ConfigExtensions
{
public static TapetiConfig WithDataAnnotations(this TapetiConfig config)
{
config.Use(new DataAnnotationsMiddleware());
return config;
}
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsMessageMiddleware : IMessageMiddleware
{
public Task Handle(IMessageContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext);
return next();
}
}
}

View File

@ -0,0 +1,21 @@
using System.Collections.Generic;
using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsMiddleware : ITapetiExtension
{
public void RegisterDefaults(IDependencyContainer container)
{
}
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return new object[]
{
new DataAnnotationsMessageMiddleware(),
new DataAnnotationsPublishMiddleware()
};
}
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
public class DataAnnotationsPublishMiddleware : IPublishMiddleware
{
public Task Handle(IPublishContext context, Func<Task> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext);
return next();
}
}
}

View File

@ -0,0 +1,23 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Tapeti.DataAnnotations")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Tapeti.DataAnnotations")]
[assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("db1c6e81-4545-4670-a167-46e3265a9743")]

View File

@ -0,0 +1,64 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{DB1C6E81-4545-4670-A167-46E3265A9743}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Tapeti.DataAnnotations</RootNamespace>
<AssemblyName>Tapeti.DataAnnotations</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>lib\net46\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>lib\net46\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.ComponentModel.DataAnnotations" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="DataAnnotationsPublishMiddleware.cs" />
<Compile Include="DataAnnotationsMessageMiddleware.cs" />
<Compile Include="DataAnnotationsMiddleware.cs" />
<Compile Include="ConfigExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj">
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
<Name>Tapeti</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View File

@ -17,6 +17,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Annotations", "Tapet
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapet.Tests", "Tapet.Tests\Tapet.Tests.csproj", "{BA6BC046-5E60-410A-AE84-BE1D91561A96}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.DataAnnotations", "Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj", "{DB1C6E81-4545-4670-A167-46E3265A9743}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -51,6 +53,10 @@ Global
{BA6BC046-5E60-410A-AE84-BE1D91561A96}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA6BC046-5E60-410A-AE84-BE1D91561A96}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA6BC046-5E60-410A-AE84-BE1D91561A96}.Release|Any CPU.Build.0 = Release|Any CPU
{DB1C6E81-4545-4670-A167-46E3265A9743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB1C6E81-4545-4670-A167-46E3265A9743}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB1C6E81-4545-4670-A167-46E3265A9743}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB1C6E81-4545-4670-A167-46E3265A9743}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -9,6 +9,7 @@ namespace Tapeti.Config
{
IDependencyResolver DependencyResolver { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
IEnumerable<IQueue> Queues { get; }
IBinding GetBinding(Delegate method);

View File

@ -0,0 +1,14 @@
using RabbitMQ.Client;
namespace Tapeti.Config
{
public interface IPublishContext
{
IDependencyResolver DependencyResolver { get; }
string Exchange { get; }
string RoutingKey { get; }
object Message { get; }
IBasicProperties Properties { get; }
}
}

View File

@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IPublishMiddleware
{
Task Handle(IPublishContext context, Func<Task> next);
}
}

View File

@ -5,16 +5,16 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
using Tapeti.Helpers;
using Tapeti.Tasks;
namespace Tapeti.Connection
{
public class TapetiWorker
{
private readonly IConfig config;
public TapetiConnectionParams ConnectionParams { get; set; }
private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly IMessageSerializer messageSerializer;
private readonly IRoutingKeyStrategy routingKeyStrategy;
private readonly IExchangeStrategy exchangeStrategy;
@ -23,14 +23,13 @@ namespace Tapeti.Connection
private IModel channelInstance;
public TapetiWorker(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware)
public TapetiWorker(IConfig config)
{
this.dependencyResolver = dependencyResolver;
this.messageMiddleware = messageMiddleware;
this.config = config;
messageSerializer = dependencyResolver.Resolve<IMessageSerializer>();
routingKeyStrategy = dependencyResolver.Resolve<IRoutingKeyStrategy>();
exchangeStrategy = dependencyResolver.Resolve<IExchangeStrategy>();
messageSerializer = config.DependencyResolver.Resolve<IMessageSerializer>();
routingKeyStrategy = config.DependencyResolver.Resolve<IRoutingKeyStrategy>();
exchangeStrategy = config.DependencyResolver.Resolve<IExchangeStrategy>();
}
@ -53,7 +52,7 @@ namespace Tapeti.Connection
return taskQueue.Value.Add(async () =>
{
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, dependencyResolver, bindings, messageMiddleware));
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware));
}).Unwrap();
}
@ -134,21 +133,33 @@ namespace Tapeti.Connection
private Task Publish(object message, IBasicProperties properties, string exchange, string routingKey)
{
return taskQueue.Value.Add(async () =>
var context = new PublishContext
{
var messageProperties = properties ?? new BasicProperties();
if (!messageProperties.IsTimestampPresent())
messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
DependencyResolver = config.DependencyResolver,
Exchange = exchange,
RoutingKey = routingKey,
Message = message,
Properties = properties ?? new BasicProperties()
};
if (!messageProperties.IsDeliveryModePresent())
messageProperties.DeliveryMode = 2; // Persistent
if (!context.Properties.IsTimestampPresent())
context.Properties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
var body = messageSerializer.Serialize(message, messageProperties);
if (!context.Properties.IsDeliveryModePresent())
context.Properties.DeliveryMode = 2; // Persistent
(await GetChannel())
.BasicPublish(exchange, routingKey, false, messageProperties, body);
}).Unwrap();
// ReSharper disable ImplicitlyCapturedClosure - MiddlewareHelper will not keep a reference to the lambdas
return MiddlewareHelper.GoAsync(
config.PublishMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => taskQueue.Value.Add(async () =>
{
var body = messageSerializer.Serialize(context.Message, context.Properties);
(await GetChannel()).BasicPublish(context.Exchange, context.RoutingKey, false,
context.Properties, body);
}).Unwrap());
// ReSharper restore ImplicitlyCapturedClosure
}
/// <remarks>
@ -191,5 +202,15 @@ namespace Tapeti.Connection
return channelInstance;
}
private class PublishContext : IPublishContext
{
public IDependencyResolver DependencyResolver { get; set; }
public string Exchange { get; set; }
public string RoutingKey { get; set; }
public object Message { get; set; }
public IBasicProperties Properties { get; set; }
}
}
}

View File

@ -50,7 +50,9 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Config\IPublishContext.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" />
<Compile Include="Config\IPublishMiddleware.cs" />
<Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" />

View File

@ -26,6 +26,7 @@ namespace Tapeti
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly List<IPublishMiddleware> publishMiddleware = new List<IPublishMiddleware>();
private readonly IDependencyResolver dependencyResolver;
@ -61,7 +62,7 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
var config = new Config(dependencyResolver, messageMiddleware, queues);
var config = new Config(dependencyResolver, messageMiddleware, publishMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
return config;
@ -82,6 +83,13 @@ namespace Tapeti
}
public TapetiConfig Use(IPublishMiddleware handler)
{
publishMiddleware.Add(handler);
return this;
}
public TapetiConfig Use(ITapetiExtension extension)
{
var container = dependencyResolver as IDependencyContainer;
@ -100,6 +108,8 @@ namespace Tapeti
Use((IBindingMiddleware)middleware);
else if (middleware is IMessageMiddleware)
Use((IMessageMiddleware)middleware);
else if (middleware is IPublishMiddleware)
Use((IPublishMiddleware)middleware);
else
throw new ArgumentException($"Unsupported middleware implementation: {middleware.GetType().Name}");
}
@ -319,15 +329,17 @@ namespace Tapeti
{
public IDependencyResolver DependencyResolver { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
public IEnumerable<IQueue> Queues { get; }
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues)
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<IPublishMiddleware> publishMiddleware, IEnumerable<IQueue> queues)
{
DependencyResolver = dependencyResolver;
MessageMiddleware = messageMiddleware;
PublishMiddleware = publishMiddleware;
Queues = queues.ToList();
bindingMethodLookup = Queues.SelectMany(q => q.Bindings).ToDictionary(b => b.Method, b => b);

View File

@ -19,7 +19,7 @@ namespace Tapeti
this.config = config;
(config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher);
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware)
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config)
{
ConnectionParams = Params ?? new TapetiConnectionParams()
});

View File

@ -1,4 +1,5 @@
using System;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Tapeti;
using Tapeti.Annotations;
@ -114,12 +115,14 @@ namespace Test
[Request(Response = typeof(PoloConfirmationResponseMessage))]
public class PoloConfirmationRequestMessage
{
[Required]
public Guid StoredInState { get; set; }
}
public class PoloConfirmationResponseMessage
{
[Required]
public Guid ShouldMatchState { get; set; }
}
}

View File

@ -1,8 +1,10 @@
using System;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.Flow.SQL;
using Tapeti.Helpers;
using Tapeti.SimpleInjector;
namespace Test
@ -24,6 +26,7 @@ namespace Test
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlow()
//.WithFlowSqlRepository("data source=localhost;initial catalog=lef;integrated security=True;multipleactiveresultsets=True", 1)
.WithDataAnnotations()
.RegisterAllControllers()
.Build();

View File

@ -39,6 +39,7 @@
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.ComponentModel.DataAnnotations" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
@ -63,6 +64,10 @@
<Project>{c4897d64-d04e-4ae9-bd98-d64295d1d13a}</Project>
<Name>Tapeti.Annotations</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj">
<Project>{db1c6e81-4545-4670-a167-46e3265a9743}</Project>
<Name>Tapeti.DataAnnotations</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj">
<Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project>
<Name>Tapeti.Flow.SQL</Name>