From c63b821b8730fa2b7da0a88e6b932281cb3c2d19 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 19 Aug 2019 09:33:07 +0200 Subject: [PATCH] Fixed #11: Do not persist flow for dynamic queues --- Tapeti.Flow/Default/FlowBindingMiddleware.cs | 2 +- Tapeti.Flow/Default/FlowContext.cs | 4 +- Tapeti.Flow/Default/FlowProvider.cs | 6 +- Tapeti.Flow/Default/FlowStore.cs | 71 +++++++++++++------- Tapeti.Flow/IFlowStore.cs | 3 +- Tapeti.Transient/TransientGenericBinding.cs | 3 + Tapeti/Config/IBinding.cs | 23 +++++++ Tapeti/Default/ControllerMethodBinding.cs | 13 ++-- Tapeti/TapetiConfigControllers.cs | 4 +- 9 files changed, 91 insertions(+), 38 deletions(-) diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 4dfb3ba..dbade9d 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -86,7 +86,7 @@ namespace Tapeti.Flow.Default var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => { - await flowContext.Store(); + await flowContext.Store(context.Binding.QueueType == QueueType.Durable); })); } diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 96235a8..686e2df 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default private bool deleteCalled; - public async Task Store() + public async Task Store(bool persistent) { storeCalled = true; @@ -25,7 +25,7 @@ namespace Tapeti.Flow.Default if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock)); FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller); - await FlowStateLock.StoreFlowState(FlowState); + await FlowStateLock.StoreFlowState(FlowState, persistent); } public async Task Delete() diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 6796cc0..123c80b 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -88,7 +88,7 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.Store(); + await context.Store(responseHandlerInfo.IsDurableQueue); await publisher.Publish(message, properties, true); } @@ -153,7 +153,8 @@ namespace Tapeti.Flow.Default return new ResponseHandlerInfo { MethodName = MethodSerializer.Serialize(responseHandler.Method), - ReplyToQueue = binding.QueueName + ReplyToQueue = binding.QueueName, + IsDurableQueue = binding.QueueType == QueueType.Durable }; } @@ -331,6 +332,7 @@ namespace Tapeti.Flow.Default { public string MethodName { get; set; } public string ReplyToQueue { get; set; } + public bool IsDurableQueue { get; set; } } } } diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index bee2419..3b0c9db 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -13,7 +13,19 @@ namespace Tapeti.Flow.Default /// public class FlowStore : IFlowStore { - private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); + private class CachedFlowState + { + public readonly FlowState FlowState; + public readonly bool IsPersistent; + + public CachedFlowState(FlowState flowState, bool isPersistent) + { + FlowState = flowState; + IsPersistent = isPersistent; + } + } + + private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); private readonly LockCollection locks = new LockCollection(EqualityComparer.Default); @@ -43,7 +55,7 @@ namespace Tapeti.Flow.Default foreach (var flowStateRecord in await repository.GetStates()) { - flowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); + flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); foreach (var continuation in flowStateRecord.Value.Continuations) continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); @@ -80,7 +92,7 @@ namespace Tapeti.Flow.Default { private readonly FlowStore owner; private volatile IDisposable flowLock; - private FlowState flowState; + private CachedFlowState cachedFlowState; public Guid FlowID { get; } @@ -91,7 +103,7 @@ namespace Tapeti.Flow.Default FlowID = flowID; this.flowLock = flowLock; - owner.flowStates.TryGetValue(flowID, out flowState); + owner.flowStates.TryGetValue(flowID, out cachedFlowState); } public void Dispose() @@ -106,10 +118,10 @@ namespace Tapeti.Flow.Default if (flowLock == null) throw new ObjectDisposedException("FlowStateLock"); - return Task.FromResult(flowState?.Clone()); + return Task.FromResult(cachedFlowState.FlowState?.Clone()); } - public async Task StoreFlowState(FlowState newFlowState) + public async Task StoreFlowState(FlowState newFlowState, bool persistent) { if (flowLock == null) throw new ObjectDisposedException("FlowStateLock"); @@ -118,30 +130,41 @@ namespace Tapeti.Flow.Default newFlowState = newFlowState.Clone(); // Update the lookup dictionary for the ContinuationIDs - if (flowState != null) + if (cachedFlowState != null) { - foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) + foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) owner.continuationLookup.TryRemove(removedContinuation, out _); } - foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key))) + foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key))) { owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID); } - var isNew = flowState == null; - flowState = newFlowState; - owner.flowStates[FlowID] = newFlowState; + var isNew = cachedFlowState == null; + var wasPersistent = cachedFlowState?.IsPersistent ?? false; - // Storing the flowstate in the underlying repository - if (isNew) + cachedFlowState = new CachedFlowState(newFlowState, persistent); + owner.flowStates[FlowID] = cachedFlowState; + + if (persistent) { - var now = DateTime.UtcNow; - await owner.repository.CreateState(FlowID, flowState, now); + // Storing the flowstate in the underlying repository + if (isNew) + { + var now = DateTime.UtcNow; + await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now); + } + else + { + await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState); + } } - else + else if (wasPersistent) { - await owner.repository.UpdateState(FlowID, flowState); + // We transitioned from a durable queue to a dynamic queue, + // remove the persistent state but keep the in-memory version + await owner.repository.DeleteState(FlowID); } } @@ -150,18 +173,16 @@ namespace Tapeti.Flow.Default if (flowLock == null) throw new ObjectDisposedException("FlowStateLock"); - if (flowState != null) + if (cachedFlowState != null) { - foreach (var removedContinuation in flowState.Continuations.Keys) + foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys) owner.continuationLookup.TryRemove(removedContinuation, out _); - owner.flowStates.TryRemove(FlowID, out _); + owner.flowStates.TryRemove(FlowID, out var removedFlowState); + cachedFlowState = null; - if (flowState != null) - { - flowState = null; + if (removedFlowState.IsPersistent) await owner.repository.DeleteState(FlowID); - } } } } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index fec1fb0..21c4337 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -52,7 +52,8 @@ namespace Tapeti.Flow /// Stores the new flow state. /// /// - Task StoreFlowState(FlowState flowState); + /// + Task StoreFlowState(FlowState flowState, bool persistent); /// /// Disposes of the flow state corresponding to this Flow ID. diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index c743234..94c6e6b 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -17,6 +17,9 @@ namespace Tapeti.Transient /// public string QueueName { get; private set; } + /// + public QueueType QueueType => QueueType.Dynamic; + /// public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix) diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs index eb204be..98b5cb6 100644 --- a/Tapeti/Config/IBinding.cs +++ b/Tapeti/Config/IBinding.cs @@ -3,6 +3,23 @@ using System.Threading.Tasks; namespace Tapeti.Config { + /// + /// Determines the type of queue the binding registers + /// + public enum QueueType + { + /// + /// The consumed queue is durable + /// + Durable, + + /// + /// The consumed queue is dynamic + /// + Dynamic + } + + /// /// Represents a registered binding to handle incoming messages. /// @@ -14,6 +31,12 @@ namespace Tapeti.Config string QueueName { get; } + /// + /// Determines the type of queue the binding registers + /// + QueueType QueueType { get; } + + /// /// Called after a connection is established to set up the binding. /// diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 3722877..4136894 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -83,6 +83,9 @@ namespace Tapeti.Default /// public string QueueName { get; private set; } + /// + public QueueType QueueType => bindingInfo.QueueInfo.QueueType; + /// public Type Controller => bindingInfo.ControllerType; @@ -106,7 +109,7 @@ namespace Tapeti.Default switch (bindingInfo.BindingTargetMode) { case BindingTargetMode.Default: - if (bindingInfo.QueueInfo.Dynamic) + if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); else { @@ -117,7 +120,7 @@ namespace Tapeti.Default break; case BindingTargetMode.Direct: - if (bindingInfo.QueueInfo.Dynamic) + if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name); else { @@ -259,9 +262,9 @@ namespace Tapeti.Default public class QueueInfo { /// - /// Whether the queue is dynamic or durable. + /// The type of queue this binding consumes. /// - public bool Dynamic { get; set; } + public QueueType QueueType { get; set; } /// /// The name of the durable queue, or optional prefix of the dynamic queue. @@ -272,7 +275,7 @@ namespace Tapeti.Default /// /// Determines if the QueueInfo properties contain a valid combination. /// - public bool IsValid => Dynamic|| !string.IsNullOrEmpty(Name); + public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name); } } } diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index d0450dd..916488d 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -130,10 +130,10 @@ namespace Tapeti throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}"); if (dynamicQueueAttribute != null) - return new ControllerMethodBinding.QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix }; + return new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Dynamic, Name = dynamicQueueAttribute.Prefix }; return durableQueueAttribute != null - ? new ControllerMethodBinding.QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name } + ? new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Durable, Name = durableQueueAttribute.Name } : null; } }