1
0
mirror of synced 2024-11-16 06:43:51 +00:00

IFlowRepository aangepast. abstracter en simpeler

This commit is contained in:
Menno van Lavieren 2017-07-27 15:55:37 +02:00
parent 580a4716c0
commit 66e9f90187
8 changed files with 47 additions and 68 deletions

View File

@ -30,7 +30,7 @@ namespace Tapeti.Flow.SQL
public void RegisterDefaults(IDependencyContainer container) public void RegisterDefaults(IDependencyContainer container)
{ {
container.RegisterDefault<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); container.RegisterDefault<IFlowRepository<Default.FlowState>>(() => new SqlConnectionFlowRepository<Default.FlowState>(connectionString, serviceId, schema));
} }

View File

@ -3,7 +3,9 @@ using System.Collections.Generic;
using System.Data; using System.Data;
using System.Data.SqlClient; using System.Data.SqlClient;
using System.Linq; using System.Linq;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Tapeti.Flow.SQL namespace Tapeti.Flow.SQL
{ {
@ -11,30 +13,18 @@ namespace Tapeti.Flow.SQL
Assumes the following table layout (schema configurable): Assumes the following table layout (schema configurable):
create table dbo.Flow create table shared.Flow
( (
FlowID uniqueidentifier not null, FlowID uniqueidentifier not null,
ServiceID int not null, ServiceID int not null,
CreationTime datetime2(3) not null, CreationTime datetime2(3) not null,
Metadata nvarchar(max) null,
Flowdata nvarchar(max) null, Flowdata nvarchar(max) null,
constraint PK_Flow primary key clustered (FlowID) constraint PK_Flow primary key clustered (FlowID)
); );
create table dbo.FlowContinuation
(
FlowID uniqueidentifier not null,
ContinuationID uniqueidentifier not null,
Metadata nvarchar(max) null,
constraint PK_FlowContinuation primary key clustered (FlowID, ContinuationID)
);
go; go;
alter table shared.FlowContinuation with check add constraint FK_FlowContinuation_Flow foreign key (FlowID) references shared.Flow (FlowID);
*/ */
public class SqlConnectionFlowRepository : IFlowRepository public class SqlConnectionFlowRepository<T> : IFlowRepository<T>
{ {
private readonly string connectionString; private readonly string connectionString;
private readonly int serviceId; private readonly int serviceId;
@ -49,65 +39,44 @@ namespace Tapeti.Flow.SQL
} }
public async Task<IQueryable<FlowStateRecord>> GetStates() public async Task<List<KeyValuePair<Guid, T>>> GetStates()
{ {
var result = new List<FlowStateRecord>();
using (var connection = await GetConnection()) using (var connection = await GetConnection())
{ {
var flowQuery = new SqlCommand($"select FlowID, Metadata, Flowdata from {schema}.Flow " + var flowQuery = new SqlCommand($"select FlowID, StateJson from {schema}.Flow " +
"where ServiceID = @ServiceID " + "where ServiceID = @ServiceID " +
"order by FlowID", connection); "order by FlowID", connection);
var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
var continuationQuery = new SqlCommand($"select FlowID, ContinuationID, Metadata from {schema}.FlowContinuation " +
"where ServiceID = @ServiceID " +
"order by FlowID", connection);
var continuationQueryParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
flowServiceParam.Value = serviceId; flowServiceParam.Value = serviceId;
continuationQueryParam.Value = serviceId;
var flowReader = await flowQuery.ExecuteReaderAsync(); var flowReader = await flowQuery.ExecuteReaderAsync();
var continuationReader = await continuationQuery.ExecuteReaderAsync();
var hasContinuation = await continuationReader.ReadAsync(); var result = new List<KeyValuePair<Guid, T>>();
while (await flowReader.ReadAsync()) while (await flowReader.ReadAsync())
{ {
var flowStateRecord = new FlowStateRecord var flowID = flowReader.GetGuid(0);
var stateJson = flowReader.GetString(1);
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new KeyValuePair<Guid, T>(flowID, state));
}
return result;
}
}
public Task CreateState(Guid flowID, T state, DateTime timestamp)
{ {
FlowID = flowReader.GetGuid(0), var stateJason = JsonConvert.SerializeObject(state);
Metadata = flowReader.GetString(1),
Data = flowReader.GetString(2),
ContinuationMetadata = new Dictionary<Guid, string>()
};
while (hasContinuation && continuationReader.GetGuid(0) == flowStateRecord.FlowID)
{
flowStateRecord.ContinuationMetadata.Add(
continuationReader.GetGuid(1),
continuationReader.GetString(2)
);
hasContinuation = await continuationReader.ReadAsync();
}
result.Add(flowStateRecord);
}
}
return result.AsQueryable();
}
public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp)
{
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task UpdateState(FlowStateRecord stateRecord) public Task UpdateState(Guid flowID, T state)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }

View File

@ -33,6 +33,10 @@
<WarningLevel>4</WarningLevel> <WarningLevel>4</WarningLevel>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Reference Include="Newtonsoft.Json, Version=10.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" /> <Reference Include="System" />
<Reference Include="System.Core" /> <Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" /> <Reference Include="System.Xml.Linq" />
@ -57,6 +61,9 @@
<Name>Tapeti</Name> <Name>Tapeti</Name>
</ProjectReference> </ProjectReference>
</ItemGroup> </ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Newtonsoft.Json" version="10.0.3" targetFramework="net461" />
</packages>

View File

@ -13,10 +13,10 @@ namespace Tapeti.Flow.Default
private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>(); private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>(); private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly IFlowRepository repository; private readonly IFlowRepository<FlowState> repository;
public FlowStore(IFlowRepository repository) public FlowStore(IFlowRepository<FlowState> repository)
{ {
this.repository = repository; this.repository = repository;
} }
@ -29,11 +29,10 @@ namespace Tapeti.Flow.Default
foreach (var flowStateRecord in await repository.GetStates()) foreach (var flowStateRecord in await repository.GetStates())
{ {
var flowState = ToFlowState(flowStateRecord); FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value);
FlowStates.GetOrAdd(flowStateRecord.FlowID, flowState);
foreach (var continuation in flowStateRecord.ContinuationMetadata) foreach (var continuation in flowStateRecord.Value.Continuations)
ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID); ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
} }
} }
@ -139,11 +138,11 @@ namespace Tapeti.Flow.Default
{ {
isNew = false; isNew = false;
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await owner.repository.CreateState(ToFlowStateRecord(flowID, flowState), now); await owner.repository.CreateState(flowID, flowState, now);
} }
else else
{ {
await owner.repository.UpdateState(ToFlowStateRecord(flowID, flowState)); await owner.repository.UpdateState(flowID, flowState);
} }
} }

View File

@ -5,19 +5,19 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
public class NonPersistentFlowRepository : IFlowRepository public class NonPersistentFlowRepository<T> : IFlowRepository<T>
{ {
public Task<IQueryable<FlowStateRecord>> GetStates() Task<List<KeyValuePair<Guid, T>>> IFlowRepository<T>.GetStates()
{ {
return Task.FromResult(new List<FlowStateRecord>().AsQueryable()); return Task.FromResult(new List<KeyValuePair<Guid, T>>());
} }
public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp) public Task CreateState(Guid flowID, T state, DateTime timestamp)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task UpdateState(FlowStateRecord stateRecord) public Task UpdateState(Guid flowID, T state)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }

View File

@ -11,7 +11,7 @@ namespace Tapeti.Flow
container.RegisterDefault<IFlowProvider, FlowProvider>(); container.RegisterDefault<IFlowProvider, FlowProvider>();
container.RegisterDefault<IFlowStarter, FlowStarter>(); container.RegisterDefault<IFlowStarter, FlowStarter>();
container.RegisterDefault<IFlowHandler, FlowProvider>(); container.RegisterDefault<IFlowHandler, FlowProvider>();
container.RegisterDefault<IFlowRepository, NonPersistentFlowRepository>(); container.RegisterDefault<IFlowRepository<FlowState>, NonPersistentFlowRepository<FlowState>>();
container.RegisterDefault<IFlowStore, FlowStore>(); container.RegisterDefault<IFlowStore, FlowStore>();
} }

View File

@ -5,11 +5,11 @@ using System.Threading.Tasks;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public interface IFlowRepository public interface IFlowRepository<T>
{ {
Task<IQueryable<FlowStateRecord>> GetStates(); Task<List<KeyValuePair<Guid, T>>> GetStates();
Task CreateState(FlowStateRecord stateRecord, DateTime timestamp); Task CreateState(Guid flowID, T state, DateTime timestamp);
Task UpdateState(FlowStateRecord stateRecord); Task UpdateState(Guid flowID, T state);
Task DeleteState(Guid flowID); Task DeleteState(Guid flowID);
} }