1
0
mirror of synced 2024-12-23 01:33:07 +01:00
Tapeti/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs

137 lines
5.0 KiB
C#
Raw Normal View History

using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Newtonsoft.Json;
// Neither of these are available in language version 7 required for .NET Standard 2.0
// ReSharper disable ConvertToUsingDeclaration
// ReSharper disable UseAwaitUsing
namespace Tapeti.Flow.SQL
{
/// <summary>
/// IFlowRepository implementation for SQL server.
/// </summary>
/// <remarks>
/// Assumes the following table layout (table name configurable and may include schema):
/// create table Flow
/// (
/// FlowID uniqueidentifier not null,
/// CreationTime datetime2(3) not null,
/// StateJson nvarchar(max) null,
/// constraint PK_Flow primary key clustered(FlowID)
/// );
/// </remarks>
2017-08-14 13:58:01 +02:00
public class SqlConnectionFlowRepository : IFlowRepository
{
private readonly string connectionString;
2018-12-19 21:41:19 +01:00
private readonly string tableName;
/// <summary>
/// </summary>
2018-12-19 21:41:19 +01:00
public SqlConnectionFlowRepository(string connectionString, string tableName = "Flow")
{
this.connectionString = connectionString;
2018-12-19 21:41:19 +01:00
this.tableName = tableName;
}
/// <inheritdoc />
public async ValueTask<IEnumerable<FlowRecord<T>>> GetStates<T>()
{
2019-10-10 16:26:13 +02:00
return await SqlRetryHelper.Execute(async () =>
{
using var connection = await GetConnection().ConfigureAwait(false);
var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync().ConfigureAwait(false);
var result = new List<FlowRecord<T>>();
while (await flowReader.ReadAsync().ConfigureAwait(false))
{
var flowID = flowReader.GetGuid(0);
var creationTime = flowReader.GetDateTime(1);
var stateJson = flowReader.GetString(2);
2019-10-10 16:26:13 +02:00
var state = JsonConvert.DeserializeObject<T>(stateJson);
if (state != null)
result.Add(new FlowRecord<T>(flowID, creationTime, state));
2019-10-10 16:26:13 +02:00
}
return result;
}).ConfigureAwait(false);
}
/// <inheritdoc />
public async ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
2019-10-10 16:26:13 +02:00
await SqlRetryHelper.Execute(async () =>
2018-12-19 21:41:19 +01:00
{
using var connection = await GetConnection().ConfigureAwait(false);
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
"values (@FlowID, @StateJson, @CreationTime)",
connection);
2018-12-19 21:41:19 +01:00
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
2018-12-19 21:41:19 +01:00
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
creationTimeParam.Value = timestamp;
await query.ExecuteNonQueryAsync().ConfigureAwait(false);
}).ConfigureAwait(false);
}
/// <inheritdoc />
public async ValueTask UpdateState<T>(Guid flowID, T state)
{
2019-10-10 16:26:13 +02:00
await SqlRetryHelper.Execute(async () =>
2018-12-19 21:41:19 +01:00
{
using var connection = await GetConnection().ConfigureAwait(false);
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
2018-12-19 21:41:19 +01:00
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
2018-12-19 21:41:19 +01:00
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
2018-12-19 21:41:19 +01:00
await query.ExecuteNonQueryAsync().ConfigureAwait(false);
}).ConfigureAwait(false);
}
/// <inheritdoc />
public async ValueTask DeleteState(Guid flowID)
{
2019-10-10 16:26:13 +02:00
await SqlRetryHelper.Execute(async () =>
2018-12-19 21:41:19 +01:00
{
using var connection = await GetConnection().ConfigureAwait(false);
2018-12-19 21:41:19 +01:00
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
2018-12-19 21:41:19 +01:00
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
flowIDParam.Value = flowID;
await query.ExecuteNonQueryAsync().ConfigureAwait(false);
}).ConfigureAwait(false);
}
private async Task<SqlConnection> GetConnection()
{
var connection = new SqlConnection(connectionString);
await connection.OpenAsync().ConfigureAwait(false);
return connection;
}
}
}