From 6d7836de2fde6f121b5bd851b9f0e4f77594cae7 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Sun, 18 Jul 2021 14:29:41 +0200 Subject: [PATCH] Implemented #27: Check methods for persisted flows while loading? --- Tapeti.Flow/Default/FlowStore.cs | 54 +++++++++++++++++++-- Tapeti.Flow/FlowHelpers/MethodSerializer.cs | 34 ++++++++++++- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index f380962..4fccd86 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading.Tasks; +using Tapeti.Config; using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default @@ -28,8 +30,10 @@ namespace Tapeti.Flow.Default private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); private readonly LockCollection locks = new LockCollection(EqualityComparer.Default); + private HashSet validatedMethods = null; private readonly IFlowRepository repository; + private readonly ITapetiConfig config; private volatile bool inUse; private volatile bool loaded; @@ -37,9 +41,10 @@ namespace Tapeti.Flow.Default /// /// - public FlowStore(IFlowRepository repository) + public FlowStore(IFlowRepository repository, ITapetiConfig config) { this.repository = repository; + this.config = config; } @@ -54,16 +59,55 @@ namespace Tapeti.Flow.Default flowStates.Clear(); continuationLookup.Clear(); - foreach (var flowStateRecord in await repository.GetStates()) + validatedMethods = new HashSet(); + try { - flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); + foreach (var flowStateRecord in await repository.GetStates()) + { + flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); - foreach (var continuation in flowStateRecord.Value.Continuations) - continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); + foreach (var continuation in flowStateRecord.Value.Continuations) + { + ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value); + continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); + } + } + } + finally + { + validatedMethods = null; } loaded = true; } + + + private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata) + { + // We could check all the things that are required for a continuation or converge method, but this should suffice + // for the common scenario where you change code without realizing that it's signature has been persisted + if (validatedMethods.Add(metadata.MethodName)) + { + var methodInfo = MethodSerializer.Deserialize(metadata.MethodName); + if (methodInfo == null) + throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer exists (continuation Id = {continuationId})"); + + var binding = config.Bindings.ForMethod(methodInfo); + if (binding == null) + throw new InvalidDataException($"Flow ID {flowId} references continuation method '{metadata.MethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})"); + } + + if (string.IsNullOrEmpty(metadata.ConvergeMethodName) || !validatedMethods.Add(metadata.ConvergeMethodName)) + return; + + var convergeMethodInfo = MethodSerializer.Deserialize(metadata.ConvergeMethodName); + if (convergeMethodInfo == null) + throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer exists (continuation Id = {continuationId})"); + + var convergeBinding = config.Bindings.ForMethod(convergeMethodInfo); + if (convergeBinding == null) + throw new InvalidDataException($"Flow ID {flowId} references converge method '{metadata.ConvergeMethodName}' which no longer has a binding as a message handler (continuation Id = {continuationId})"); + } /// diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs index e8a099e..1784873 100644 --- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs +++ b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs @@ -1,4 +1,6 @@ -using System.Reflection; +using System; +using System.Reflection; +using System.Text.RegularExpressions; namespace Tapeti.Flow.FlowHelpers { @@ -15,5 +17,35 @@ namespace Tapeti.Flow.FlowHelpers { return method.Name + '@' + method.DeclaringType?.Assembly.GetName().Name + ':' + method.DeclaringType?.FullName; } + + + private static readonly Regex DeserializeRegex = new Regex("^(?.+?)@(?.+?):(?.+?)$"); + + + /// + /// Deserializes the serialized method representation back into it's MethodInfo, or null if not found. + /// + /// + public static MethodInfo Deserialize(string serializedMethod) + { + var match = DeserializeRegex.Match(serializedMethod); + if (!match.Success) + return null; + + Assembly assembly; + try + { + assembly = Assembly.Load(match.Groups["assembly"].Value); + if (assembly == null) + return null; + } + catch + { + return null; + } + + var declaringType = assembly.GetType(match.Groups["type"].Value); + return declaringType?.GetMethod(match.Groups["method"].Value); + } } }