diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
index 19f7a8b..3da64b3 100644
--- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
+++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
@@ -37,24 +37,25 @@ namespace Tapeti.Flow.SQL
///
- public async Task>> GetStates()
+ public async Task>> GetStates()
{
return await SqlRetryHelper.Execute(async () =>
{
using (var connection = await GetConnection())
{
- var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
+ var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync();
- var result = new List>();
+ var result = new List>();
while (await flowReader.ReadAsync())
{
var flowID = flowReader.GetGuid(0);
- var stateJson = flowReader.GetString(1);
+ var creationTime = flowReader.GetDateTime(1);
+ var stateJson = flowReader.GetString(2);
var state = JsonConvert.DeserializeObject(stateJson);
- result.Add(new KeyValuePair(flowID, state));
+ result.Add(new FlowRecord(flowID, creationTime, state));
}
return result;
diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs
index 8b0aa96..044e9d0 100644
--- a/Tapeti.Flow/Default/FlowStore.cs
+++ b/Tapeti.Flow/Default/FlowStore.cs
@@ -18,11 +18,13 @@ namespace Tapeti.Flow.Default
private class CachedFlowState
{
public readonly FlowState FlowState;
+ public readonly DateTime CreationTime;
public readonly bool IsPersistent;
- public CachedFlowState(FlowState flowState, bool isPersistent)
+ public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent)
{
FlowState = flowState;
+ CreationTime = creationTime;
IsPersistent = isPersistent;
}
}
@@ -64,12 +66,12 @@ namespace Tapeti.Flow.Default
{
foreach (var flowStateRecord in await repository.GetStates())
{
- flowStates.TryAdd(flowStateRecord.Key, new CachedFlowState(flowStateRecord.Value, true));
+ flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true));
- foreach (var continuation in flowStateRecord.Value.Continuations)
+ foreach (var continuation in flowStateRecord.FlowState.Continuations)
{
- ValidateContinuation(flowStateRecord.Key, continuation.Key, continuation.Value);
- continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
+ ValidateContinuation(flowStateRecord.FlowID, continuation.Key, continuation.Value);
+ continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID);
}
}
}
@@ -134,6 +136,18 @@ namespace Tapeti.Flow.Default
}
+ ///
+ public Task> GetActiveFlows(TimeSpan minimumAge)
+ {
+ var maximumDateTime = DateTime.UtcNow - minimumAge;
+
+ return Task.FromResult(flowStates
+ .Where(p => p.Value.CreationTime <= maximumDateTime)
+ .Select(p => new ActiveFlow(p.Key, p.Value.CreationTime))
+ .ToArray() as IEnumerable);
+ }
+
+
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner;
@@ -190,7 +204,7 @@ namespace Tapeti.Flow.Default
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
- cachedFlowState = new CachedFlowState(newFlowState, persistent);
+ cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
@@ -198,8 +212,7 @@ namespace Tapeti.Flow.Default
// Storing the flowstate in the underlying repository
if (isNew)
{
- var now = DateTime.UtcNow;
- await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, now);
+ await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime);
}
else
{
diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs
index b20bfd6..b1aa283 100644
--- a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs
+++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
@@ -10,9 +11,9 @@ namespace Tapeti.Flow.Default
///
public class NonPersistentFlowRepository : IFlowRepository
{
- Task>> IFlowRepository.GetStates()
+ Task>> IFlowRepository.GetStates()
{
- return Task.FromResult(new List>());
+ return Task.FromResult(Enumerable.Empty>());
}
///
diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs
index f147684..cde801c 100644
--- a/Tapeti.Flow/IFlowRepository.cs
+++ b/Tapeti.Flow/IFlowRepository.cs
@@ -13,30 +13,64 @@ namespace Tapeti.Flow
/// Load the previously persisted flow states.
///
/// A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.
- Task>> GetStates();
+ Task>> GetStates();
///
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
///
- ///
- ///
- ///
+ /// The unique ID of the flow.
+ /// The flow state to be stored.
+ /// The time when the flow was initially created.
///
Task CreateState(Guid flowID, T state, DateTime timestamp);
///
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
///
- ///
- ///
- ///
- ///
+ /// The unique ID of the flow.
+ /// The flow state to be stored.
Task UpdateState(Guid flowID, T state);
///
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
///
- ///
+ /// The unique ID of the flow.
Task DeleteState(Guid flowID);
}
+
+
+ ///
+ /// Contains information about a persisted flow state.
+ ///
+ public class FlowRecord
+ {
+ ///
+ /// The unique ID of the flow.
+ ///
+ public Guid FlowID { get; }
+
+ ///
+ /// The time when the flow was initially created.
+ ///
+ public DateTime CreationTime { get; }
+
+ ///
+ /// The stored flow state.
+ ///
+ public T FlowState { get; }
+
+
+ ///
+ /// Creates a new instance of a FlowRecord.
+ ///
+ ///
+ ///
+ ///
+ public FlowRecord(Guid flowID, DateTime creationTime, T flowState)
+ {
+ FlowID = flowID;
+ CreationTime = creationTime;
+ FlowState = flowState;
+ }
+ }
}
diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs
index 21c4337..b3720b3 100644
--- a/Tapeti.Flow/IFlowStore.cs
+++ b/Tapeti.Flow/IFlowStore.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
using Tapeti.Flow.Default;
@@ -29,6 +30,15 @@ namespace Tapeti.Flow
///
///
Task LockFlowState(Guid flowID);
+
+ ///
+ /// Returns information about the currently active flows.
+ ///
+ ///
+ /// This is intended for monitoring purposes and should be treated as a snapshot.
+ ///
+ /// The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.
+ Task> GetActiveFlows(TimeSpan minimumAge);
}
@@ -60,4 +70,33 @@ namespace Tapeti.Flow
///
Task DeleteFlowState();
}
+
+
+ ///
+ /// Contains information about an active flow, as returned by .
+ ///
+ public class ActiveFlow
+ {
+ ///
+ /// The ID of the active flow.
+ ///
+ public Guid FlowID { get; }
+
+ ///
+ /// The time when the flow was initially created.
+ ///
+ public DateTime CreationTime { get; }
+
+
+ ///
+ /// Create a new instance of an ActiveFlow.
+ ///
+ /// The ID of the active flow.
+ /// The time when the flow was initially created.
+ public ActiveFlow(Guid flowID, DateTime creationTime)
+ {
+ FlowID = flowID;
+ CreationTime = creationTime;
+ }
+ }
}