1
0
mirror of synced 2024-11-22 09:13:51 +00:00

Implemented #27: Check methods for persisted flows while loading?

This commit is contained in:
Mark van Renswoude 2021-07-18 14:29:41 +02:00
parent e9a7e32bf5
commit 6d7836de2f
2 changed files with 82 additions and 6 deletions

View File

@ -1,8 +1,10 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -28,8 +30,10 @@ namespace Tapeti.Flow.Default
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>(); private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>(); private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default); private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
private HashSet<string> validatedMethods = null;
private readonly IFlowRepository repository; private readonly IFlowRepository repository;
private readonly ITapetiConfig config;
private volatile bool inUse; private volatile bool inUse;
private volatile bool loaded; private volatile bool loaded;
@ -37,9 +41,10 @@ namespace Tapeti.Flow.Default
/// <summary> /// <summary>
/// </summary> /// </summary>
public FlowStore(IFlowRepository repository) public FlowStore(IFlowRepository repository, ITapetiConfig config)
{ {
this.repository = repository; this.repository = repository;
this.config = config;
} }
@ -54,18 +59,57 @@ namespace Tapeti.Flow.Default
flowStates.Clear(); flowStates.Clear();
continuationLookup.Clear(); continuationLookup.Clear();
foreach (var flowStateRecord in await repository.GetStates<FlowState>()) validatedMethods = new HashSet<string>();
try
{ {
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
foreach (var continuation in flowStateRecord.Value.Continuations) foreach (var continuation in flowStateRecord.Value.Continuations)
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); {
ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value);
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
}
}
}
finally
{
validatedMethods = null;
} }
loaded = true; loaded = true;
} }
private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata)
{
// We could check all the things that are required for a continuation or converge method, but this should suffice
// for the common scenario where you change code without realizing that it's signature has been persisted
if (validatedMethods.Add(metadata.MethodName))
{
var methodInfo = MethodSerializer.Deserialize(metadata.MethodName);
if (methodInfo == null)
throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer exists (continuation Id = {continuationId})");
var binding = config.Bindings.ForMethod(methodInfo);
if (binding == null)
throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})");
}
if (string.IsNullOrEmpty(metadata.ConvergeMethodName) || !validatedMethods.Add(metadata.ConvergeMethodName))
return;
var convergeMethodInfo = MethodSerializer.Deserialize(metadata.ConvergeMethodName);
if (convergeMethodInfo == null)
throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer exists (continuation Id = {continuationId})");
var convergeBinding = config.Bindings.ForMethod(convergeMethodInfo);
if (convergeBinding == null)
throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})");
}
/// <inheritdoc /> /// <inheritdoc />
public Task<Guid?> FindFlowID(Guid continuationID) public Task<Guid?> FindFlowID(Guid continuationID)
{ {

View File

@ -1,4 +1,6 @@
using System.Reflection; using System;
using System.Reflection;
using System.Text.RegularExpressions;
namespace Tapeti.Flow.FlowHelpers namespace Tapeti.Flow.FlowHelpers
{ {
@ -15,5 +17,35 @@ namespace Tapeti.Flow.FlowHelpers
{ {
return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName; return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName;
} }
private static readonly Regex DeserializeRegex = new Regex("^(?<method>.+?)@(?<assembly>.+?):(?<type>.+?)$");
/// <summary>
/// Deserializes the serialized method representation back into it's MethodInfo, or null if not found.
/// </summary>
/// <param name="serializedMethod"></param>
public static MethodInfo Deserialize(string serializedMethod)
{
var match = DeserializeRegex.Match(serializedMethod);
if (!match.Success)
return null;
Assembly assembly;
try
{
assembly = Assembly.Load(match.Groups["assembly"].Value);
if (assembly == null)
return null;
}
catch
{
return null;
}
var declaringType = assembly.GetType(match.Groups["type"].Value);
return declaringType?.GetMethod(match.Groups["method"].Value);
}
} }
} }