Implemented SQL flow repository

This commit is contained in:
Mark van Renswoude 2018-12-19 21:41:19 +01:00
parent 2042cc074f
commit 525e83bde9
3 changed files with 53 additions and 35 deletions

View File

@ -7,9 +7,9 @@ namespace Tapeti.Flow.SQL
{
public static class ConfigExtensions
{
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, int serviceId, string schema = "dbo")
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, string tableName = "Flow")
{
config.Use(new FlowSqlRepositoryBundle(connectionString, serviceId, schema));
config.Use(new FlowSqlRepositoryBundle(connectionString, tableName));
return config;
}
}
@ -18,21 +18,19 @@ namespace Tapeti.Flow.SQL
internal class FlowSqlRepositoryBundle : ITapetiExtension
{
private readonly string connectionString;
private readonly string schema;
private readonly int serviceId;
private readonly string tableName;
public FlowSqlRepositoryBundle(string connectionString, int serviceId, string schema)
public FlowSqlRepositoryBundle(string connectionString, string tableName)
{
this.connectionString = connectionString;
this.serviceId = serviceId;
this.schema = schema;
this.tableName = tableName;
}
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefault<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema));
container.RegisterDefaultSingleton<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, tableName));
}

View File

@ -8,32 +8,28 @@ using Newtonsoft.Json;
namespace Tapeti.Flow.SQL
{
/*
Assumes the following table layout (schema configurable):
Assumes the following table layout (table name configurable and may include schema):
create table shared.Flow
create table Flow
(
FlowID uniqueidentifier not null,
ServiceID int not null,
CreationTime datetime2(3) not null,
StateJson nvarchar(max) null,
constraint PK_Flow primary key clustered (FlowID)
);
go;
*/
public class SqlConnectionFlowRepository : IFlowRepository
{
private readonly string connectionString;
private readonly int serviceId;
private readonly string schema;
private readonly string tableName;
public SqlConnectionFlowRepository(string connectionString, int serviceId, string schema)
public SqlConnectionFlowRepository(string connectionString, string tableName = "Flow")
{
this.connectionString = connectionString;
this.serviceId = serviceId;
this.schema = schema;
this.tableName = tableName;
}
@ -41,14 +37,7 @@ namespace Tapeti.Flow.SQL
{
using (var connection = await GetConnection())
{
var flowQuery = new SqlCommand($"select FlowID, StateJson from {schema}.Flow " +
"where ServiceID = @ServiceID ",
connection);
var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
flowServiceParam.Value = serviceId;
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync();
var result = new List<KeyValuePair<Guid, T>>();
@ -67,21 +56,53 @@ namespace Tapeti.Flow.SQL
}
public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
//var stateJson = JsonConvert.SerializeObject(state);
using (var connection = await GetConnection())
{
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
"values (@FlowID, @StateJson, @CreationTime)",
connection);
throw new NotImplementedException();
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
creationTimeParam.Value = timestamp;
await query.ExecuteNonQueryAsync();
}
}
public Task UpdateState<T>(Guid flowID, T state)
public async Task UpdateState<T>(Guid flowID, T state)
{
throw new NotImplementedException();
using (var connection = await GetConnection())
{
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
await query.ExecuteNonQueryAsync();
}
}
public Task DeleteState(Guid flowID)
public async Task DeleteState(Guid flowID)
{
throw new NotImplementedException();
using (var connection = await GetConnection())
{
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
flowIDParam.Value = flowID;
await query.ExecuteNonQueryAsync();
}
}

View File

@ -5,6 +5,7 @@ using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
using Tapeti.Flow.SQL;
namespace Test
{
@ -12,10 +13,7 @@ namespace Test
{
private static void Main()
{
// TODO SQL based flow store
// TODO logging
// TODO uitzoeken of we consumers kunnen pauzeren (denk: SQL down) --> nee, EFDBContext Get Async maken en retryen? kan dat, of timeout dan Rabbit?
var container = new Container();
container.Register<MarcoEmitter>();
@ -23,6 +21,7 @@ namespace Test
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
//.WithFlowSqlRepository("Server=localhost;Database=TapetiTest;Integrated Security=true")
.WithFlow()
.WithDataAnnotations()
.RegisterAllControllers()