diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index 9dd039b..2e2e247 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -7,9 +7,9 @@ namespace Tapeti.Flow.SQL { public static class ConfigExtensions { - public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, int serviceId, string schema = "dbo") + public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, string tableName = "Flow") { - config.Use(new FlowSqlRepositoryBundle(connectionString, serviceId, schema)); + config.Use(new FlowSqlRepositoryBundle(connectionString, tableName)); return config; } } @@ -18,21 +18,19 @@ namespace Tapeti.Flow.SQL internal class FlowSqlRepositoryBundle : ITapetiExtension { private readonly string connectionString; - private readonly string schema; - private readonly int serviceId; + private readonly string tableName; - public FlowSqlRepositoryBundle(string connectionString, int serviceId, string schema) + public FlowSqlRepositoryBundle(string connectionString, string tableName) { this.connectionString = connectionString; - this.serviceId = serviceId; - this.schema = schema; + this.tableName = tableName; } public void RegisterDefaults(IDependencyContainer container) { - container.RegisterDefault(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); + container.RegisterDefaultSingleton(() => new SqlConnectionFlowRepository(connectionString, tableName)); } diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 4a8640c..d32b645 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -8,32 +8,28 @@ using Newtonsoft.Json; namespace Tapeti.Flow.SQL { /* - Assumes the following table layout (schema configurable): + Assumes the following table layout (table name configurable and may include schema): - create table shared.Flow + create table Flow ( FlowID uniqueidentifier not null, - ServiceID int not null, CreationTime datetime2(3) not null, StateJson nvarchar(max) null, constraint PK_Flow primary key clustered (FlowID) ); - go; */ public class SqlConnectionFlowRepository : IFlowRepository { private readonly string connectionString; - private readonly int serviceId; - private readonly string schema; + private readonly string tableName; - public SqlConnectionFlowRepository(string connectionString, int serviceId, string schema) + public SqlConnectionFlowRepository(string connectionString, string tableName = "Flow") { this.connectionString = connectionString; - this.serviceId = serviceId; - this.schema = schema; + this.tableName = tableName; } @@ -41,14 +37,7 @@ namespace Tapeti.Flow.SQL { using (var connection = await GetConnection()) { - var flowQuery = new SqlCommand($"select FlowID, StateJson from {schema}.Flow " + - "where ServiceID = @ServiceID ", - connection); - var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); - - flowServiceParam.Value = serviceId; - - + var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection); var flowReader = await flowQuery.ExecuteReaderAsync(); var result = new List>(); @@ -67,21 +56,53 @@ namespace Tapeti.Flow.SQL } - public Task CreateState(Guid flowID, T state, DateTime timestamp) + public async Task CreateState(Guid flowID, T state, DateTime timestamp) { - //var stateJson = JsonConvert.SerializeObject(state); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + + "values (@FlowID, @StateJson, @CreationTime)", + connection); - throw new NotImplementedException(); + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2); + + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); + creationTimeParam.Value = timestamp; + + await query.ExecuteNonQueryAsync(); + } } - public Task UpdateState(Guid flowID, T state) + public async Task UpdateState(Guid flowID, T state) { - throw new NotImplementedException(); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); + + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); + + await query.ExecuteNonQueryAsync(); + } } - public Task DeleteState(Guid flowID) + public async Task DeleteState(Guid flowID) { - throw new NotImplementedException(); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); + + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + flowIDParam.Value = flowID; + + await query.ExecuteNonQueryAsync(); + } } diff --git a/Test/Program.cs b/Test/Program.cs index caad2c9..be6eed9 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -5,6 +5,7 @@ using Tapeti.DataAnnotations; using Tapeti.Flow; using Tapeti.SimpleInjector; using System.Threading; +using Tapeti.Flow.SQL; namespace Test { @@ -12,10 +13,7 @@ namespace Test { private static void Main() { - // TODO SQL based flow store // TODO logging - // TODO uitzoeken of we consumers kunnen pauzeren (denk: SQL down) --> nee, EFDBContext Get Async maken en retryen? kan dat, of timeout dan Rabbit? - var container = new Container(); container.Register(); @@ -23,6 +21,7 @@ namespace Test container.Register(); var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + //.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true") .WithFlow() .WithDataAnnotations() .RegisterAllControllers()