From e5614f9d7b5e0574e2bd96107cc12ddfdc342e6f Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 10 Oct 2019 17:19:28 +0200 Subject: [PATCH] Fixed #19: No retry in Flow.SQL --- .../SqlConnectionFlowRepository.cs | 93 ++++---- Tapeti.Flow.SQL/SqlExceptionHelper.cs | 199 ++++++++++++++++++ Tapeti.Flow.SQL/SqlRetryHelper.cs | 60 ++++++ Tapeti.sln.DotSettings | 6 +- 4 files changed, 315 insertions(+), 43 deletions(-) create mode 100644 Tapeti.Flow.SQL/SqlExceptionHelper.cs create mode 100644 Tapeti.Flow.SQL/SqlRetryHelper.cs diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index d32b645..b2a1a5a 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -35,74 +35,85 @@ namespace Tapeti.Flow.SQL public async Task>> GetStates() { - using (var connection = await GetConnection()) + return await SqlRetryHelper.Execute(async () => { - var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection); - var flowReader = await flowQuery.ExecuteReaderAsync(); - - var result = new List>(); - - while (await flowReader.ReadAsync()) + using (var connection = await GetConnection()) { - var flowID = flowReader.GetGuid(0); - var stateJson = flowReader.GetString(1); + var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection); + var flowReader = await flowQuery.ExecuteReaderAsync(); - var state = JsonConvert.DeserializeObject(stateJson); - result.Add(new KeyValuePair(flowID, state)); + var result = new List>(); + + while (await flowReader.ReadAsync()) + { + var flowID = flowReader.GetGuid(0); + var stateJson = flowReader.GetString(1); + + var state = JsonConvert.DeserializeObject(stateJson); + result.Add(new KeyValuePair(flowID, state)); + } + + return result; } - - return result; - } - + }); } public async Task CreateState(Guid flowID, T state, DateTime timestamp) { - using (var connection = await GetConnection()) + await SqlRetryHelper.Execute(async () => { - var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + - "values (@FlowID, @StateJson, @CreationTime)", - connection); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + + "values (@FlowID, @StateJson, @CreationTime)", + connection); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); - var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2); + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2); - flowIDParam.Value = flowID; - stateJsonParam.Value = JsonConvert.SerializeObject(state); - creationTimeParam.Value = timestamp; + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); + creationTimeParam.Value = timestamp; - await query.ExecuteNonQueryAsync(); - } + await query.ExecuteNonQueryAsync(); + } + }); } public async Task UpdateState(Guid flowID, T state) { - using (var connection = await GetConnection()) + await SqlRetryHelper.Execute(async () => { - var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar); - flowIDParam.Value = flowID; - stateJsonParam.Value = JsonConvert.SerializeObject(state); + flowIDParam.Value = flowID; + stateJsonParam.Value = JsonConvert.SerializeObject(state); - await query.ExecuteNonQueryAsync(); - } + await query.ExecuteNonQueryAsync(); + } + }); } public async Task DeleteState(Guid flowID) { - using (var connection = await GetConnection()) + await SqlRetryHelper.Execute(async () => { - var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); + using (var connection = await GetConnection()) + { + var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); - var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); - flowIDParam.Value = flowID; + var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); + flowIDParam.Value = flowID; - await query.ExecuteNonQueryAsync(); - } + await query.ExecuteNonQueryAsync(); + } + }); } diff --git a/Tapeti.Flow.SQL/SqlExceptionHelper.cs b/Tapeti.Flow.SQL/SqlExceptionHelper.cs new file mode 100644 index 0000000..5a596a1 --- /dev/null +++ b/Tapeti.Flow.SQL/SqlExceptionHelper.cs @@ -0,0 +1,199 @@ +using System; +using System.Collections.Generic; +using System.Data.SqlClient; +using System.Text; + +namespace Tapeti.Flow.SQL +{ + internal static class SqlExceptionHelper + { + // 2601: Cannot insert duplicate key row in object '%.*ls' with unique index '%.*ls'. The duplicate key value is %ls. + // 2627: Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls. + public static bool IsDuplicateKey(SqlException e) + { + return e != null && (e.Number == 2601 || e.Number == 2627); + } + + + public static bool IsTransientError(Exception e) + { + switch (e) + { + case TimeoutException _: + return true; + + case Exception exception: + { + var sqlExceptions = ExtractSqlExceptions(e); + foreach (var sqlException in sqlExceptions) + { + var sqlErrors = UnwrapSqlErrors(sqlException); + + if (IsRecoverableSQLError(sqlErrors)) + return true; + } + + return false; + } + + default: + return false; + } + } + + /// + /// Extracts alls SqlExceptions from the main and inner or aggregate exceptions + /// + public static IEnumerable ExtractSqlExceptions(Exception e) + { + while (e != null) + { + switch (e) + { + case AggregateException aggregateException: + foreach (var innerException in aggregateException.InnerExceptions) + { + foreach (var extractedSqlException in ExtractSqlExceptions(innerException)) + yield return extractedSqlException; + } + break; + + case SqlException sqlException: + yield return sqlException; + break; + } + e = e.InnerException; + } + } + + + /// + /// Goes through all errors in a SqlException, and any InnerException that is also a SqlException. + /// + public static IEnumerable UnwrapSqlErrors(SqlException exception) + { + while (exception != null) + { + foreach (SqlError error in exception.Errors) + yield return error; + + exception = exception.InnerException as SqlException; + } + } + + // Mostly borrowed from EF: + // https://github.com/aspnet/EntityFrameworkCore/blob/d8b7ebbfabff3d2e8560c24b1ff14d1f4244ca6a/src/EFCore.SqlServer/Storage/Internal/SqlServerTransientExceptionDetector.cs + private static bool IsRecoverableSQLError(IEnumerable sqlErrors) + { + foreach (var err in sqlErrors) + { + switch (err.Number) + { + // This exception can be thrown even if the operation completed succesfully, so it's safer to let the application fail. + // DBNETLIB Error Code: -2 + // Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding. The statement has been terminated. + // In onze situatie mag elke repository operation nogmaals uitgevoerd worden dus gaan we deze retry-en, ook al vind EF van niet. + case -2: + + + // EF doesn't recognize this one for some reason, but it occurs when I stop the SQL Server service + case 2: + + // I had this occur once as well, even though all credentials were valid, so assume it's transient as well + case 4060: + + + // SQL Error Code: 49920 + // Cannot process request. Too many operations in progress for subscription "%ld". + // The service is busy processing multiple requests for this subscription. + // Requests are currently blocked for resource optimization. Query sys.dm_operation_status for operation status. + // Wait until pending requests are complete or delete one of your pending requests and retry your request later. + case 49920: + // SQL Error Code: 49919 + // Cannot process create or update request. Too many create or update operations in progress for subscription "%ld". + // The service is busy processing multiple create or update requests for your subscription or server. + // Requests are currently blocked for resource optimization. Query sys.dm_operation_status for pending operations. + // Wait till pending create or update requests are complete or delete one of your pending requests and + // retry your request later. + case 49919: + // SQL Error Code: 49918 + // Cannot process request. Not enough resources to process request. + // The service is currently busy.Please retry the request later. + case 49918: + // SQL Error Code: 41839 + // Transaction exceeded the maximum number of commit dependencies. + case 41839: + // SQL Error Code: 41325 + // The current transaction failed to commit due to a serializable validation failure. + case 41325: + // SQL Error Code: 41305 + // The current transaction failed to commit due to a repeatable read validation failure. + case 41305: + // SQL Error Code: 41302 + // The current transaction attempted to update a record that has been updated since the transaction started. + case 41302: + // SQL Error Code: 41301 + // Dependency failure: a dependency was taken on another transaction that later failed to commit. + case 41301: + // SQL Error Code: 40613 + // Database XXXX on server YYYY is not currently available. Please retry the connection later. + // If the problem persists, contact customer support, and provide them the session tracing ID of ZZZZZ. + case 40613: + // SQL Error Code: 40501 + // The service is currently busy. Retry the request after 10 seconds. Code: (reason code to be decoded). + case 40501: + // SQL Error Code: 40197 + // The service has encountered an error processing your request. Please try again. + case 40197: + // SQL Error Code: 10929 + // Resource ID: %d. The %s minimum guarantee is %d, maximum limit is %d and the current usage for the database is %d. + // However, the server is currently too busy to support requests greater than %d for this database. + // For more information, see http://go.microsoft.com/fwlink/?LinkId=267637. Otherwise, please try again. + case 10929: + // SQL Error Code: 10928 + // Resource ID: %d. The %s limit for the database is %d and has been reached. For more information, + // see http://go.microsoft.com/fwlink/?LinkId=267637. + case 10928: + // SQL Error Code: 10060 + // A network-related or instance-specific error occurred while establishing a connection to SQL Server. + // The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server + // is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed + // because the connected party did not properly respond after a period of time, or established connection failed + // because connected host has failed to respond.)"} + case 10060: + // SQL Error Code: 10054 + // A transport-level error has occurred when sending the request to the server. + // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.) + case 10054: + // SQL Error Code: 10053 + // A transport-level error has occurred when receiving results from the server. + // An established connection was aborted by the software in your host machine. + case 10053: + // SQL Error Code: 1205 + // Deadlock + case 1205: + // SQL Error Code: 233 + // The client was unable to establish a connection because of an error during connection initialization process before login. + // Possible causes include the following: the client tried to connect to an unsupported version of SQL Server; + // the server was too busy to accept new connections; or there was a resource limitation (insufficient memory or maximum + // allowed connections) on the server. (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by + // the remote host.) + case 233: + // SQL Error Code: 121 + // The semaphore timeout period has expired + case 121: + // SQL Error Code: 64 + // A connection was successfully established with the server, but then an error occurred during the login process. + // (provider: TCP Provider, error: 0 - The specified network name is no longer available.) + case 64: + // DBNETLIB Error Code: 20 + // The instance of SQL Server you attempted to connect to does not support encryption. + case 20: + return true; + } + } + + return false; + } + } +} diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs new file mode 100644 index 0000000..51068d7 --- /dev/null +++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs @@ -0,0 +1,60 @@ +using System; +using System.Data.SqlClient; +using System.Threading.Tasks; + +namespace Tapeti.Flow.SQL +{ + internal class SqlRetryHelper + { + public static readonly TimeSpan[] ExponentialBackoff = { + TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(2), + TimeSpan.FromSeconds(3), + TimeSpan.FromSeconds(5), + TimeSpan.FromSeconds(8), + TimeSpan.FromSeconds(13), + TimeSpan.FromSeconds(21), + TimeSpan.FromSeconds(34), + TimeSpan.FromSeconds(55) + }; + + + public static async Task Execute(Func callback) + { + var retryAttempt = 0; + + while (true) + { + try + { + await callback(); + break; + } + catch (SqlException e) + { + if (SqlExceptionHelper.IsTransientError(e)) + { + await Task.Delay(ExponentialBackoff[retryAttempt]); + if (retryAttempt < ExponentialBackoff.Length - 1) + retryAttempt++; + } + else + throw; + } + } + } + + + public static async Task Execute(Func> callback) + { + var returnValue = default(T); + + await Execute(async () => + { + returnValue = await callback(); + }); + + return returnValue; + } + } +} diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index 06fcad6..31b78dd 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -1,4 +1,6 @@ - + ID KV - <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> \ No newline at end of file + SQL + <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> + \ No newline at end of file