From 7aab0f86bebd3600bed1b7821260502196f9e3cc Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Tue, 2 Nov 2021 15:48:14 +0100 Subject: [PATCH] Added IFlowStore.GetActiveFlows for monitoring purposes --- .../SqlConnectionFlowRepository.cs | 11 ++-- Tapeti.Flow/Default/FlowStore.cs | 29 ++++++++--- .../Default/NonPersistentFlowRepository.cs | 5 +- Tapeti.Flow/IFlowRepository.cs | 52 +++++++++++++++---- Tapeti.Flow/IFlowStore.cs | 39 ++++++++++++++ 5 files changed, 112 insertions(+), 24 deletions(-) diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 19f7a8b..3da64b3 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -37,24 +37,25 @@ namespace Tapeti.Flow.SQL /// - public async Task>> GetStates() + public async Task>> GetStates() { return await SqlRetryHelper.Execute(async () => { using (var connection = await GetConnection()) { - var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection); + var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection); var flowReader = await flowQuery.ExecuteReaderAsync(); - var result = new List>(); + var result = new List>(); while (await flowReader.ReadAsync()) { var flowID = flowReader.GetGuid(0); - var stateJson = flowReader.GetString(1); + var creationTime = flowReader.GetDateTime(1); + var stateJson = flowReader.GetString(2); var state = JsonConvert.DeserializeObject(stateJson); - result.Add(new KeyValuePair(flowID, state)); + result.Add(new FlowRecord(flowID, creationTime, state)); } return result; diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 8b0aa96..044e9d0 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -18,11 +18,13 @@ namespace Tapeti.Flow.Default private class CachedFlowState { public readonly FlowState FlowState; + public readonly DateTime CreationTime; public readonly bool IsPersistent; - public CachedFlowState(FlowState flowState, bool isPersistent) + public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent) { FlowState = flowState; + CreationTime = creationTime; IsPersistent = isPersistent; } } @@ -64,12 +66,12 @@ namespace Tapeti.Flow.Default { foreach (var flowStateRecord in await repository.GetStates()) { - flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); + flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true)); - foreach (var continuation in flowStateRecord.Value.Continuations) + foreach (var continuation in flowStateRecord.FlowState.Continuations) { - ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value); - continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); + ValidateContinuation(flowStateRecord.FlowID, continuation.Key, continuation.Value); + continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID); } } } @@ -134,6 +136,18 @@ namespace Tapeti.Flow.Default } + /// + public Task> GetActiveFlows(TimeSpan minimumAge) + { + var maximumDateTime = DateTime.UtcNow - minimumAge; + + return Task.FromResult(flowStates + .Where(p => p.Value.CreationTime <= maximumDateTime) + .Select(p => new ActiveFlow(p.Key, p.Value.CreationTime)) + .ToArray() as IEnumerable); + } + + private class FlowStateLock : IFlowStateLock { private readonly FlowStore owner; @@ -190,7 +204,7 @@ namespace Tapeti.Flow.Default var isNew = cachedFlowState == null; var wasPersistent = cachedFlowState?.IsPersistent ?? false; - cachedFlowState = new CachedFlowState(newFlowState, persistent); + cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent); owner.flowStates[FlowID] = cachedFlowState; if (persistent) @@ -198,8 +212,7 @@ namespace Tapeti.Flow.Default // Storing the flowstate in the underlying repository if (isNew) { - var now = DateTime.UtcNow; - await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now); + await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime); } else { diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs index b20bfd6..b1aa283 100644 --- a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace Tapeti.Flow.Default @@ -10,9 +11,9 @@ namespace Tapeti.Flow.Default /// public class NonPersistentFlowRepository : IFlowRepository { - Task>> IFlowRepository.GetStates() + Task>> IFlowRepository.GetStates() { - return Task.FromResult(new List>()); + return Task.FromResult(Enumerable.Empty>()); } /// diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs index f147684..cde801c 100644 --- a/Tapeti.Flow/IFlowRepository.cs +++ b/Tapeti.Flow/IFlowRepository.cs @@ -13,30 +13,64 @@ namespace Tapeti.Flow /// Load the previously persisted flow states. /// /// A list of flow states, where the key is the unique Flow ID and the value is the deserialized T. - Task>> GetStates(); + Task>> GetStates(); /// /// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID. /// - /// - /// - /// + /// The unique ID of the flow. + /// The flow state to be stored. + /// The time when the flow was initially created. /// Task CreateState(Guid flowID, T state, DateTime timestamp); /// /// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID. /// - /// - /// - /// - /// + /// The unique ID of the flow. + /// The flow state to be stored. Task UpdateState(Guid flowID, T state); /// /// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID. /// - /// + /// The unique ID of the flow. Task DeleteState(Guid flowID); } + + + /// + /// Contains information about a persisted flow state. + /// + public class FlowRecord + { + /// + /// The unique ID of the flow. + /// + public Guid FlowID { get; } + + /// + /// The time when the flow was initially created. + /// + public DateTime CreationTime { get; } + + /// + /// The stored flow state. + /// + public T FlowState { get; } + + + /// + /// Creates a new instance of a FlowRecord. + /// + /// + /// + /// + public FlowRecord(Guid flowID, DateTime creationTime, T flowState) + { + FlowID = flowID; + CreationTime = creationTime; + FlowState = flowState; + } + } } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index 21c4337..b3720b3 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Tapeti.Flow.Default; @@ -29,6 +30,15 @@ namespace Tapeti.Flow /// /// Task LockFlowState(Guid flowID); + + /// + /// Returns information about the currently active flows. + /// + /// + /// This is intended for monitoring purposes and should be treated as a snapshot. + /// + /// The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows. + Task> GetActiveFlows(TimeSpan minimumAge); } @@ -60,4 +70,33 @@ namespace Tapeti.Flow /// Task DeleteFlowState(); } + + + /// + /// Contains information about an active flow, as returned by . + /// + public class ActiveFlow + { + /// + /// The ID of the active flow. + /// + public Guid FlowID { get; } + + /// + /// The time when the flow was initially created. + /// + public DateTime CreationTime { get; } + + + /// + /// Create a new instance of an ActiveFlow. + /// + /// The ID of the active flow. + /// The time when the flow was initially created. + public ActiveFlow(Guid flowID, DateTime creationTime) + { + FlowID = flowID; + CreationTime = creationTime; + } + } }