2018-12-19 19:50:56 +00:00
using System ;
2017-01-31 11:01:08 +00:00
using System.Collections.Concurrent ;
using System.Collections.Generic ;
2021-07-18 12:29:41 +00:00
using System.IO ;
2017-01-31 11:01:08 +00:00
using System.Linq ;
using System.Threading.Tasks ;
2021-07-18 12:29:41 +00:00
using Tapeti.Config ;
2017-10-17 08:34:07 +00:00
using Tapeti.Flow.FlowHelpers ;
2023-04-13 06:39:43 +00:00
using Tapeti.Helpers ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
namespace Tapeti.Flow.Default
2017-01-31 11:01:08 +00:00
{
2019-08-14 18:48:40 +00:00
/// <summary>
/// Default implementation of IFlowStore.
/// </summary>
2017-01-31 11:01:08 +00:00
public class FlowStore : IFlowStore
{
2019-08-19 07:33:07 +00:00
/// <summary>
/// Immutable entry of the in-memory flow state cache.
/// </summary>
private class CachedFlowState
{
    /// <summary>The flow state held by this entry; may be null.</summary>
    public readonly FlowState? FlowState;

    /// <summary>Creation time recorded for the flow.</summary>
    public readonly DateTime CreationTime;

    /// <summary>Whether this entry was stored as persistent.</summary>
    public readonly bool IsPersistent;


    /// <summary>
    /// Creates a cache entry from the given values; nothing is copied or validated.
    /// </summary>
    public CachedFlowState(FlowState? flowState, DateTime creationTime, bool isPersistent)
    {
        (FlowState, CreationTime, IsPersistent) = (flowState, creationTime, isPersistent);
    }
}
2022-11-17 15:47:07 +00:00
// In-memory cache of flow states, keyed by flow ID.
private readonly ConcurrentDictionary < Guid , CachedFlowState > flowStates = new ( ) ;
// Maps a continuation ID to the ID of the flow that owns it.
private readonly ConcurrentDictionary < Guid , Guid > continuationLookup = new ( ) ;
// Source of the per-flow locks handed out by LockFlowState.
private readonly LockCollection < Guid > locks = new ( EqualityComparer < Guid > . Default ) ;
2022-11-23 08:13:38 +00:00
// Method names already checked by ValidateContinuation. Only assigned while Load is running, null otherwise.
private HashSet < string > ? validatedMethods ;
2017-02-05 22:22:34 +00:00
2017-08-14 11:58:01 +00:00
// Backing store that flow states are read from and written to.
private readonly IFlowRepository repository ;
2021-07-18 12:29:41 +00:00
// Used by ValidateContinuation to look up the binding for a continuation method.
private readonly ITapetiConfig config ;
2017-01-31 11:01:08 +00:00
2018-12-19 19:50:56 +00:00
// Set by Load and by LockFlowState; Load refuses to run once this is true.
private volatile bool inUse ;
2019-05-02 11:26:59 +00:00
// Set at the end of a successful Load; FindFlowID and LockFlowState throw while this is false.
private volatile bool loaded ;
2017-01-31 11:01:08 +00:00
2019-08-15 09:26:55 +00:00
2021-05-29 19:51:58 +00:00
/// <summary>
/// Creates a flow store on top of the given repository.
/// </summary>
/// <param name="repository">Repository used to load and persist flow states.</param>
/// <param name="config">Tapeti configuration, used to look up bindings when validating loaded continuations.</param>
public FlowStore(IFlowRepository repository, ITapetiConfig config)
{
    (this.repository, this.config) = (repository, config);
}
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
public async ValueTask Load()
{
    // A second Load, or a Load after LockFlowState has been called, is rejected.
    if (inUse)
        throw new InvalidOperationException("Can only load the saved state once.");

    inUse = true;

    flowStates.Clear();
    continuationLookup.Clear();

    // The set of validated method names only exists for the duration of the load.
    validatedMethods = new HashSet<string>();
    try
    {
        var storedRecords = await repository.GetStates<FlowState>();

        foreach (var storedRecord in storedRecords)
        {
            var flowId = storedRecord.FlowID;

            // Everything that comes out of the repository is persistent by definition.
            var cacheEntry = new CachedFlowState(storedRecord.FlowState, storedRecord.CreationTime, true);
            flowStates.TryAdd(flowId, cacheEntry);

            foreach (var continuation in storedRecord.FlowState.Continuations)
            {
                // Validation throws before the continuation is registered in the lookup.
                ValidateContinuation(flowId, continuation.Key, continuation.Value);
                continuationLookup.GetOrAdd(continuation.Key, flowId);
            }
        }
    }
    finally
    {
        validatedMethods = null;
    }

    loaded = true;
}
2021-07-18 12:29:41 +00:00
/// <summary>
/// Checks that the method referenced by a loaded continuation can still be resolved
/// and still has a binding. Each method name is checked only once per load.
/// </summary>
/// <param name="flowId">ID of the flow owning the continuation; used in error messages.</param>
/// <param name="continuationId">ID of the continuation; used in error messages.</param>
/// <param name="metadata">Continuation metadata as loaded from the repository.</param>
/// <exception cref="InvalidDataException">The method cannot be deserialized or has no binding.</exception>
private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata)
{
    if (string.IsNullOrEmpty(metadata.MethodName))
        return;

    // We could check all the things that are required for a continuation or converge method, but this should suffice
    // for the common scenario where you change code without realizing that its signature has been persisted

    // Add returns false when the name is already in the set, in which case it was validated earlier in this load.
    // ReSharper disable once InvertIf
    if (validatedMethods!.Add(metadata.MethodName))
    {
        var methodInfo = MethodSerializer.Deserialize(metadata.MethodName)
            ?? throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer exists (continuation Id = {continuationId})");

        if (config.Bindings.ForMethod(methodInfo) is null)
            throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})");
    }

    /* Disabled for now - the ConvergeMethodName does not include the assembly so we can't easily check it
    if (string.IsNullOrEmpty(metadata.ConvergeMethodName) || !validatedMethods.Add(metadata.ConvergeMethodName))
        return;

    var convergeMethodInfo = MethodSerializer.Deserialize(metadata.ConvergeMethodName);
    if (convergeMethodInfo == null)
        throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer exists (continuation Id = {continuationId})");

    // Converge methods are not message handlers themselves
    */
}
2017-01-31 11:01:08 +00:00
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
public ValueTask<Guid?> FindFlowID(Guid continuationID)
{
    if (!loaded)
        throw new InvalidOperationException("Flow store is not yet loaded.");

    // An unknown continuation ID yields null rather than an exception.
    Guid? flowId = null;
    if (continuationLookup.TryGetValue(continuationID, out var knownFlowId))
        flowId = knownFlowId;

    return new ValueTask<Guid?>(flowId);
}
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
public async ValueTask<IFlowStateLock> LockFlowState(Guid flowID)
{
    if (!loaded)
        throw new InvalidOperationException("Flow store should be loaded before storing flows.");

    // From here on a (re)load is no longer allowed.
    inUse = true;

    // Acquire the lock for this flow first, then wrap it in a handle which releases it on Dispose.
    var acquiredLock = await locks.GetLock(flowID);
    return new FlowStateLock(this, flowID, acquiredLock);
}
2019-08-15 09:26:55 +00:00
2021-11-02 14:48:14 +00:00
/// <inheritdoc />
public ValueTask<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
{
    // Flows created at or before this moment are at least minimumAge old.
    var newestAllowedCreationTime = DateTime.UtcNow - minimumAge;

    var oldEnoughFlows =
        from cacheEntry in flowStates
        where cacheEntry.Value.CreationTime <= newestAllowedCreationTime
        select new ActiveFlow(cacheEntry.Key, cacheEntry.Value.CreationTime);

    // Materialize so the caller gets a snapshot rather than a live query over the cache.
    return new ValueTask<IEnumerable<ActiveFlow>>(oldEnoughFlows.ToArray());
}
2017-01-31 11:01:08 +00:00
/// <summary>
/// Handle to the state of a single flow. It wraps the lock passed to the constructor:
/// disposing the handle disposes that lock, after which the state members throw
/// <see cref="ObjectDisposedException"/>.
/// </summary>
private class FlowStateLock : IFlowStateLock
{
    private readonly FlowStore owner;

    // Set to null by Dispose, so it doubles as the "disposed" marker.
    private volatile IDisposable? flowLock;

    // Cache entry for this flow as seen by this handle; null when the flow is not (or no longer) cached.
    private CachedFlowState? cachedFlowState;


    /// <summary>ID of the flow this handle was created for.</summary>
    public Guid FlowID { get; }


    /// <summary>
    /// Wraps an already acquired lock and picks up the currently cached state for the flow, if any.
    /// </summary>
    public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock)
    {
        this.owner = owner;
        FlowID = flowID;
        this.flowLock = flowLock;

        owner.flowStates.TryGetValue(flowID, out cachedFlowState);
    }


    /// <summary>
    /// Disposes the wrapped lock. Calling this more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        var l = flowLock;
        flowLock = null;
        l?.Dispose();
    }


    /// <summary>
    /// Returns a clone of the cached flow state, or null when there is none.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The handle has been disposed.</exception>
    public ValueTask<FlowState?> GetFlowState()
    {
        ThrowIfDisposed();

        return new ValueTask<FlowState?>(cachedFlowState?.FlowState?.Clone());
    }


    /// <summary>
    /// Replaces the flow state in the cache, updates the continuation lookup and
    /// synchronizes the repository with the requested persistence.
    /// </summary>
    /// <param name="newFlowState">The new state; a clone is stored, not the passed instance.</param>
    /// <param name="persistent">Whether the state should be present in the repository.</param>
    /// <exception cref="ObjectDisposedException">The handle has been disposed.</exception>
    public async ValueTask StoreFlowState(FlowState newFlowState, bool persistent)
    {
        ThrowIfDisposed();

        // Ensure no one has a direct reference to the protected state in the dictionary
        newFlowState = newFlowState.Clone();

        // Update the lookup dictionary for the ContinuationIDs
        var previousContinuations = cachedFlowState?.FlowState?.Continuations;

        if (previousContinuations != null)
        {
            foreach (var removedContinuation in previousContinuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
                owner.continuationLookup.TryRemove(removedContinuation, out _);
        }

        foreach (var addedContinuation in newFlowState.Continuations.Where(c => previousContinuations == null || !previousContinuations.ContainsKey(c.Key)))
            owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID);

        var previousState = cachedFlowState;
        var wasPersistent = previousState?.IsPersistent ?? false;

        // An existing flow keeps its original creation time
        var creationTime = previousState?.CreationTime ?? DateTime.UtcNow;

        cachedFlowState = new CachedFlowState(newFlowState, creationTime, persistent);
        owner.flowStates[FlowID] = cachedFlowState;

        if (persistent)
        {
            // Storing the flowstate in the underlying repository.
            // Whether to create or update depends on whether the previous version was persisted,
            // not on whether it was cached: a flow which only lived in memory so far has no
            // record in the repository to update.
            if (wasPersistent)
            {
                await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState);
            }
            else
            {
                await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime);
            }
        }
        else if (wasPersistent)
        {
            // We transitioned from a durable queue to a dynamic queue,
            // remove the persistent state but keep the in-memory version
            await owner.repository.DeleteState(FlowID);
        }
    }


    /// <summary>
    /// Removes the flow from the cache and the continuation lookup, and from the repository
    /// if the removed cache entry was persistent. Does nothing when this handle holds no flow state.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The handle has been disposed.</exception>
    public async ValueTask DeleteFlowState()
    {
        ThrowIfDisposed();

        if (cachedFlowState?.FlowState != null)
        {
            foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys)
                owner.continuationLookup.TryRemove(removedContinuation, out _);

            owner.flowStates.TryRemove(FlowID, out var removedFlowState);
            cachedFlowState = null;

            if (removedFlowState is { IsPersistent: true })
                await owner.repository.DeleteState(FlowID);
        }
    }


    // Guards every state member against use after Dispose.
    private void ThrowIfDisposed()
    {
        if (flowLock == null)
            throw new ObjectDisposedException(nameof(FlowStateLock));
    }
}
2017-01-31 11:01:08 +00:00
}
}