2018-12-19 19:50:56 +00:00
using System ;
2017-01-31 11:01:08 +00:00
using System.Collections.Concurrent ;
using System.Collections.Generic ;
2021-07-18 12:29:41 +00:00
using System.IO ;
2017-01-31 11:01:08 +00:00
using System.Linq ;
using System.Threading.Tasks ;
2021-07-18 12:29:41 +00:00
using Tapeti.Config ;
2017-10-17 08:34:07 +00:00
using Tapeti.Flow.FlowHelpers ;
2017-01-31 11:01:08 +00:00
2017-02-05 22:22:34 +00:00
namespace Tapeti.Flow.Default
2017-01-31 11:01:08 +00:00
{
2019-08-14 18:48:40 +00:00
/// <inheritdoc />
/// <summary>
/// Default implementation of IFlowStore.
/// </summary>
2017-01-31 11:01:08 +00:00
public class FlowStore : IFlowStore
{
2019-08-19 07:33:07 +00:00
private class CachedFlowState
{
public readonly FlowState FlowState ;
public readonly bool IsPersistent ;
public CachedFlowState ( FlowState flowState , bool isPersistent )
{
FlowState = flowState ;
IsPersistent = isPersistent ;
}
}
private readonly ConcurrentDictionary < Guid , CachedFlowState > flowStates = new ConcurrentDictionary < Guid , CachedFlowState > ( ) ;
2018-12-19 19:50:56 +00:00
private readonly ConcurrentDictionary < Guid , Guid > continuationLookup = new ConcurrentDictionary < Guid , Guid > ( ) ;
private readonly LockCollection < Guid > locks = new LockCollection < Guid > ( EqualityComparer < Guid > . Default ) ;
2021-07-18 12:29:41 +00:00
private HashSet < string > validatedMethods = null ;
2017-02-05 22:22:34 +00:00
2017-08-14 11:58:01 +00:00
private readonly IFlowRepository repository ;
2021-07-18 12:29:41 +00:00
private readonly ITapetiConfig config ;
2017-01-31 11:01:08 +00:00
2018-12-19 19:50:56 +00:00
private volatile bool inUse ;
2019-05-02 11:26:59 +00:00
private volatile bool loaded ;
2017-01-31 11:01:08 +00:00
2019-08-15 09:26:55 +00:00
2021-05-29 19:51:58 +00:00
/// <summary>
/// </summary>
2021-07-18 12:29:41 +00:00
public FlowStore ( IFlowRepository repository , ITapetiConfig config )
2017-01-31 11:01:08 +00:00
{
this . repository = repository ;
2021-07-18 12:29:41 +00:00
this . config = config ;
2017-01-31 11:01:08 +00:00
}
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
2017-01-31 11:01:08 +00:00
public async Task Load ( )
{
2018-12-19 19:50:56 +00:00
if ( inUse )
2017-10-17 08:34:07 +00:00
throw new InvalidOperationException ( "Can only load the saved state once." ) ;
2018-12-19 19:50:56 +00:00
inUse = true ;
2017-10-17 08:34:07 +00:00
2018-12-19 19:50:56 +00:00
flowStates . Clear ( ) ;
continuationLookup . Clear ( ) ;
2017-01-31 11:01:08 +00:00
2021-07-18 12:29:41 +00:00
validatedMethods = new HashSet < string > ( ) ;
try
2017-01-31 11:01:08 +00:00
{
2021-07-18 12:29:41 +00:00
foreach ( var flowStateRecord in await repository . GetStates < FlowState > ( ) )
{
flowStates . TryAdd ( flowStateRecord . Key , new CachedFlowState ( flowStateRecord . Value , true ) ) ;
foreach ( var continuation in flowStateRecord . Value . Continuations )
{
ValidateContinuation ( flowStateRecord . Key , continuation . Key , continuation . Value ) ;
continuationLookup . GetOrAdd ( continuation . Key , flowStateRecord . Key ) ;
}
}
}
finally
{
validatedMethods = null ;
2017-01-31 11:01:08 +00:00
}
2019-05-02 11:26:59 +00:00
loaded = true ;
2017-01-31 11:01:08 +00:00
}
2021-07-18 12:29:41 +00:00
private void ValidateContinuation ( Guid flowId , Guid continuationId , ContinuationMetadata metadata )
{
// We could check all the things that are required for a continuation or converge method, but this should suffice
// for the common scenario where you change code without realizing that it's signature has been persisted
if ( validatedMethods . Add ( metadata . MethodName ) )
{
var methodInfo = MethodSerializer . Deserialize ( metadata . MethodName ) ;
if ( methodInfo = = null )
throw new InvalidDataException ( $"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer exists (continuation Id = {continuationId})" ) ;
var binding = config . Bindings . ForMethod ( methodInfo ) ;
if ( binding = = null )
throw new InvalidDataException ( $"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})" ) ;
}
if ( string . IsNullOrEmpty ( metadata . ConvergeMethodName ) | | ! validatedMethods . Add ( metadata . ConvergeMethodName ) )
return ;
var convergeMethodInfo = MethodSerializer . Deserialize ( metadata . ConvergeMethodName ) ;
if ( convergeMethodInfo = = null )
throw new InvalidDataException ( $"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer exists (continuation Id = {continuationId})" ) ;
2021-07-18 12:39:18 +00:00
// Converge methods are not message handlers themselves
2021-07-18 12:29:41 +00:00
}
2017-01-31 11:01:08 +00:00
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
2017-02-05 22:22:34 +00:00
public Task < Guid ? > FindFlowID ( Guid continuationID )
2017-01-31 11:01:08 +00:00
{
2019-05-02 11:26:59 +00:00
if ( ! loaded )
throw new InvalidOperationException ( "Flow store is not yet loaded." ) ;
2018-12-19 19:50:56 +00:00
return Task . FromResult ( continuationLookup . TryGetValue ( continuationID , out var result ) ? result : ( Guid ? ) null ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-15 09:26:55 +00:00
/// <inheritdoc />
2017-02-05 22:22:34 +00:00
public async Task < IFlowStateLock > LockFlowState ( Guid flowID )
2017-01-31 11:01:08 +00:00
{
2019-05-02 11:32:03 +00:00
if ( ! loaded )
2019-05-02 11:26:59 +00:00
throw new InvalidOperationException ( "Flow store should be loaded before storing flows." ) ;
2018-12-19 19:50:56 +00:00
inUse = true ;
2017-01-31 11:01:08 +00:00
2018-12-19 19:50:56 +00:00
var flowStatelock = new FlowStateLock ( this , flowID , await locks . GetLock ( flowID ) ) ;
2017-10-17 08:34:07 +00:00
return flowStatelock ;
2017-01-31 11:01:08 +00:00
}
2019-08-15 09:26:55 +00:00
2017-01-31 11:01:08 +00:00
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner ;
2017-10-17 08:34:07 +00:00
private volatile IDisposable flowLock ;
2019-08-19 07:33:07 +00:00
private CachedFlowState cachedFlowState ;
2017-01-31 11:01:08 +00:00
2019-08-15 09:26:55 +00:00
public Guid FlowID { get ; }
2017-01-31 11:01:08 +00:00
2017-10-17 08:34:07 +00:00
public FlowStateLock ( FlowStore owner , Guid flowID , IDisposable flowLock )
2017-01-31 11:01:08 +00:00
{
this . owner = owner ;
2019-08-15 09:32:39 +00:00
FlowID = flowID ;
2017-10-17 08:34:07 +00:00
this . flowLock = flowLock ;
2017-01-31 11:01:08 +00:00
2019-08-19 07:33:07 +00:00
owner . flowStates . TryGetValue ( flowID , out cachedFlowState ) ;
2017-01-31 11:01:08 +00:00
}
public void Dispose ( )
{
2017-10-17 08:34:07 +00:00
var l = flowLock ;
flowLock = null ;
l ? . Dispose ( ) ;
2017-01-31 11:01:08 +00:00
}
public Task < FlowState > GetFlowState ( )
{
2017-10-17 08:34:07 +00:00
if ( flowLock = = null )
throw new ObjectDisposedException ( "FlowStateLock" ) ;
2017-01-31 11:01:08 +00:00
2019-08-19 07:33:07 +00:00
return Task . FromResult ( cachedFlowState . FlowState ? . Clone ( ) ) ;
2017-01-31 11:01:08 +00:00
}
2019-08-19 07:33:07 +00:00
public async Task StoreFlowState ( FlowState newFlowState , bool persistent )
2017-01-31 11:01:08 +00:00
{
2017-10-17 08:34:07 +00:00
if ( flowLock = = null )
throw new ObjectDisposedException ( "FlowStateLock" ) ;
// Ensure no one has a direct reference to the protected state in the dictionary
newFlowState = newFlowState . Clone ( ) ;
2017-01-31 11:01:08 +00:00
2017-10-17 08:34:07 +00:00
// Update the lookup dictionary for the ContinuationIDs
2019-08-19 07:33:07 +00:00
if ( cachedFlowState ! = null )
2017-10-17 08:34:07 +00:00
{
2019-08-19 07:33:07 +00:00
foreach ( var removedContinuation in cachedFlowState . FlowState . Continuations . Keys . Where ( k = > ! newFlowState . Continuations . ContainsKey ( k ) ) )
2018-12-19 19:50:56 +00:00
owner . continuationLookup . TryRemove ( removedContinuation , out _ ) ;
2017-10-17 08:34:07 +00:00
}
2017-01-31 11:01:08 +00:00
2019-08-19 07:33:07 +00:00
foreach ( var addedContinuation in newFlowState . Continuations . Where ( c = > cachedFlowState = = null | | ! cachedFlowState . FlowState . Continuations . ContainsKey ( c . Key ) ) )
2017-10-17 08:34:07 +00:00
{
2019-08-15 09:26:55 +00:00
owner . continuationLookup . TryAdd ( addedContinuation . Key , FlowID ) ;
2017-01-31 11:01:08 +00:00
}
2017-02-05 22:22:34 +00:00
2019-08-19 07:33:07 +00:00
var isNew = cachedFlowState = = null ;
var wasPersistent = cachedFlowState ? . IsPersistent ? ? false ;
cachedFlowState = new CachedFlowState ( newFlowState , persistent ) ;
owner . flowStates [ FlowID ] = cachedFlowState ;
2017-10-17 08:34:07 +00:00
2019-08-19 07:33:07 +00:00
if ( persistent )
2017-01-31 11:01:08 +00:00
{
2019-08-19 07:33:07 +00:00
// Storing the flowstate in the underlying repository
if ( isNew )
{
var now = DateTime . UtcNow ;
await owner . repository . CreateState ( FlowID , cachedFlowState . FlowState , now ) ;
}
else
{
await owner . repository . UpdateState ( FlowID , cachedFlowState . FlowState ) ;
}
2017-01-31 11:01:08 +00:00
}
2019-08-19 07:33:07 +00:00
else if ( wasPersistent )
2017-01-31 11:01:08 +00:00
{
2019-08-19 07:33:07 +00:00
// We transitioned from a durable queue to a dynamic queue,
// remove the persistent state but keep the in-memory version
await owner . repository . DeleteState ( FlowID ) ;
2017-01-31 11:01:08 +00:00
}
}
public async Task DeleteFlowState ( )
{
2017-10-17 08:34:07 +00:00
if ( flowLock = = null )
throw new ObjectDisposedException ( "FlowStateLock" ) ;
2017-01-31 11:01:08 +00:00
2019-08-19 07:33:07 +00:00
if ( cachedFlowState ! = null )
2017-10-17 08:34:07 +00:00
{
2019-08-19 07:33:07 +00:00
foreach ( var removedContinuation in cachedFlowState . FlowState . Continuations . Keys )
2018-12-19 19:50:56 +00:00
owner . continuationLookup . TryRemove ( removedContinuation , out _ ) ;
2017-02-05 22:22:34 +00:00
2019-08-19 07:33:07 +00:00
owner . flowStates . TryRemove ( FlowID , out var removedFlowState ) ;
cachedFlowState = null ;
2017-02-05 22:22:34 +00:00
2019-08-19 07:33:07 +00:00
if ( removedFlowState . IsPersistent )
2019-08-15 09:26:55 +00:00
await owner . repository . DeleteState ( FlowID ) ;
2017-10-17 08:34:07 +00:00
}
2017-01-31 11:01:08 +00:00
}
2017-02-05 22:22:34 +00:00
}
2017-01-31 11:01:08 +00:00
}
}