MAX-911 RDB Relaties samenvoegen vanuit LEF en update ontvangen in LEF

MAX-1081 POC Dictionary tasks in Web voor request
Transient 0.1
This commit is contained in:
Menno van Lavieren 2019-04-25 15:19:51 +02:00
parent 6bc6cfe216
commit 6cb701378d
6 changed files with 75 additions and 37 deletions

View File

@ -7,17 +7,17 @@ namespace Tapeti.Transient
public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding
{
private string dynamicQueuePrefix;
private TimeSpan defaultTimeout;
private readonly TransientRouter router;
public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix)
{
this.dynamicQueuePrefix = dynamicQueuePrefix;
this.defaultTimeout = defaultTimeout;
this.router = new TransientRouter(defaultTimeout);
}
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefaultSingleton(() => new TransientRouter(container.Resolve<IInternalPublisher>(), defaultTimeout));
container.RegisterDefaultSingleton(router);
container.RegisterDefault<ITransientPublisher, TransientPublisher>();
}
@ -28,7 +28,7 @@ namespace Tapeti.Transient
public IEnumerable<ICustomBinding> GetBindings(IDependencyResolver dependencyResolver)
{
yield return new TransientGenericBinding(dependencyResolver.Resolve<TransientRouter>(), dynamicQueuePrefix);
yield return new TransientGenericBinding(router, dynamicQueuePrefix);
}
}
}

View File

@ -8,15 +8,17 @@ namespace Tapeti.Transient
public class TransientPublisher : ITransientPublisher
{
private readonly TransientRouter router;
private readonly IPublisher publisher;
public TransientPublisher(TransientRouter router)
public TransientPublisher(TransientRouter router, IPublisher publisher)
{
this.router = router;
this.publisher = publisher;
}
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request)
{
return (TResponse)(await router.RequestResponse(request));
return (TResponse)(await router.RequestResponse(publisher, request));
}
}
}

View File

@ -9,18 +9,16 @@ namespace Tapeti.Transient
{
public class TransientRouter
{
private readonly TimeSpan defaultTimeout;
private readonly int defaultTimeoutMs;
private readonly ConcurrentDictionary<Guid, TaskCompletionSource<object>> map = new ConcurrentDictionary<Guid, TaskCompletionSource<object>>();
private readonly IInternalPublisher internalPublisher;
public string TransientResponseQueueName { get; set; }
public TransientRouter(IInternalPublisher internalPublisher, TimeSpan defaultTimeout)
public TransientRouter(TimeSpan defaultTimeout)
{
this.internalPublisher = internalPublisher;
this.defaultTimeout = defaultTimeout;
defaultTimeoutMs = (int)defaultTimeout.TotalMilliseconds;
}
public void GenericHandleResponse(object response, IMessageContext context)
@ -35,7 +33,7 @@ namespace Tapeti.Transient
tcs.SetResult(response);
}
public async Task<object> RequestResponse(object request)
public async Task<object> RequestResponse(IPublisher publisher, object request)
{
var correlation = Guid.NewGuid();
var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource<object>());
@ -49,7 +47,7 @@ namespace Tapeti.Transient
Persistent = false
};
await internalPublisher.Publish(request, properties, false);
await ((IInternalPublisher)publisher).Publish(request, properties, false);
}
catch (Exception)
{
@ -60,7 +58,7 @@ namespace Tapeti.Transient
throw;
}
using (new Timer(TimeoutResponse, tcs, defaultTimeout, TimeSpan.MaxValue))
using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1))
{
return await tcs.Task;
}
@ -68,7 +66,7 @@ namespace Tapeti.Transient
private void TimeoutResponse(object tcs)
{
((TaskCompletionSource<object>)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at " + defaultTimeout));
((TaskCompletionSource<object>)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs));
}
}
}

View File

@ -24,7 +24,9 @@ namespace Tapeti
{
private readonly Dictionary<string, List<IBinding>> staticRegistrations = new Dictionary<string, List<IBinding>>();
private readonly Dictionary<string, Dictionary<Type, List<IBinding>>> dynamicRegistrations = new Dictionary<string, Dictionary<Type, List<IBinding>>>();
private readonly List<IBindingQueueInfo> uniqueRegistrations = new List<IBindingQueueInfo>();
private readonly List<ICustomBinding> customBindings = new List<ICustomBinding>();
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly List<ICleanupMiddleware> cleanupMiddleware = new List<ICleanupMiddleware>();
@ -47,6 +49,8 @@ namespace Tapeti
public IConfig Build()
{
RegisterCustomBindings();
RegisterDefaults();
var queues = new List<IQueue>();
@ -93,6 +97,9 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl)));
}
queues.AddRange(uniqueRegistrations.Select(b => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(b.QueueInfo.Name) }, new []{b})));
var config = new Config(queues)
{
DependencyResolver = dependencyResolver,
@ -144,7 +151,8 @@ namespace Tapeti
var middlewareBundle = extension.GetMiddleware(dependencyResolver);
(extension as ITapetiExtentionBinding)?.GetBindings(dependencyResolver);
if (extension is ITapetiExtentionBinding extentionBindings)
customBindings.AddRange(extentionBindings.GetBindings(dependencyResolver));
// ReSharper disable once InvertIf
if (middlewareBundle != null)
@ -244,9 +252,9 @@ namespace Tapeti
};
if (methodQueueInfo.Dynamic.GetValueOrDefault())
AddDynamicRegistration(context, handlerInfo);
AddDynamicRegistration(handlerInfo);
else
AddStaticRegistration(context, handlerInfo);
AddStaticRegistration(handlerInfo);
}
return this;
@ -267,6 +275,27 @@ namespace Tapeti
return RegisterAllControllers(Assembly.GetEntryAssembly());
}
private void RegisterCustomBindings()
{
foreach (var customBinding in customBindings)
{
// TODO Do we need to configure additional middleware, or does this only get confused if there is no MessageClass
var binding = new CustomBinding(customBinding);
if (binding.QueueInfo.Dynamic == false)
{
AddStaticRegistration(binding);
}
else if (binding.MessageClass != null)
{
AddDynamicRegistration(binding);
}
else
{
AddUniqueRegistration(binding);
}
}
}
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
{
@ -362,7 +391,7 @@ namespace Tapeti
}
protected void AddStaticRegistration(IBindingContext context, IBindingQueueInfo binding)
protected void AddStaticRegistration(IBindingQueueInfo binding)
{
if (staticRegistrations.ContainsKey(binding.QueueInfo.Name))
{
@ -379,7 +408,7 @@ namespace Tapeti
}
protected void AddDynamicRegistration(IBindingContext context, IBindingQueueInfo binding)
protected void AddDynamicRegistration(IBindingQueueInfo binding)
{
var prefix = binding.QueueInfo.Name ?? "";
@ -389,15 +418,19 @@ namespace Tapeti
dynamicRegistrations.Add(prefix, prefixRegistrations);
}
if (!prefixRegistrations.TryGetValue(context.MessageClass, out List<IBinding> bindings))
if (!prefixRegistrations.TryGetValue(binding.MessageClass, out List<IBinding> bindings))
{
bindings = new List<IBinding>();
prefixRegistrations.Add(context.MessageClass, bindings);
prefixRegistrations.Add(binding.MessageClass, bindings);
}
bindings.Add(binding);
}
protected void AddUniqueRegistration(IBindingQueueInfo binding)
{
uniqueRegistrations.Add(binding);
}
protected QueueInfo GetQueueInfo(MemberInfo member)
{

View File

@ -7,21 +7,16 @@ using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
using Tapeti.Annotations;
using Tapeti.Transient;
namespace Test
{
public interface IDummy
{
[DynamicQueue("test1")]
void HandleMessage(PoloConfirmationResponseMessage msg);
}
internal class Program
{
private static void Main()
{
// TODO logging
try
//try
{
var container = new Container();
container.Register<MarcoEmitter>();
@ -32,8 +27,8 @@ namespace Test
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.WithDataAnnotations()
.WithTransient(TimeSpan.FromSeconds(30))
.RegisterAllControllers()
.RegisterController(typeof(IDummy))
//.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails
.Build();
@ -59,23 +54,32 @@ namespace Test
Console.WriteLine("Done!");
connection.GetPublisher().Publish(new FlowEndController.PingMessage());
var response = container.GetInstance<ITransientPublisher>()
.RequestResponse<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage
{
StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab")
}).Result;
Console.WriteLine(response.ShouldMatchState);
//connection.GetPublisher().Publish(new FlowEndController.PingMessage());
//container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true).Wait();
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
//container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest).Wait();
Thread.Sleep(1000);
var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait();
//var emitter = container.GetInstance<MarcoEmitter>();
//emitter.Run().Wait();
}
}
catch (Exception e)
//catch (Exception e)
{
Console.WriteLine(e.ToString());
Console.ReadKey();
// Console.WriteLine(e.ToString());
// Console.ReadKey();
}
}
}

View File

@ -11,6 +11,7 @@
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj" />
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj" />
<ProjectReference Include="..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\Tapeti.Transient\Tapeti.Transient.csproj" />
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>