1
0
mirror of synced 2024-11-21 17:03:50 +00:00

Added IFlowStore.GetActiveFlows for monitoring purposes

This commit is contained in:
Mark van Renswoude 2021-11-02 15:48:14 +01:00
parent b36a3e400a
commit 7aab0f86be
5 changed files with 112 additions and 24 deletions

View File

@ -37,24 +37,25 @@ namespace Tapeti.Flow.SQL
/// <inheritdoc /> /// <inheritdoc />
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>() public async Task<IEnumerable<FlowRecord<T>>> GetStates<T>()
{ {
return await SqlRetryHelper.Execute(async () => return await SqlRetryHelper.Execute(async () =>
{ {
using (var connection = await GetConnection()) using (var connection = await GetConnection())
{ {
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection); var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync(); var flowReader = await flowQuery.ExecuteReaderAsync();
var result = new List<KeyValuePair<Guid, T>>(); var result = new List<FlowRecord<T>>();
while (await flowReader.ReadAsync()) while (await flowReader.ReadAsync())
{ {
var flowID = flowReader.GetGuid(0); var flowID = flowReader.GetGuid(0);
var stateJson = flowReader.GetString(1); var creationTime = flowReader.GetDateTime(1);
var stateJson = flowReader.GetString(2);
var state = JsonConvert.DeserializeObject<T>(stateJson); var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new KeyValuePair<Guid, T>(flowID, state)); result.Add(new FlowRecord<T>(flowID, creationTime, state));
} }
return result; return result;

View File

@ -18,11 +18,13 @@ namespace Tapeti.Flow.Default
private class CachedFlowState private class CachedFlowState
{ {
public readonly FlowState FlowState; public readonly FlowState FlowState;
public readonly DateTime CreationTime;
public readonly bool IsPersistent; public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, bool isPersistent) public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent)
{ {
FlowState = flowState; FlowState = flowState;
CreationTime = creationTime;
IsPersistent = isPersistent; IsPersistent = isPersistent;
} }
} }
@ -64,12 +66,12 @@ namespace Tapeti.Flow.Default
{ {
foreach (var flowStateRecord in await repository.GetStates<FlowState>()) foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{ {
flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true)); flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true));
foreach (var continuation in flowStateRecord.Value.Continuations) foreach (var continuation in flowStateRecord.FlowState.Continuations)
{ {
ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value); ValidateContinuation(flowStateRecord.FlowID, continuation.Key, continuation.Value);
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID);
} }
} }
} }
@ -134,6 +136,18 @@ namespace Tapeti.Flow.Default
} }
/// <inheritdoc />
public Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
{
var maximumDateTime = DateTime.UtcNow - minimumAge;
return Task.FromResult(flowStates
.Where(p => p.Value.CreationTime <= maximumDateTime)
.Select(p => new ActiveFlow(p.Key, p.Value.CreationTime))
.ToArray() as IEnumerable<ActiveFlow>);
}
private class FlowStateLock : IFlowStateLock private class FlowStateLock : IFlowStateLock
{ {
private readonly FlowStore owner; private readonly FlowStore owner;
@ -190,7 +204,7 @@ namespace Tapeti.Flow.Default
var isNew = cachedFlowState == null; var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false; var wasPersistent = cachedFlowState?.IsPersistent ?? false;
cachedFlowState = new CachedFlowState(newFlowState, persistent); cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent);
owner.flowStates[FlowID] = cachedFlowState; owner.flowStates[FlowID] = cachedFlowState;
if (persistent) if (persistent)
@ -198,8 +212,7 @@ namespace Tapeti.Flow.Default
// Storing the flowstate in the underlying repository // Storing the flowstate in the underlying repository
if (isNew) if (isNew)
{ {
var now = DateTime.UtcNow; await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime);
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now);
} }
else else
{ {

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -10,9 +11,9 @@ namespace Tapeti.Flow.Default
/// </summary> /// </summary>
public class NonPersistentFlowRepository : IFlowRepository public class NonPersistentFlowRepository : IFlowRepository
{ {
Task<List<KeyValuePair<Guid, T>>> IFlowRepository.GetStates<T>() Task<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>()
{ {
return Task.FromResult(new List<KeyValuePair<Guid, T>>()); return Task.FromResult(Enumerable.Empty<FlowRecord<T>>());
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -13,30 +13,64 @@ namespace Tapeti.Flow
/// Load the previously persisted flow states. /// Load the previously persisted flow states.
/// </summary> /// </summary>
/// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns> /// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns>
Task<List<KeyValuePair<Guid, T>>> GetStates<T>(); Task<IEnumerable<FlowRecord<T>>> GetStates<T>();
/// <summary> /// <summary>
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID. /// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary> /// </summary>
/// <param name="flowID"></param> /// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state"></param> /// <param name="state">The flow state to be stored.</param>
/// <param name="timestamp"></param> /// <param name="timestamp">The time when the flow was initially created.</param>
/// <returns></returns> /// <returns></returns>
Task CreateState<T>(Guid flowID, T state, DateTime timestamp); Task CreateState<T>(Guid flowID, T state, DateTime timestamp);
/// <summary> /// <summary>
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID. /// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary> /// </summary>
/// <param name="flowID"></param> /// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state"></param> /// <param name="state">The flow state to be stored.</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task UpdateState<T>(Guid flowID, T state); Task UpdateState<T>(Guid flowID, T state);
/// <summary> /// <summary>
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID. /// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary> /// </summary>
/// <param name="flowID"></param> /// <param name="flowID">The unique ID of the flow.</param>
Task DeleteState(Guid flowID); Task DeleteState(Guid flowID);
} }
/// <summary>
/// Contains information about a persisted flow state.
/// </summary>
public class FlowRecord<T>
{
/// <summary>
/// The unique ID of the flow.
/// </summary>
public Guid FlowID { get; }
/// <summary>
/// The time when the flow was initially created.
/// </summary>
public DateTime CreationTime { get; }
/// <summary>
/// The stored flow state.
/// </summary>
public T FlowState { get; }
/// <summary>
/// Creates a new instance of a FlowRecord.
/// </summary>
/// <param name="flowID"></param>
/// <param name="creationTime"></param>
/// <param name="flowState"></param>
public FlowRecord(Guid flowID, DateTime creationTime, T flowState)
{
FlowID = flowID;
CreationTime = creationTime;
FlowState = flowState;
}
}
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.Default; using Tapeti.Flow.Default;
@ -29,6 +30,15 @@ namespace Tapeti.Flow
/// </summary> /// </summary>
/// <param name="flowID"></param> /// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID); Task<IFlowStateLock> LockFlowState(Guid flowID);
/// <summary>
/// Returns information about the currently active flows.
/// </summary>
/// <remarks>
/// This is intended for monitoring purposes and should be treated as a snapshot.
/// </remarks>
/// <param name="minimumAge">The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.</param>
Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge);
} }
@ -60,4 +70,33 @@ namespace Tapeti.Flow
/// </summary> /// </summary>
Task DeleteFlowState(); Task DeleteFlowState();
} }
/// <summary>
/// Contains information about an active flow, as returned by <see cref="IFlowStore.GetActiveFlows"/>.
/// </summary>
public class ActiveFlow
{
/// <summary>
/// The ID of the active flow.
/// </summary>
public Guid FlowID { get; }
/// <summary>
/// The time when the flow was initially created.
/// </summary>
public DateTime CreationTime { get; }
/// <summary>
/// Create a new instance of an ActiveFlow.
/// </summary>
/// <param name="flowID">The ID of the active flow.</param>
/// <param name="creationTime">The time when the flow was initially created.</param>
public ActiveFlow(Guid flowID, DateTime creationTime)
{
FlowID = flowID;
CreationTime = creationTime;
}
}
} }