1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Merge branch 'release/1.3'

This commit is contained in:
Mark van Renswoude 2019-06-05 11:15:31 +02:00
commit db5d762097
34 changed files with 495 additions and 52 deletions

View File

@ -14,9 +14,12 @@ namespace Tapeti.Annotations
/// for deploy-time management of durable queues (shameless plug intended). /// for deploy-time management of durable queues (shameless plug intended).
/// </remarks> /// </remarks>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
[MeansImplicitUse] [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class DurableQueueAttribute : Attribute public class DurableQueueAttribute : Attribute
{ {
/// <summary>
/// Specifies the name of the durable queue (must already be declared).
/// </summary>
public string Name { get; set; } public string Name { get; set; }

View File

@ -12,6 +12,11 @@ namespace Tapeti.Annotations
[MeansImplicitUse] [MeansImplicitUse]
public class DynamicQueueAttribute : Attribute public class DynamicQueueAttribute : Attribute
{ {
/// <summary>
/// An optional prefix. If specified, Tapeti will compose the queue name using the
/// prefix and a unique ID. If not specified, an empty queue name will be passed
/// to RabbitMQ thus letting it create a unique queue name.
/// </summary>
public string Prefix { get; set; } public string Prefix { get; set; }

View File

@ -9,7 +9,7 @@ namespace Tapeti.Annotations
/// when using the RegisterAllControllers method. It is not required when manually registering a controller. /// when using the RegisterAllControllers method. It is not required when manually registering a controller.
/// </summary> /// </summary>
[AttributeUsage(AttributeTargets.Class)] [AttributeUsage(AttributeTargets.Class)]
[MeansImplicitUse] [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class MessageControllerAttribute : Attribute public class MessageControllerAttribute : Attribute
{ {
} }

View File

@ -1,17 +0,0 @@
using System;
using JetBrains.Annotations;
namespace Tapeti.Annotations
{
/// <inheritdoc />
/// <summary>
/// This attribute does nothing in runtime and is not required. It is only used as
/// a hint to ReSharper, and maybe developers as well, to indicate the method is
/// indeed used.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
[MeansImplicitUse]
public class MessageHandlerAttribute : Attribute
{
}
}

View File

@ -13,6 +13,9 @@ namespace Tapeti.Annotations
[AttributeUsage(AttributeTargets.Class)] [AttributeUsage(AttributeTargets.Class)]
public class RequestAttribute : Attribute public class RequestAttribute : Attribute
{ {
/// <summary>
/// The type of the message class which must be returned as the response.
/// </summary>
public Type Response { get; set; } public Type Response { get; set; }
} }
} }

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
</Project> </Project>

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
// TODO disallow if replyto is not specified? // TODO disallow if replyto is not specified?
if (reply.ReplyTo != null) if (reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties, true); await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory);
else else
await publisher.Publish(message, properties, true); await publisher.Publish(message, properties, reply.Mandatory);
await context.Delete(); await context.Delete();
} }
@ -129,8 +129,8 @@ namespace Tapeti.Flow.Default
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler)); throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>(); var requestAttribute = request.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) if (requestAttribute?.Response != null && !binding.Accept(requestAttribute.Response))
throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>(); var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute == null) if (continuationAttribute == null)
@ -157,7 +157,8 @@ namespace Tapeti.Flow.Default
{ {
CorrelationId = context.Properties.CorrelationId, CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo, ReplyTo = context.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.Properties.Persistent
}; };
} }

View File

@ -57,6 +57,8 @@ namespace Tapeti.Flow.Default
public string CorrelationId { get; set; } public string CorrelationId { get; set; }
public string ResponseTypeName { get; set; } public string ResponseTypeName { get; set; }
public bool Mandatory { get; set; }
public ReplyMetadata Clone() public ReplyMetadata Clone()
{ {
@ -64,7 +66,8 @@ namespace Tapeti.Flow.Default
{ {
ReplyTo = ReplyTo, ReplyTo = ReplyTo,
CorrelationId = CorrelationId, CorrelationId = CorrelationId,
ResponseTypeName = ResponseTypeName ResponseTypeName = ResponseTypeName,
Mandatory = Mandatory
}; };
} }
} }

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;
@ -16,6 +17,7 @@ namespace Tapeti.Flow.Default
private readonly IFlowRepository repository; private readonly IFlowRepository repository;
private volatile bool inUse; private volatile bool inUse;
private volatile bool loaded;
public FlowStore(IFlowRepository repository) public FlowStore(IFlowRepository repository)
{ {
@ -40,17 +42,25 @@ namespace Tapeti.Flow.Default
foreach (var continuation in flowStateRecord.Value.Continuations) foreach (var continuation in flowStateRecord.Value.Continuations)
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
} }
loaded = true;
} }
public Task<Guid?> FindFlowID(Guid continuationID) public Task<Guid?> FindFlowID(Guid continuationID)
{ {
if (!loaded)
throw new InvalidOperationException("Flow store is not yet loaded.");
return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null); return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null);
} }
public async Task<IFlowStateLock> LockFlowState(Guid flowID) public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{ {
if (!loaded)
throw new InvalidOperationException("Flow store should be loaded before storing flows.");
inUse = true; inUse = true;
var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID)); var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID));

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -0,0 +1,14 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Tests
{
public class TransientFilterMiddleware : IMessageFilterMiddleware
{
public Task Handle(IMessageContext context, Func<Task> next)
{
throw new NotImplementedException();
}
}
}

View File

@ -0,0 +1,13 @@
using System;
namespace Tapeti.Transient
{
public static class ConfigExtensions
{
public static TapetiConfig WithTransient(this TapetiConfig config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient")
{
config.Use(new TransientMiddleware(defaultTimeout, dynamicQueuePrefix));
return config;
}
}
}

View File

@ -0,0 +1,9 @@
using System.Threading.Tasks;
namespace Tapeti.Transient
{
public interface ITransientPublisher
{
Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request);
}
}

View File

@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<package >
<metadata>
<id>Tapeti.Transient</id>
<version>$version$</version>
<title>Tapeti Transient</title>
<authors>Menno van Lavieren, Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners>
<licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Transient extension for Tapeti</description>
<copyright></copyright>
<tags>rabbitmq tapeti transient</tags>
<dependencies>
<dependency id="Tapeti" version="[$version$]" />
</dependencies>
</metadata>
<files>
<file src="bin\Release\**" target="lib" />
</files>
</package>

View File

@ -0,0 +1,52 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Transient
{
public class TransientGenericBinding : ICustomBinding
{
private readonly TransientRouter router;
public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix)
{
this.router = router;
DynamicQueuePrefix = dynamicQueuePrefix;
Method = typeof(TransientRouter).GetMethod("GenericHandleResponse");
}
public Type Controller => typeof(TransientRouter);
public MethodInfo Method { get; }
public QueueBindingMode QueueBindingMode => QueueBindingMode.DirectToQueue;
public string StaticQueueName => null;
public string DynamicQueuePrefix { get; }
public Type MessageClass => null;
public bool Accept(Type messageClass)
{
return true;
}
public bool Accept(IMessageContext context, object message)
{
return true;
}
public Task Invoke(IMessageContext context, object message)
{
router.GenericHandleResponse(message, context);
return Task.CompletedTask;
}
public void SetQueueName(string queueName)
{
router.TransientResponseQueueName = queueName;
}
}
}

View File

@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using Tapeti.Config;
namespace Tapeti.Transient
{
public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding
{
private string dynamicQueuePrefix;
private readonly TransientRouter router;
public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix)
{
this.dynamicQueuePrefix = dynamicQueuePrefix;
this.router = new TransientRouter(defaultTimeout);
}
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefaultSingleton(router);
container.RegisterDefault<ITransientPublisher, TransientPublisher>();
}
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return new object[0];
}
public IEnumerable<ICustomBinding> GetBindings(IDependencyResolver dependencyResolver)
{
yield return new TransientGenericBinding(router, dynamicQueuePrefix);
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Transient
{
public class TransientPublisher : ITransientPublisher
{
private readonly TransientRouter router;
private readonly IPublisher publisher;
public TransientPublisher(TransientRouter router, IPublisher publisher)
{
this.router = router;
this.publisher = publisher;
}
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request)
{
return (TResponse)(await router.RequestResponse(publisher, request));
}
}
}

View File

@ -0,0 +1,72 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
namespace Tapeti.Transient
{
public class TransientRouter
{
private readonly int defaultTimeoutMs;
private readonly ConcurrentDictionary<Guid, TaskCompletionSource<object>> map = new ConcurrentDictionary<Guid, TaskCompletionSource<object>>();
public string TransientResponseQueueName { get; set; }
public TransientRouter(TimeSpan defaultTimeout)
{
defaultTimeoutMs = (int)defaultTimeout.TotalMilliseconds;
}
public void GenericHandleResponse(object response, IMessageContext context)
{
if (context.Properties.CorrelationId == null)
return;
if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
return;
if (map.TryRemove(continuationID, out var tcs))
tcs.SetResult(response);
}
public async Task<object> RequestResponse(IPublisher publisher, object request)
{
var correlation = Guid.NewGuid();
var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource<object>());
try
{
var properties = new BasicProperties
{
CorrelationId = correlation.ToString(),
ReplyTo = TransientResponseQueueName,
Persistent = false
};
await ((IInternalPublisher)publisher).Publish(request, properties, false);
}
catch (Exception)
{
// Simple cleanup of the task and map dictionary.
if (map.TryRemove(correlation, out tcs))
tcs.TrySetResult(null);
throw;
}
using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1))
{
return await tcs.Task;
}
}
private void TimeoutResponse(object tcs)
{
((TaskCompletionSource<object>)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs));
}
}
}

View File

@ -19,7 +19,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj",
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Transient", "Tapeti.Transient\Tapeti.Transient.csproj", "{A6355E63-19AB-47EA-91FA-49B5E9B41F88}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -63,6 +65,10 @@ Global
{43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.Build.0 = Debug|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.Build.0 = Debug|Any CPU
{43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.ActiveCfg = Release|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.ActiveCfg = Release|Any CPU
{43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.Build.0 = Release|Any CPU {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.Build.0 = Release|Any CPU
{A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A6355E63-19AB-47EA-91FA-49B5E9B41F88}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE

View File

@ -46,6 +46,7 @@ namespace Tapeti.Config
IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; } IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
bool Accept(Type messageClass);
bool Accept(IMessageContext context, object message); bool Accept(IMessageContext context, object message);
Task Invoke(IMessageContext context, object message); Task Invoke(IMessageContext context, object message);
} }

View File

@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface ICustomBinding
{
Type Controller { get; }
MethodInfo Method { get; }
QueueBindingMode QueueBindingMode { get; }
string StaticQueueName { get; }
string DynamicQueuePrefix { get; }
Type MessageClass { get; } // Needed to get routing key information when QueueBindingMode = RoutingKey
bool Accept(Type messageClass);
bool Accept(IMessageContext context, object message);
Task Invoke(IMessageContext context, object message);
void SetQueueName(string queueName);
}
}

View File

@ -0,0 +1,10 @@
using System.Collections.Generic;
namespace Tapeti.Config
{
public interface ITapetiExtentionBinding
{
IEnumerable<ICustomBinding> GetBindings(IDependencyResolver dependencyResolver);
}
}

View File

@ -40,7 +40,7 @@ namespace Tapeti.Connection
private DateTime connectedDateTime; private DateTime connectedDateTime;
// These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread
private readonly object confirmLock = new Object(); private readonly object confirmLock = new object();
private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>(); private readonly Dictionary<ulong, ConfirmMessageInfo> confirmMessages = new Dictionary<ulong, ConfirmMessageInfo>();
private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>(); private readonly Dictionary<string, ReturnInfo> returnRoutingKeys = new Dictionary<string, ReturnInfo>();
@ -113,6 +113,9 @@ namespace Tapeti.Connection
{ {
if (binding.QueueBindingMode == QueueBindingMode.RoutingKey) if (binding.QueueBindingMode == QueueBindingMode.RoutingKey)
{ {
if (binding.MessageClass == null)
throw new NullReferenceException("Binding with QueueBindingMode = RoutingKey must have a MessageClass");
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
var exchange = exchangeStrategy.GetExchange(binding.MessageClass); var exchange = exchangeStrategy.GetExchange(binding.MessageClass);

View File

@ -69,7 +69,7 @@ namespace Tapeti.Default
properties.CorrelationId = messageContext.Properties.CorrelationId; properties.CorrelationId = messageContext.Properties.CorrelationId;
if (messageContext.Properties.IsReplyToPresent()) if (messageContext.Properties.IsReplyToPresent())
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, true); return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent);
return publisher.Publish(message, properties, false); return publisher.Publish(message, properties, false);
} }

View File

@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>

View File

@ -22,9 +22,11 @@ namespace Tapeti
public class TapetiConfig public class TapetiConfig
{ {
private readonly Dictionary<string, List<Binding>> staticRegistrations = new Dictionary<string, List<Binding>>(); private readonly Dictionary<string, List<IBinding>> staticRegistrations = new Dictionary<string, List<IBinding>>();
private readonly Dictionary<string, Dictionary<Type, List<Binding>>> dynamicRegistrations = new Dictionary<string, Dictionary<Type, List<Binding>>>(); private readonly Dictionary<string, Dictionary<Type, List<IBinding>>> dynamicRegistrations = new Dictionary<string, Dictionary<Type, List<IBinding>>>();
private readonly List<IBindingQueueInfo> uniqueRegistrations = new List<IBindingQueueInfo>();
private readonly List<ICustomBinding> customBindings = new List<ICustomBinding>();
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>(); private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>(); private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly List<ICleanupMiddleware> cleanupMiddleware = new List<ICleanupMiddleware>(); private readonly List<ICleanupMiddleware> cleanupMiddleware = new List<ICleanupMiddleware>();
@ -47,6 +49,8 @@ namespace Tapeti
public IConfig Build() public IConfig Build()
{ {
RegisterCustomBindings();
RegisterDefaults(); RegisterDefaults();
var queues = new List<IQueue>(); var queues = new List<IQueue>();
@ -79,12 +83,12 @@ namespace Tapeti
// //
foreach (var prefixGroup in dynamicRegistrations) foreach (var prefixGroup in dynamicRegistrations)
{ {
var dynamicBindings = new List<List<Binding>>(); var dynamicBindings = new List<List<IBinding>>();
foreach (var bindings in prefixGroup.Value.Values) foreach (var bindings in prefixGroup.Value.Values)
{ {
while (dynamicBindings.Count < bindings.Count) while (dynamicBindings.Count < bindings.Count)
dynamicBindings.Add(new List<Binding>()); dynamicBindings.Add(new List<IBinding>());
for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++)
dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); dynamicBindings[bindingIndex].Add(bindings[bindingIndex]);
@ -93,6 +97,9 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
} }
queues.AddRange(uniqueRegistrations.Select(b => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(b.QueueInfo.Name) }, new []{b})));
var config = new Config(queues) var config = new Config(queues)
{ {
DependencyResolver = dependencyResolver, DependencyResolver = dependencyResolver,
@ -144,6 +151,9 @@ namespace Tapeti
var middlewareBundle = extension.GetMiddleware(dependencyResolver); var middlewareBundle = extension.GetMiddleware(dependencyResolver);
if (extension is ITapetiExtentionBinding extentionBindings)
customBindings.AddRange(extentionBindings.GetBindings(dependencyResolver));
// ReSharper disable once InvertIf // ReSharper disable once InvertIf
if (middlewareBundle != null) if (middlewareBundle != null)
{ {
@ -212,6 +222,7 @@ namespace Tapeti
{ {
var controllerQueueInfo = GetQueueInfo(controller); var controllerQueueInfo = GetQueueInfo(controller);
if (!controller.IsInterface)
(dependencyResolver as IDependencyContainer)?.RegisterController(controller); (dependencyResolver as IDependencyContainer)?.RegisterController(controller);
foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance)
@ -241,9 +252,9 @@ namespace Tapeti
}; };
if (methodQueueInfo.Dynamic.GetValueOrDefault()) if (methodQueueInfo.Dynamic.GetValueOrDefault())
AddDynamicRegistration(context, handlerInfo); AddDynamicRegistration(handlerInfo);
else else
AddStaticRegistration(context, handlerInfo); AddStaticRegistration(handlerInfo);
} }
return this; return this;
@ -264,6 +275,27 @@ namespace Tapeti
return RegisterAllControllers(Assembly.GetEntryAssembly()); return RegisterAllControllers(Assembly.GetEntryAssembly());
} }
private void RegisterCustomBindings()
{
foreach (var customBinding in customBindings)
{
// TODO Do we need to configure additional middleware, or does this only get confused if there is no MessageClass
var binding = new CustomBinding(customBinding);
if (binding.QueueInfo.Dynamic == false)
{
AddStaticRegistration(binding);
}
else if (binding.MessageClass != null)
{
AddDynamicRegistration(binding);
}
else
{
AddUniqueRegistration(binding);
}
}
}
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
{ {
@ -359,7 +391,7 @@ namespace Tapeti
} }
protected void AddStaticRegistration(IBindingContext context, Binding binding) protected void AddStaticRegistration(IBindingQueueInfo binding)
{ {
if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) if (staticRegistrations.ContainsKey(binding.QueueInfo.Name))
{ {
@ -372,29 +404,33 @@ namespace Tapeti
existing.Add(binding); existing.Add(binding);
} }
else else
staticRegistrations.Add(binding.QueueInfo.Name, new List<Binding> { binding }); staticRegistrations.Add(binding.QueueInfo.Name, new List<IBinding> { binding });
} }
protected void AddDynamicRegistration(IBindingContext context, Binding binding) protected void AddDynamicRegistration(IBindingQueueInfo binding)
{ {
var prefix = binding.QueueInfo.Name ?? ""; var prefix = binding.QueueInfo.Name ?? "";
if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary<Type, List<Binding>> prefixRegistrations)) if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary<Type, List<IBinding>> prefixRegistrations))
{ {
prefixRegistrations = new Dictionary<Type, List<Binding>>(); prefixRegistrations = new Dictionary<Type, List<IBinding>>();
dynamicRegistrations.Add(prefix, prefixRegistrations); dynamicRegistrations.Add(prefix, prefixRegistrations);
} }
if (!prefixRegistrations.TryGetValue(context.MessageClass, out List<Binding> bindings)) if (!prefixRegistrations.TryGetValue(binding.MessageClass, out List<IBinding> bindings))
{ {
bindings = new List<Binding>(); bindings = new List<IBinding>();
prefixRegistrations.Add(context.MessageClass, bindings); prefixRegistrations.Add(binding.MessageClass, bindings);
} }
bindings.Add(binding); bindings.Add(binding);
} }
protected void AddUniqueRegistration(IBindingQueueInfo binding)
{
uniqueRegistrations.Add(binding);
}
protected QueueInfo GetQueueInfo(MemberInfo member) protected QueueInfo GetQueueInfo(MemberInfo member)
{ {
@ -491,8 +527,12 @@ namespace Tapeti
} }
} }
protected interface IBindingQueueInfo : IBuildBinding
{
QueueInfo QueueInfo { get; }
}
protected class Binding : IBuildBinding protected class Binding : IBindingQueueInfo
{ {
public Type Controller { get; set; } public Type Controller { get; set; }
public MethodInfo Method { get; set; } public MethodInfo Method { get; set; }
@ -523,6 +563,11 @@ namespace Tapeti
} }
public bool Accept(Type messageClass)
{
return MessageClass.IsAssignableFrom(messageClass);
}
public bool Accept(IMessageContext context, object message) public bool Accept(IMessageContext context, object message)
{ {
return message.GetType() == MessageClass; return message.GetType() == MessageClass;
@ -536,6 +581,69 @@ namespace Tapeti
} }
protected class CustomBinding : IBindingQueueInfo
{
private readonly ICustomBinding inner;
public CustomBinding(ICustomBinding inner)
{
this.inner = inner;
// Copy all variables to make them guaranteed readonly.
Controller = inner.Controller;
Method = inner.Method;
QueueBindingMode = inner.QueueBindingMode;
MessageClass = inner.MessageClass;
QueueInfo = inner.StaticQueueName != null
? new QueueInfo()
{
Dynamic = false,
Name = inner.StaticQueueName
}
: new QueueInfo()
{
Dynamic = true,
Name = inner.DynamicQueuePrefix
};
// Custom bindings cannot have other middleware messing with the binding.
MessageFilterMiddleware = new IMessageFilterMiddleware[0];
MessageMiddleware = new IMessageMiddleware[0];
}
public Type Controller { get; }
public MethodInfo Method { get; }
public string QueueName { get; private set; }
public QueueBindingMode QueueBindingMode { get; set; }
public IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public bool Accept(Type messageClass)
{
return inner.Accept(messageClass);
}
public bool Accept(IMessageContext context, object message)
{
return inner.Accept(context, message);
}
public Task Invoke(IMessageContext context, object message)
{
return inner.Invoke(context, message);
}
public void SetQueueName(string queueName)
{
QueueName = queueName;
inner.SetQueueName(queueName);
}
public Type MessageClass { get; }
public QueueInfo QueueInfo { get; }
}
internal interface IBindingParameterAccess internal interface IBindingParameterAccess
{ {
ValueFactory GetBinding(); ValueFactory GetBinding();

View File

@ -1,10 +1,13 @@
using System; using System;
using System.Runtime.CompilerServices;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.DataAnnotations; using Tapeti.DataAnnotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
using System.Threading; using System.Threading;
using Tapeti.Annotations;
using Tapeti.Transient;
namespace Test namespace Test
{ {
@ -13,7 +16,7 @@ namespace Test
private static void Main() private static void Main()
{ {
// TODO logging // TODO logging
try //try
{ {
var container = new Container(); var container = new Container();
container.Register<MarcoEmitter>(); container.Register<MarcoEmitter>();
@ -24,6 +27,7 @@ namespace Test
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow() .WithFlow()
.WithDataAnnotations() .WithDataAnnotations()
.WithTransient(TimeSpan.FromSeconds(30))
.RegisterAllControllers() .RegisterAllControllers()
//.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
.Build(); .Build();
@ -50,23 +54,32 @@ namespace Test
Console.WriteLine("Done!"); Console.WriteLine("Done!");
connection.GetPublisher().Publish(new FlowEndController.PingMessage()); var response = container.GetInstance<ITransientPublisher>()
.RequestResponse<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage
{
StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab")
}).Result;
Console.WriteLine(response.ShouldMatchState);
//connection.GetPublisher().Publish(new FlowEndController.PingMessage());
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait(); //container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait();
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait(); //container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
Thread.Sleep(1000); Thread.Sleep(1000);
var emitter = container.GetInstance<MarcoEmitter>(); //var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait(); //emitter.Run().Wait();
} }
} }
catch (Exception e) //catch (Exception e)
{ {
Console.WriteLine(e.ToString()); // Console.WriteLine(e.ToString());
Console.ReadKey(); // Console.ReadKey();
} }
} }
} }

View File

@ -11,6 +11,7 @@
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj" /> <ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj" />
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj" /> <ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" /> <ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\Tapeti.Transient\Tapeti.Transient.csproj" />
<ProjectReference Include="..\Tapeti\Tapeti.csproj" /> <ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -20,6 +20,8 @@ after_build:
- cmd: appveyor PushArtifact "Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg" - cmd: appveyor PushArtifact "Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.Flow.SQL\Tapeti.Flow.SQL.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: nuget pack Tapeti.Flow.SQL\Tapeti.Flow.SQL.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "Tapeti.Flow.SQL.%GitVersion_NuGetVersion%.nupkg" - cmd: appveyor PushArtifact "Tapeti.Flow.SQL.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.Transient\Tapeti.Transient.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "Tapeti.Transient.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg" - cmd: appveyor PushArtifact "Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.Serilog\Tapeti.Serilog.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: nuget pack Tapeti.Serilog\Tapeti.Serilog.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"