Merge branch 'hotfix/1.3.4'
This commit is contained in:
commit
dd6d6bd95a
@ -35,74 +35,85 @@ namespace Tapeti.Flow.SQL
|
|||||||
|
|
||||||
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
|
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
|
||||||
{
|
{
|
||||||
using (var connection = await GetConnection())
|
return await SqlRetryHelper.Execute(async () =>
|
||||||
{
|
{
|
||||||
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
|
using (var connection = await GetConnection())
|
||||||
var flowReader = await flowQuery.ExecuteReaderAsync();
|
|
||||||
|
|
||||||
var result = new List<KeyValuePair<Guid, T>>();
|
|
||||||
|
|
||||||
while (await flowReader.ReadAsync())
|
|
||||||
{
|
{
|
||||||
var flowID = flowReader.GetGuid(0);
|
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
|
||||||
var stateJson = flowReader.GetString(1);
|
var flowReader = await flowQuery.ExecuteReaderAsync();
|
||||||
|
|
||||||
var state = JsonConvert.DeserializeObject<T>(stateJson);
|
var result = new List<KeyValuePair<Guid, T>>();
|
||||||
result.Add(new KeyValuePair<Guid, T>(flowID, state));
|
|
||||||
|
while (await flowReader.ReadAsync())
|
||||||
|
{
|
||||||
|
var flowID = flowReader.GetGuid(0);
|
||||||
|
var stateJson = flowReader.GetString(1);
|
||||||
|
|
||||||
|
var state = JsonConvert.DeserializeObject<T>(stateJson);
|
||||||
|
result.Add(new KeyValuePair<Guid, T>(flowID, state));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
|
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
|
||||||
{
|
{
|
||||||
using (var connection = await GetConnection())
|
await SqlRetryHelper.Execute(async () =>
|
||||||
{
|
{
|
||||||
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
|
using (var connection = await GetConnection())
|
||||||
"values (@FlowID, @StateJson, @CreationTime)",
|
{
|
||||||
connection);
|
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
|
||||||
|
"values (@FlowID, @StateJson, @CreationTime)",
|
||||||
|
connection);
|
||||||
|
|
||||||
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
||||||
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
|
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
|
||||||
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
|
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
|
||||||
|
|
||||||
flowIDParam.Value = flowID;
|
flowIDParam.Value = flowID;
|
||||||
stateJsonParam.Value = JsonConvert.SerializeObject(state);
|
stateJsonParam.Value = JsonConvert.SerializeObject(state);
|
||||||
creationTimeParam.Value = timestamp;
|
creationTimeParam.Value = timestamp;
|
||||||
|
|
||||||
await query.ExecuteNonQueryAsync();
|
await query.ExecuteNonQueryAsync();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task UpdateState<T>(Guid flowID, T state)
|
public async Task UpdateState<T>(Guid flowID, T state)
|
||||||
{
|
{
|
||||||
using (var connection = await GetConnection())
|
await SqlRetryHelper.Execute(async () =>
|
||||||
{
|
{
|
||||||
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
|
using (var connection = await GetConnection())
|
||||||
|
{
|
||||||
|
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
|
||||||
|
|
||||||
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
||||||
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
|
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
|
||||||
|
|
||||||
flowIDParam.Value = flowID;
|
flowIDParam.Value = flowID;
|
||||||
stateJsonParam.Value = JsonConvert.SerializeObject(state);
|
stateJsonParam.Value = JsonConvert.SerializeObject(state);
|
||||||
|
|
||||||
await query.ExecuteNonQueryAsync();
|
await query.ExecuteNonQueryAsync();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task DeleteState(Guid flowID)
|
public async Task DeleteState(Guid flowID)
|
||||||
{
|
{
|
||||||
using (var connection = await GetConnection())
|
await SqlRetryHelper.Execute(async () =>
|
||||||
{
|
{
|
||||||
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
|
using (var connection = await GetConnection())
|
||||||
|
{
|
||||||
|
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
|
||||||
|
|
||||||
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
|
||||||
flowIDParam.Value = flowID;
|
flowIDParam.Value = flowID;
|
||||||
|
|
||||||
await query.ExecuteNonQueryAsync();
|
await query.ExecuteNonQueryAsync();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
199
Tapeti.Flow.SQL/SqlExceptionHelper.cs
Normal file
199
Tapeti.Flow.SQL/SqlExceptionHelper.cs
Normal file
@ -0,0 +1,199 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Data.SqlClient;
|
||||||
|
using System.Text;
|
||||||
|
|
||||||
|
namespace Tapeti.Flow.SQL
|
||||||
|
{
|
||||||
|
internal static class SqlExceptionHelper
|
||||||
|
{
|
||||||
|
// 2601: Cannot insert duplicate key row in object '%.*ls' with unique index '%.*ls'. The duplicate key value is %ls.
|
||||||
|
// 2627: Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls.
|
||||||
|
public static bool IsDuplicateKey(SqlException e)
|
||||||
|
{
|
||||||
|
return e != null && (e.Number == 2601 || e.Number == 2627);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static bool IsTransientError(Exception e)
|
||||||
|
{
|
||||||
|
switch (e)
|
||||||
|
{
|
||||||
|
case TimeoutException _:
|
||||||
|
return true;
|
||||||
|
|
||||||
|
case Exception exception:
|
||||||
|
{
|
||||||
|
var sqlExceptions = ExtractSqlExceptions(e);
|
||||||
|
foreach (var sqlException in sqlExceptions)
|
||||||
|
{
|
||||||
|
var sqlErrors = UnwrapSqlErrors(sqlException);
|
||||||
|
|
||||||
|
if (IsRecoverableSQLError(sqlErrors))
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extracts alls SqlExceptions from the main and inner or aggregate exceptions
|
||||||
|
/// </summary>
|
||||||
|
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception e)
|
||||||
|
{
|
||||||
|
while (e != null)
|
||||||
|
{
|
||||||
|
switch (e)
|
||||||
|
{
|
||||||
|
case AggregateException aggregateException:
|
||||||
|
foreach (var innerException in aggregateException.InnerExceptions)
|
||||||
|
{
|
||||||
|
foreach (var extractedSqlException in ExtractSqlExceptions(innerException))
|
||||||
|
yield return extractedSqlException;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SqlException sqlException:
|
||||||
|
yield return sqlException;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
e = e.InnerException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Goes through all errors in a SqlException, and any InnerException that is also a SqlException.
|
||||||
|
/// </summary>
|
||||||
|
public static IEnumerable<SqlError> UnwrapSqlErrors(SqlException exception)
|
||||||
|
{
|
||||||
|
while (exception != null)
|
||||||
|
{
|
||||||
|
foreach (SqlError error in exception.Errors)
|
||||||
|
yield return error;
|
||||||
|
|
||||||
|
exception = exception.InnerException as SqlException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mostly borrowed from EF:
|
||||||
|
// https://github.com/aspnet/EntityFrameworkCore/blob/d8b7ebbfabff3d2e8560c24b1ff14d1f4244ca6a/src/EFCore.SqlServer/Storage/Internal/SqlServerTransientExceptionDetector.cs
|
||||||
|
private static bool IsRecoverableSQLError(IEnumerable<SqlError> sqlErrors)
|
||||||
|
{
|
||||||
|
foreach (var err in sqlErrors)
|
||||||
|
{
|
||||||
|
switch (err.Number)
|
||||||
|
{
|
||||||
|
// This exception can be thrown even if the operation completed succesfully, so it's safer to let the application fail.
|
||||||
|
// DBNETLIB Error Code: -2
|
||||||
|
// Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding. The statement has been terminated.
|
||||||
|
// In onze situatie mag elke repository operation nogmaals uitgevoerd worden dus gaan we deze retry-en, ook al vind EF van niet.
|
||||||
|
case -2:
|
||||||
|
|
||||||
|
|
||||||
|
// EF doesn't recognize this one for some reason, but it occurs when I stop the SQL Server service
|
||||||
|
case 2:
|
||||||
|
|
||||||
|
// I had this occur once as well, even though all credentials were valid, so assume it's transient as well
|
||||||
|
case 4060:
|
||||||
|
|
||||||
|
|
||||||
|
// SQL Error Code: 49920
|
||||||
|
// Cannot process request. Too many operations in progress for subscription "%ld".
|
||||||
|
// The service is busy processing multiple requests for this subscription.
|
||||||
|
// Requests are currently blocked for resource optimization. Query sys.dm_operation_status for operation status.
|
||||||
|
// Wait until pending requests are complete or delete one of your pending requests and retry your request later.
|
||||||
|
case 49920:
|
||||||
|
// SQL Error Code: 49919
|
||||||
|
// Cannot process create or update request. Too many create or update operations in progress for subscription "%ld".
|
||||||
|
// The service is busy processing multiple create or update requests for your subscription or server.
|
||||||
|
// Requests are currently blocked for resource optimization. Query sys.dm_operation_status for pending operations.
|
||||||
|
// Wait till pending create or update requests are complete or delete one of your pending requests and
|
||||||
|
// retry your request later.
|
||||||
|
case 49919:
|
||||||
|
// SQL Error Code: 49918
|
||||||
|
// Cannot process request. Not enough resources to process request.
|
||||||
|
// The service is currently busy.Please retry the request later.
|
||||||
|
case 49918:
|
||||||
|
// SQL Error Code: 41839
|
||||||
|
// Transaction exceeded the maximum number of commit dependencies.
|
||||||
|
case 41839:
|
||||||
|
// SQL Error Code: 41325
|
||||||
|
// The current transaction failed to commit due to a serializable validation failure.
|
||||||
|
case 41325:
|
||||||
|
// SQL Error Code: 41305
|
||||||
|
// The current transaction failed to commit due to a repeatable read validation failure.
|
||||||
|
case 41305:
|
||||||
|
// SQL Error Code: 41302
|
||||||
|
// The current transaction attempted to update a record that has been updated since the transaction started.
|
||||||
|
case 41302:
|
||||||
|
// SQL Error Code: 41301
|
||||||
|
// Dependency failure: a dependency was taken on another transaction that later failed to commit.
|
||||||
|
case 41301:
|
||||||
|
// SQL Error Code: 40613
|
||||||
|
// Database XXXX on server YYYY is not currently available. Please retry the connection later.
|
||||||
|
// If the problem persists, contact customer support, and provide them the session tracing ID of ZZZZZ.
|
||||||
|
case 40613:
|
||||||
|
// SQL Error Code: 40501
|
||||||
|
// The service is currently busy. Retry the request after 10 seconds. Code: (reason code to be decoded).
|
||||||
|
case 40501:
|
||||||
|
// SQL Error Code: 40197
|
||||||
|
// The service has encountered an error processing your request. Please try again.
|
||||||
|
case 40197:
|
||||||
|
// SQL Error Code: 10929
|
||||||
|
// Resource ID: %d. The %s minimum guarantee is %d, maximum limit is %d and the current usage for the database is %d.
|
||||||
|
// However, the server is currently too busy to support requests greater than %d for this database.
|
||||||
|
// For more information, see http://go.microsoft.com/fwlink/?LinkId=267637. Otherwise, please try again.
|
||||||
|
case 10929:
|
||||||
|
// SQL Error Code: 10928
|
||||||
|
// Resource ID: %d. The %s limit for the database is %d and has been reached. For more information,
|
||||||
|
// see http://go.microsoft.com/fwlink/?LinkId=267637.
|
||||||
|
case 10928:
|
||||||
|
// SQL Error Code: 10060
|
||||||
|
// A network-related or instance-specific error occurred while establishing a connection to SQL Server.
|
||||||
|
// The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server
|
||||||
|
// is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed
|
||||||
|
// because the connected party did not properly respond after a period of time, or established connection failed
|
||||||
|
// because connected host has failed to respond.)"}
|
||||||
|
case 10060:
|
||||||
|
// SQL Error Code: 10054
|
||||||
|
// A transport-level error has occurred when sending the request to the server.
|
||||||
|
// (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.)
|
||||||
|
case 10054:
|
||||||
|
// SQL Error Code: 10053
|
||||||
|
// A transport-level error has occurred when receiving results from the server.
|
||||||
|
// An established connection was aborted by the software in your host machine.
|
||||||
|
case 10053:
|
||||||
|
// SQL Error Code: 1205
|
||||||
|
// Deadlock
|
||||||
|
case 1205:
|
||||||
|
// SQL Error Code: 233
|
||||||
|
// The client was unable to establish a connection because of an error during connection initialization process before login.
|
||||||
|
// Possible causes include the following: the client tried to connect to an unsupported version of SQL Server;
|
||||||
|
// the server was too busy to accept new connections; or there was a resource limitation (insufficient memory or maximum
|
||||||
|
// allowed connections) on the server. (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by
|
||||||
|
// the remote host.)
|
||||||
|
case 233:
|
||||||
|
// SQL Error Code: 121
|
||||||
|
// The semaphore timeout period has expired
|
||||||
|
case 121:
|
||||||
|
// SQL Error Code: 64
|
||||||
|
// A connection was successfully established with the server, but then an error occurred during the login process.
|
||||||
|
// (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
|
||||||
|
case 64:
|
||||||
|
// DBNETLIB Error Code: 20
|
||||||
|
// The instance of SQL Server you attempted to connect to does not support encryption.
|
||||||
|
case 20:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
60
Tapeti.Flow.SQL/SqlRetryHelper.cs
Normal file
60
Tapeti.Flow.SQL/SqlRetryHelper.cs
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
using System;
|
||||||
|
using System.Data.SqlClient;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Flow.SQL
|
||||||
|
{
|
||||||
|
internal class SqlRetryHelper
|
||||||
|
{
|
||||||
|
public static readonly TimeSpan[] ExponentialBackoff = {
|
||||||
|
TimeSpan.FromSeconds(1),
|
||||||
|
TimeSpan.FromSeconds(2),
|
||||||
|
TimeSpan.FromSeconds(3),
|
||||||
|
TimeSpan.FromSeconds(5),
|
||||||
|
TimeSpan.FromSeconds(8),
|
||||||
|
TimeSpan.FromSeconds(13),
|
||||||
|
TimeSpan.FromSeconds(21),
|
||||||
|
TimeSpan.FromSeconds(34),
|
||||||
|
TimeSpan.FromSeconds(55)
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
public static async Task Execute(Func<Task> callback)
|
||||||
|
{
|
||||||
|
var retryAttempt = 0;
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await callback();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (SqlException e)
|
||||||
|
{
|
||||||
|
if (SqlExceptionHelper.IsTransientError(e))
|
||||||
|
{
|
||||||
|
await Task.Delay(ExponentialBackoff[retryAttempt]);
|
||||||
|
if (retryAttempt < ExponentialBackoff.Length - 1)
|
||||||
|
retryAttempt++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static async Task<T> Execute<T>(Func<Task<T>> callback)
|
||||||
|
{
|
||||||
|
var returnValue = default(T);
|
||||||
|
|
||||||
|
await Execute(async () =>
|
||||||
|
{
|
||||||
|
returnValue = await callback();
|
||||||
|
});
|
||||||
|
|
||||||
|
return returnValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,6 @@
|
|||||||
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
|
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
|
||||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
|
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
|
||||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
|
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
|
||||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /></s:String></wpf:ResourceDictionary>
|
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String>
|
||||||
|
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /></s:String>
|
||||||
|
</wpf:ResourceDictionary>
|
Loading…
Reference in New Issue
Block a user