From 66e9f901874a0a07a44fda81bafd43f9c2f6b226 Mon Sep 17 00:00:00 2001 From: Menno van Lavieren Date: Thu, 27 Jul 2017 15:55:37 +0200 Subject: [PATCH] IFlowRepository aangepast. abstracter en simpeler --- Tapeti.Flow.SQL/ConfigExtensions.cs | 2 +- .../SqlConnectionFlowRepository.cs | 67 +++++-------------- Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 7 ++ Tapeti.Flow.SQL/packages.config | 4 ++ Tapeti.Flow/Default/FlowStore.cs | 15 ++--- .../Default/NonPersistentFlowRepository.cs | 10 +-- Tapeti.Flow/FlowMiddleware.cs | 2 +- Tapeti.Flow/IFlowRepository.cs | 8 +-- 8 files changed, 47 insertions(+), 68 deletions(-) create mode 100644 Tapeti.Flow.SQL/packages.config diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index c5e660d..70b1aff 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -30,7 +30,7 @@ namespace Tapeti.Flow.SQL public void RegisterDefaults(IDependencyContainer container) { - container.RegisterDefault(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); + container.RegisterDefault>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); } diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 15c351a..35e9078 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -3,7 +3,9 @@ using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.Linq; +using System.Text; using System.Threading.Tasks; +using Newtonsoft.Json; namespace Tapeti.Flow.SQL { @@ -11,30 +13,18 @@ namespace Tapeti.Flow.SQL Assumes the following table layout (schema configurable): - create table dbo.Flow + create table shared.Flow ( FlowID uniqueidentifier not null, ServiceID int not null, CreationTime datetime2(3) not null, - Metadata nvarchar(max) null, Flowdata nvarchar(max) null, constraint PK_Flow primary key clustered (FlowID) ); - - create table dbo.FlowContinuation - ( - FlowID uniqueidentifier not null, - ContinuationID uniqueidentifier not null, - Metadata nvarchar(max) null, - - constraint PK_FlowContinuation primary key clustered (FlowID, ContinuationID) - ); go; - - alter table shared.FlowContinuation with check add constraint FK_FlowContinuation_Flow foreign key (FlowID) references shared.Flow (FlowID); */ - public class SqlConnectionFlowRepository : IFlowRepository + public class SqlConnectionFlowRepository : IFlowRepository { private readonly string connectionString; private readonly int serviceId; @@ -49,65 +39,44 @@ namespace Tapeti.Flow.SQL } - public async Task> GetStates() + public async Task>> GetStates() { - var result = new List(); - using (var connection = await GetConnection()) { - var flowQuery = new SqlCommand($"select FlowID, Metadata, Flowdata from {schema}.Flow " + + var flowQuery = new SqlCommand($"select FlowID, StateJson from {schema}.Flow " + "where ServiceID = @ServiceID " + "order by FlowID", connection); var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); - var continuationQuery = new SqlCommand($"select FlowID, ContinuationID, Metadata from {schema}.FlowContinuation " + - "where ServiceID = @ServiceID " + - "order by FlowID", connection); - var continuationQueryParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); - - flowServiceParam.Value = serviceId; - continuationQueryParam.Value = serviceId; var flowReader = await flowQuery.ExecuteReaderAsync(); - var continuationReader = await continuationQuery.ExecuteReaderAsync(); - var hasContinuation = await continuationReader.ReadAsync(); + + var result = new List>(); while (await flowReader.ReadAsync()) { - var flowStateRecord = new FlowStateRecord - { - FlowID = flowReader.GetGuid(0), - Metadata = flowReader.GetString(1), - Data = flowReader.GetString(2), - ContinuationMetadata = new Dictionary() - }; + var flowID = flowReader.GetGuid(0); + var stateJson = flowReader.GetString(1); - while (hasContinuation && continuationReader.GetGuid(0) == flowStateRecord.FlowID) - { - flowStateRecord.ContinuationMetadata.Add( - continuationReader.GetGuid(1), - continuationReader.GetString(2) - ); - - hasContinuation = await continuationReader.ReadAsync(); - } - - result.Add(flowStateRecord); + var state = JsonConvert.DeserializeObject(stateJson); + result.Add(new KeyValuePair(flowID, state)); } + + return result; } - return result.AsQueryable(); } - - public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp) + public Task CreateState(Guid flowID, T state, DateTime timestamp) { + var stateJason = JsonConvert.SerializeObject(state); + throw new NotImplementedException(); } - public Task UpdateState(FlowStateRecord stateRecord) + public Task UpdateState(Guid flowID, T state) { throw new NotImplementedException(); } diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 8674a4d..8d54f56 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -33,6 +33,10 @@ 4 + + ..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll + True + @@ -57,6 +61,9 @@ Tapeti + + +