diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientMiddleware.cs index 24ef7f3..5077fa5 100644 --- a/Tapeti.Transient/TransientMiddleware.cs +++ b/Tapeti.Transient/TransientMiddleware.cs @@ -7,17 +7,17 @@ namespace Tapeti.Transient public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding { private string dynamicQueuePrefix; - private TimeSpan defaultTimeout; + private readonly TransientRouter router; public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix) { this.dynamicQueuePrefix = dynamicQueuePrefix; - this.defaultTimeout = defaultTimeout; + this.router = new TransientRouter(defaultTimeout); } public void RegisterDefaults(IDependencyContainer container) { - container.RegisterDefaultSingleton(() => new TransientRouter(container.Resolve(), defaultTimeout)); + container.RegisterDefaultSingleton(router); container.RegisterDefault(); } @@ -28,7 +28,7 @@ namespace Tapeti.Transient public IEnumerable GetBindings(IDependencyResolver dependencyResolver) { - yield return new TransientGenericBinding(dependencyResolver.Resolve(), dynamicQueuePrefix); + yield return new TransientGenericBinding(router, dynamicQueuePrefix); } } } \ No newline at end of file diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs index 6bbae21..62715e7 100644 --- a/Tapeti.Transient/TransientPublisher.cs +++ b/Tapeti.Transient/TransientPublisher.cs @@ -8,15 +8,17 @@ namespace Tapeti.Transient public class TransientPublisher : ITransientPublisher { private readonly TransientRouter router; + private readonly IPublisher publisher; - public TransientPublisher(TransientRouter router) + public TransientPublisher(TransientRouter router, IPublisher publisher) { this.router = router; + this.publisher = publisher; } public async Task RequestResponse(TRequest request) { - return (TResponse)(await router.RequestResponse(request)); + return (TResponse)(await router.RequestResponse(publisher, request)); } } } diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index f91be75..775576b 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -9,18 +9,16 @@ namespace Tapeti.Transient { public class TransientRouter { - private readonly TimeSpan defaultTimeout; + private readonly int defaultTimeoutMs; private readonly ConcurrentDictionary> map = new ConcurrentDictionary>(); - private readonly IInternalPublisher internalPublisher; public string TransientResponseQueueName { get; set; } - public TransientRouter(IInternalPublisher internalPublisher, TimeSpan defaultTimeout) + public TransientRouter(TimeSpan defaultTimeout) { - this.internalPublisher = internalPublisher; - this.defaultTimeout = defaultTimeout; + defaultTimeoutMs = (int)defaultTimeout.TotalMilliseconds; } public void GenericHandleResponse(object response, IMessageContext context) @@ -35,7 +33,7 @@ namespace Tapeti.Transient tcs.SetResult(response); } - public async Task RequestResponse(object request) + public async Task RequestResponse(IPublisher publisher, object request) { var correlation = Guid.NewGuid(); var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource()); @@ -49,7 +47,7 @@ namespace Tapeti.Transient Persistent = false }; - await internalPublisher.Publish(request, properties, false); + await ((IInternalPublisher)publisher).Publish(request, properties, false); } catch (Exception) { @@ -60,7 +58,7 @@ namespace Tapeti.Transient throw; } - using (new Timer(TimeoutResponse, tcs, defaultTimeout, TimeSpan.MaxValue)) + using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1)) { return await tcs.Task; } @@ -68,7 +66,7 @@ namespace Tapeti.Transient private void TimeoutResponse(object tcs) { - ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at " + defaultTimeout)); + ((TaskCompletionSource)tcs).SetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs)); } } } \ No newline at end of file diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index db83fd7..9291798 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -24,7 +24,9 @@ namespace Tapeti { private readonly Dictionary> staticRegistrations = new Dictionary>(); private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); + private readonly List uniqueRegistrations = new List(); + private readonly List customBindings = new List(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); private readonly List cleanupMiddleware = new List(); @@ -47,6 +49,8 @@ namespace Tapeti public IConfig Build() { + RegisterCustomBindings(); + RegisterDefaults(); var queues = new List(); @@ -93,6 +97,9 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } + queues.AddRange(uniqueRegistrations.Select(b => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(b.QueueInfo.Name) }, new []{b}))); + + var config = new Config(queues) { DependencyResolver = dependencyResolver, @@ -144,7 +151,8 @@ namespace Tapeti var middlewareBundle = extension.GetMiddleware(dependencyResolver); - (extension as ITapetiExtentionBinding)?.GetBindings(dependencyResolver); + if (extension is ITapetiExtentionBinding extentionBindings) + customBindings.AddRange(extentionBindings.GetBindings(dependencyResolver)); // ReSharper disable once InvertIf if (middlewareBundle != null) @@ -244,9 +252,9 @@ namespace Tapeti }; if (methodQueueInfo.Dynamic.GetValueOrDefault()) - AddDynamicRegistration(context, handlerInfo); + AddDynamicRegistration(handlerInfo); else - AddStaticRegistration(context, handlerInfo); + AddStaticRegistration(handlerInfo); } return this; @@ -267,6 +275,27 @@ namespace Tapeti return RegisterAllControllers(Assembly.GetEntryAssembly()); } + private void RegisterCustomBindings() + { + foreach (var customBinding in customBindings) + { + // TODO Do we need to configure additional middleware, or does this only get confused if there is no MessageClass + + var binding = new CustomBinding(customBinding); + if (binding.QueueInfo.Dynamic == false) + { + AddStaticRegistration(binding); + } + else if (binding.MessageClass != null) + { + AddDynamicRegistration(binding); + } + else + { + AddUniqueRegistration(binding); + } + } + } protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method) { @@ -362,7 +391,7 @@ namespace Tapeti } - protected void AddStaticRegistration(IBindingContext context, IBindingQueueInfo binding) + protected void AddStaticRegistration(IBindingQueueInfo binding) { if (staticRegistrations.ContainsKey(binding.QueueInfo.Name)) { @@ -379,7 +408,7 @@ namespace Tapeti } - protected void AddDynamicRegistration(IBindingContext context, IBindingQueueInfo binding) + protected void AddDynamicRegistration(IBindingQueueInfo binding) { var prefix = binding.QueueInfo.Name ?? ""; @@ -389,15 +418,19 @@ namespace Tapeti dynamicRegistrations.Add(prefix, prefixRegistrations); } - if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + if (!prefixRegistrations.TryGetValue(binding.MessageClass, out List bindings)) { bindings = new List(); - prefixRegistrations.Add(context.MessageClass, bindings); + prefixRegistrations.Add(binding.MessageClass, bindings); } bindings.Add(binding); } + protected void AddUniqueRegistration(IBindingQueueInfo binding) + { + uniqueRegistrations.Add(binding); + } protected QueueInfo GetQueueInfo(MemberInfo member) { diff --git a/Test/Program.cs b/Test/Program.cs index 309361b..3969269 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -7,21 +7,16 @@ using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; using Tapeti.Annotations; +using Tapeti.Transient; namespace Test { - public interface IDummy - { - [DynamicQueue("test1")] - void HandleMessage(PoloConfirmationResponseMessage msg); - } - internal class Program { private static void Main() { // TODO logging - try + //try { var container = new Container(); container.Register(); @@ -32,8 +27,8 @@ namespace Test //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") .WithFlow() .WithDataAnnotations() + .WithTransient(TimeSpan.FromSeconds(30)) .RegisterAllControllers() - .RegisterController(typeof(IDummy)) //.DisablePublisherConfirms() -> you probably never want to do this if you're using Flow or want requeues when a publish fails .Build(); @@ -59,23 +54,32 @@ namespace Test Console.WriteLine("Done!"); - connection.GetPublisher().Publish(new FlowEndController.PingMessage()); + var response = container.GetInstance() + .RequestResponse( + new PoloConfirmationRequestMessage + { + StoredInState = new Guid("309088d8-9906-4ef3-bc64-56976538d3ab") + }).Result; + + Console.WriteLine(response.ShouldMatchState); + + //connection.GetPublisher().Publish(new FlowEndController.PingMessage()); //container.GetInstance().Start(c => c.StartFlow, true).Wait(); - container.GetInstance().Start(c => c.TestParallelRequest).Wait(); + //container.GetInstance().Start(c => c.TestParallelRequest).Wait(); Thread.Sleep(1000); - var emitter = container.GetInstance(); - emitter.Run().Wait(); + //var emitter = container.GetInstance(); + //emitter.Run().Wait(); } } - catch (Exception e) + //catch (Exception e) { - Console.WriteLine(e.ToString()); - Console.ReadKey(); + // Console.WriteLine(e.ToString()); + // Console.ReadKey(); } } } diff --git a/Test/Test.csproj b/Test/Test.csproj index 7aa20f6..00f9c1e 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -11,6 +11,7 @@ +