diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
index 2a93af9..bba7085 100644
--- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
+++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
@@ -38,77 +38,88 @@ namespace Tapeti.Flow.SQL
///
public async Task>> GetStates()
{
- using (var connection = await GetConnection())
+ return await SqlRetryHelper.Execute(async () =>
{
- var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
- var flowReader = await flowQuery.ExecuteReaderAsync();
-
- var result = new List>();
-
- while (await flowReader.ReadAsync())
+ using (var connection = await GetConnection())
{
- var flowID = flowReader.GetGuid(0);
- var stateJson = flowReader.GetString(1);
+ var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
+ var flowReader = await flowQuery.ExecuteReaderAsync();
- var state = JsonConvert.DeserializeObject(stateJson);
- result.Add(new KeyValuePair(flowID, state));
+ var result = new List>();
+
+ while (await flowReader.ReadAsync())
+ {
+ var flowID = flowReader.GetGuid(0);
+ var stateJson = flowReader.GetString(1);
+
+ var state = JsonConvert.DeserializeObject(stateJson);
+ result.Add(new KeyValuePair(flowID, state));
+ }
+
+ return result;
}
-
- return result;
- }
-
+ });
}
///
public async Task CreateState(Guid flowID, T state, DateTime timestamp)
{
- using (var connection = await GetConnection())
+ await SqlRetryHelper.Execute(async () =>
{
- var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
- "values (@FlowID, @StateJson, @CreationTime)",
- connection);
+ using (var connection = await GetConnection())
+ {
+ var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
+ "values (@FlowID, @StateJson, @CreationTime)",
+ connection);
- var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
- var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
- var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
+ var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
+ var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
+ var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
- flowIDParam.Value = flowID;
- stateJsonParam.Value = JsonConvert.SerializeObject(state);
- creationTimeParam.Value = timestamp;
+ flowIDParam.Value = flowID;
+ stateJsonParam.Value = JsonConvert.SerializeObject(state);
+ creationTimeParam.Value = timestamp;
- await query.ExecuteNonQueryAsync();
- }
+ await query.ExecuteNonQueryAsync();
+ }
+ });
}
///
public async Task UpdateState(Guid flowID, T state)
{
- using (var connection = await GetConnection())
+ await SqlRetryHelper.Execute(async () =>
{
- var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
+ using (var connection = await GetConnection())
+ {
+ var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
- var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
- var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
+ var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
+ var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
- flowIDParam.Value = flowID;
- stateJsonParam.Value = JsonConvert.SerializeObject(state);
+ flowIDParam.Value = flowID;
+ stateJsonParam.Value = JsonConvert.SerializeObject(state);
- await query.ExecuteNonQueryAsync();
- }
+ await query.ExecuteNonQueryAsync();
+ }
+ });
}
///
public async Task DeleteState(Guid flowID)
{
- using (var connection = await GetConnection())
+ await SqlRetryHelper.Execute(async () =>
{
- var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
+ using (var connection = await GetConnection())
+ {
+ var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
- var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
- flowIDParam.Value = flowID;
+ var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
+ flowIDParam.Value = flowID;
- await query.ExecuteNonQueryAsync();
- }
+ await query.ExecuteNonQueryAsync();
+ }
+ });
}
diff --git a/Tapeti.Flow.SQL/SqlExceptionHelper.cs b/Tapeti.Flow.SQL/SqlExceptionHelper.cs
new file mode 100644
index 0000000..5a596a1
--- /dev/null
+++ b/Tapeti.Flow.SQL/SqlExceptionHelper.cs
@@ -0,0 +1,199 @@
+using System;
+using System.Collections.Generic;
+using System.Data.SqlClient;
+using System.Text;
+
+namespace Tapeti.Flow.SQL
+{
+ internal static class SqlExceptionHelper
+ {
+ // 2601: Cannot insert duplicate key row in object '%.*ls' with unique index '%.*ls'. The duplicate key value is %ls.
+ // 2627: Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls.
+ public static bool IsDuplicateKey(SqlException e)
+ {
+ return e != null && (e.Number == 2601 || e.Number == 2627);
+ }
+
+
+ public static bool IsTransientError(Exception e)
+ {
+ switch (e)
+ {
+ case TimeoutException _:
+ return true;
+
+ case Exception exception:
+ {
+ var sqlExceptions = ExtractSqlExceptions(e);
+ foreach (var sqlException in sqlExceptions)
+ {
+ var sqlErrors = UnwrapSqlErrors(sqlException);
+
+ if (IsRecoverableSQLError(sqlErrors))
+ return true;
+ }
+
+ return false;
+ }
+
+ default:
+ return false;
+ }
+ }
+
+ ///
+ /// Extracts alls SqlExceptions from the main and inner or aggregate exceptions
+ ///
+ public static IEnumerable ExtractSqlExceptions(Exception e)
+ {
+ while (e != null)
+ {
+ switch (e)
+ {
+ case AggregateException aggregateException:
+ foreach (var innerException in aggregateException.InnerExceptions)
+ {
+ foreach (var extractedSqlException in ExtractSqlExceptions(innerException))
+ yield return extractedSqlException;
+ }
+ break;
+
+ case SqlException sqlException:
+ yield return sqlException;
+ break;
+ }
+ e = e.InnerException;
+ }
+ }
+
+
+ ///
+ /// Goes through all errors in a SqlException, and any InnerException that is also a SqlException.
+ ///
+ public static IEnumerable UnwrapSqlErrors(SqlException exception)
+ {
+ while (exception != null)
+ {
+ foreach (SqlError error in exception.Errors)
+ yield return error;
+
+ exception = exception.InnerException as SqlException;
+ }
+ }
+
+ // Mostly borrowed from EF:
+ // https://github.com/aspnet/EntityFrameworkCore/blob/d8b7ebbfabff3d2e8560c24b1ff14d1f4244ca6a/src/EFCore.SqlServer/Storage/Internal/SqlServerTransientExceptionDetector.cs
+ private static bool IsRecoverableSQLError(IEnumerable sqlErrors)
+ {
+ foreach (var err in sqlErrors)
+ {
+ switch (err.Number)
+ {
+ // This exception can be thrown even if the operation completed succesfully, so it's safer to let the application fail.
+ // DBNETLIB Error Code: -2
+ // Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding. The statement has been terminated.
+ // In onze situatie mag elke repository operation nogmaals uitgevoerd worden dus gaan we deze retry-en, ook al vind EF van niet.
+ case -2:
+
+
+ // EF doesn't recognize this one for some reason, but it occurs when I stop the SQL Server service
+ case 2:
+
+ // I had this occur once as well, even though all credentials were valid, so assume it's transient as well
+ case 4060:
+
+
+ // SQL Error Code: 49920
+ // Cannot process request. Too many operations in progress for subscription "%ld".
+ // The service is busy processing multiple requests for this subscription.
+ // Requests are currently blocked for resource optimization. Query sys.dm_operation_status for operation status.
+ // Wait until pending requests are complete or delete one of your pending requests and retry your request later.
+ case 49920:
+ // SQL Error Code: 49919
+ // Cannot process create or update request. Too many create or update operations in progress for subscription "%ld".
+ // The service is busy processing multiple create or update requests for your subscription or server.
+ // Requests are currently blocked for resource optimization. Query sys.dm_operation_status for pending operations.
+ // Wait till pending create or update requests are complete or delete one of your pending requests and
+ // retry your request later.
+ case 49919:
+ // SQL Error Code: 49918
+ // Cannot process request. Not enough resources to process request.
+ // The service is currently busy.Please retry the request later.
+ case 49918:
+ // SQL Error Code: 41839
+ // Transaction exceeded the maximum number of commit dependencies.
+ case 41839:
+ // SQL Error Code: 41325
+ // The current transaction failed to commit due to a serializable validation failure.
+ case 41325:
+ // SQL Error Code: 41305
+ // The current transaction failed to commit due to a repeatable read validation failure.
+ case 41305:
+ // SQL Error Code: 41302
+ // The current transaction attempted to update a record that has been updated since the transaction started.
+ case 41302:
+ // SQL Error Code: 41301
+ // Dependency failure: a dependency was taken on another transaction that later failed to commit.
+ case 41301:
+ // SQL Error Code: 40613
+ // Database XXXX on server YYYY is not currently available. Please retry the connection later.
+ // If the problem persists, contact customer support, and provide them the session tracing ID of ZZZZZ.
+ case 40613:
+ // SQL Error Code: 40501
+ // The service is currently busy. Retry the request after 10 seconds. Code: (reason code to be decoded).
+ case 40501:
+ // SQL Error Code: 40197
+ // The service has encountered an error processing your request. Please try again.
+ case 40197:
+ // SQL Error Code: 10929
+ // Resource ID: %d. The %s minimum guarantee is %d, maximum limit is %d and the current usage for the database is %d.
+ // However, the server is currently too busy to support requests greater than %d for this database.
+ // For more information, see http://go.microsoft.com/fwlink/?LinkId=267637. Otherwise, please try again.
+ case 10929:
+ // SQL Error Code: 10928
+ // Resource ID: %d. The %s limit for the database is %d and has been reached. For more information,
+ // see http://go.microsoft.com/fwlink/?LinkId=267637.
+ case 10928:
+ // SQL Error Code: 10060
+ // A network-related or instance-specific error occurred while establishing a connection to SQL Server.
+ // The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server
+ // is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed
+ // because the connected party did not properly respond after a period of time, or established connection failed
+ // because connected host has failed to respond.)"}
+ case 10060:
+ // SQL Error Code: 10054
+ // A transport-level error has occurred when sending the request to the server.
+ // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.)
+ case 10054:
+ // SQL Error Code: 10053
+ // A transport-level error has occurred when receiving results from the server.
+ // An established connection was aborted by the software in your host machine.
+ case 10053:
+ // SQL Error Code: 1205
+ // Deadlock
+ case 1205:
+ // SQL Error Code: 233
+ // The client was unable to establish a connection because of an error during connection initialization process before login.
+ // Possible causes include the following: the client tried to connect to an unsupported version of SQL Server;
+ // the server was too busy to accept new connections; or there was a resource limitation (insufficient memory or maximum
+ // allowed connections) on the server. (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by
+ // the remote host.)
+ case 233:
+ // SQL Error Code: 121
+ // The semaphore timeout period has expired
+ case 121:
+ // SQL Error Code: 64
+ // A connection was successfully established with the server, but then an error occurred during the login process.
+ // (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
+ case 64:
+ // DBNETLIB Error Code: 20
+ // The instance of SQL Server you attempted to connect to does not support encryption.
+ case 20:
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs
new file mode 100644
index 0000000..51068d7
--- /dev/null
+++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Data.SqlClient;
+using System.Threading.Tasks;
+
+namespace Tapeti.Flow.SQL
+{
+ internal class SqlRetryHelper
+ {
+ public static readonly TimeSpan[] ExponentialBackoff = {
+ TimeSpan.FromSeconds(1),
+ TimeSpan.FromSeconds(2),
+ TimeSpan.FromSeconds(3),
+ TimeSpan.FromSeconds(5),
+ TimeSpan.FromSeconds(8),
+ TimeSpan.FromSeconds(13),
+ TimeSpan.FromSeconds(21),
+ TimeSpan.FromSeconds(34),
+ TimeSpan.FromSeconds(55)
+ };
+
+
+ public static async Task Execute(Func callback)
+ {
+ var retryAttempt = 0;
+
+ while (true)
+ {
+ try
+ {
+ await callback();
+ break;
+ }
+ catch (SqlException e)
+ {
+ if (SqlExceptionHelper.IsTransientError(e))
+ {
+ await Task.Delay(ExponentialBackoff[retryAttempt]);
+ if (retryAttempt < ExponentialBackoff.Length - 1)
+ retryAttempt++;
+ }
+ else
+ throw;
+ }
+ }
+ }
+
+
+ public static async Task Execute(Func> callback)
+ {
+ var returnValue = default(T);
+
+ await Execute(async () =>
+ {
+ returnValue = await callback();
+ });
+
+ return returnValue;
+ }
+ }
+}
diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings
index c1f05d7..406d866 100644
--- a/Tapeti.sln.DotSettings
+++ b/Tapeti.sln.DotSettings
@@ -3,6 +3,7 @@
API
ID
KV
+ SQL
<Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
True
True