Fixed #11: Do not persist flow for dynamic queues

This commit is contained in:
Mark van Renswoude 2019-08-19 09:33:07 +02:00
parent 23f86b3597
commit c63b821b87
9 changed files with 91 additions and 38 deletions

View File

@ -86,7 +86,7 @@ namespace Tapeti.Flow.Default
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store();
await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
}));
}

View File

@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default
private bool deleteCalled;
public async Task Store()
public async Task Store(bool persistent)
{
storeCalled = true;
@ -25,7 +25,7 @@ namespace Tapeti.Flow.Default
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
await FlowStateLock.StoreFlowState(FlowState);
await FlowStateLock.StoreFlowState(FlowState, persistent);
}
public async Task Delete()

View File

@ -88,7 +88,7 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue
};
await context.Store();
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true);
}
@ -153,7 +153,8 @@ namespace Tapeti.Flow.Default
return new ResponseHandlerInfo
{
MethodName = MethodSerializer.Serialize(responseHandler.Method),
ReplyToQueue = binding.QueueName
ReplyToQueue = binding.QueueName,
IsDurableQueue = binding.QueueType == QueueType.Durable
};
}
@ -331,6 +332,7 @@ namespace Tapeti.Flow.Default
{
public string MethodName { get; set; }
public string ReplyToQueue { get; set; }
public bool IsDurableQueue { get; set; }
}
}
}

View File

@ -13,7 +13,19 @@ namespace Tapeti.Flow.Default
/// </summary>
public class FlowStore : IFlowStore
{
private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>();
private class CachedFlowState
{
public readonly FlowState FlowState;
public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, bool isPersistent)
{
FlowState = flowState;
IsPersistent = isPersistent;
}
}
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
@ -43,7 +55,7 @@ namespace Tapeti.Flow.Default
foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{
flowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value);
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
foreach (var continuation in flowStateRecord.Value.Continuations)
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
@ -80,7 +92,7 @@ namespace Tapeti.Flow.Default
{
private readonly FlowStore owner;
private volatile IDisposable flowLock;
private FlowState flowState;
private CachedFlowState cachedFlowState;
public Guid FlowID { get; }
@ -91,7 +103,7 @@ namespace Tapeti.Flow.Default
FlowID = flowID;
this.flowLock = flowLock;
owner.flowStates.TryGetValue(flowID, out flowState);
owner.flowStates.TryGetValue(flowID, out cachedFlowState);
}
public void Dispose()
@ -106,10 +118,10 @@ namespace Tapeti.Flow.Default
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(flowState?.Clone());
return Task.FromResult(cachedFlowState.FlowState?.Clone());
}
public async Task StoreFlowState(FlowState newFlowState)
public async Task StoreFlowState(FlowState newFlowState, bool persistent)
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
@ -118,30 +130,41 @@ namespace Tapeti.Flow.Default
newFlowState = newFlowState.Clone();
// Update the lookup dictionary for the ContinuationIDs
if (flowState != null)
if (cachedFlowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
owner.continuationLookup.TryRemove(removedContinuation, out _);
}
foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key)))
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
{
owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID);
}
var isNew = flowState == null;
flowState = newFlowState;
owner.flowStates[FlowID] = newFlowState;
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
// Storing the flowstate in the underlying repository
if (isNew)
cachedFlowState = new CachedFlowState(newFlowState, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
{
var now = DateTime.UtcNow;
await owner.repository.CreateState(FlowID, flowState, now);
// Storing the flowstate in the underlying repository
if (isNew)
{
var now = DateTime.UtcNow;
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now);
}
else
{
await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState);
}
}
else
else if (wasPersistent)
{
await owner.repository.UpdateState(FlowID, flowState);
// We transitioned from a durable queue to a dynamic queue,
// remove the persistent state but keep the in-memory version
await owner.repository.DeleteState(FlowID);
}
}
@ -150,18 +173,16 @@ namespace Tapeti.Flow.Default
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
if (flowState != null)
if (cachedFlowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys)
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys)
owner.continuationLookup.TryRemove(removedContinuation, out _);
owner.flowStates.TryRemove(FlowID, out _);
owner.flowStates.TryRemove(FlowID, out var removedFlowState);
cachedFlowState = null;
if (flowState != null)
{
flowState = null;
if (removedFlowState.IsPersistent)
await owner.repository.DeleteState(FlowID);
}
}
}
}

View File

@ -52,7 +52,8 @@ namespace Tapeti.Flow
/// Stores the new flow state.
/// </summary>
/// <param name="flowState"></param>
Task StoreFlowState(FlowState flowState);
/// <param name="persistent"></param>
Task StoreFlowState(FlowState flowState, bool persistent);
/// <summary>
/// Disposes of the flow state corresponding to this Flow ID.

View File

@ -17,6 +17,9 @@ namespace Tapeti.Transient
/// <inheritdoc />
public string QueueName { get; private set; }
/// <inheritdoc />
public QueueType QueueType => QueueType.Dynamic;
/// <inheritdoc />
public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix)

View File

@ -3,6 +3,23 @@ using System.Threading.Tasks;
namespace Tapeti.Config
{
/// <summary>
/// Determines the type of queue the binding registers
/// </summary>
public enum QueueType
{
/// <summary>
/// The consumed queue is durable
/// </summary>
Durable,
/// <summary>
/// The consumed queue is dynamic
/// </summary>
Dynamic
}
/// <summary>
/// Represents a registered binding to handle incoming messages.
/// </summary>
@ -14,6 +31,12 @@ namespace Tapeti.Config
string QueueName { get; }
/// <summary>
/// Determines the type of queue the binding registers
/// </summary>
QueueType QueueType { get; }
/// <summary>
/// Called after a connection is established to set up the binding.
/// </summary>

View File

@ -83,6 +83,9 @@ namespace Tapeti.Default
/// <inheritdoc />
public string QueueName { get; private set; }
/// <inheritdoc />
public QueueType QueueType => bindingInfo.QueueInfo.QueueType;
/// <inheritdoc />
public Type Controller => bindingInfo.ControllerType;
@ -106,7 +109,7 @@ namespace Tapeti.Default
switch (bindingInfo.BindingTargetMode)
{
case BindingTargetMode.Default:
if (bindingInfo.QueueInfo.Dynamic)
if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
else
{
@ -117,7 +120,7 @@ namespace Tapeti.Default
break;
case BindingTargetMode.Direct:
if (bindingInfo.QueueInfo.Dynamic)
if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
else
{
@ -259,9 +262,9 @@ namespace Tapeti.Default
public class QueueInfo
{
/// <summary>
/// Whether the queue is dynamic or durable.
/// The type of queue this binding consumes.
/// </summary>
public bool Dynamic { get; set; }
public QueueType QueueType { get; set; }
/// <summary>
/// The name of the durable queue, or optional prefix of the dynamic queue.
@ -272,7 +275,7 @@ namespace Tapeti.Default
/// <summary>
/// Determines if the QueueInfo properties contain a valid combination.
/// </summary>
public bool IsValid => Dynamic|| !string.IsNullOrEmpty(Name);
public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name);
}
}
}

View File

@ -130,10 +130,10 @@ namespace Tapeti
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}");
if (dynamicQueueAttribute != null)
return new ControllerMethodBinding.QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix };
return new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Dynamic, Name = dynamicQueueAttribute.Prefix };
return durableQueueAttribute != null
? new ControllerMethodBinding.QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name }
? new ControllerMethodBinding.QueueInfo { QueueType = QueueType.Durable, Name = durableQueueAttribute.Name }
: null;
}
}