1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Fixed #19: No retry in Flow.SQL

This commit is contained in:
Mark van Renswoude 2019-10-10 16:26:13 +02:00
parent 38d6f29576
commit 26fb064f42
4 changed files with 312 additions and 41 deletions

View File

@ -38,77 +38,88 @@ namespace Tapeti.Flow.SQL
/// <inheritdoc />
public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
{
using (var connection = await GetConnection())
return await SqlRetryHelper.Execute(async () =>
{
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync();
var result = new List<KeyValuePair<Guid, T>>();
while (await flowReader.ReadAsync())
using (var connection = await GetConnection())
{
var flowID = flowReader.GetGuid(0);
var stateJson = flowReader.GetString(1);
var flowQuery = new SqlCommand($"select FlowID, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync();
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new KeyValuePair<Guid, T>(flowID, state));
var result = new List<KeyValuePair<Guid, T>>();
while (await flowReader.ReadAsync())
{
var flowID = flowReader.GetGuid(0);
var stateJson = flowReader.GetString(1);
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new KeyValuePair<Guid, T>(flowID, state));
}
return result;
}
return result;
}
});
}
/// <inheritdoc />
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
using (var connection = await GetConnection())
await SqlRetryHelper.Execute(async () =>
{
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
"values (@FlowID, @StateJson, @CreationTime)",
connection);
using (var connection = await GetConnection())
{
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
"values (@FlowID, @StateJson, @CreationTime)",
connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
var creationTimeParam = query.Parameters.Add("@CreationTime", SqlDbType.DateTime2);
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
creationTimeParam.Value = timestamp;
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
creationTimeParam.Value = timestamp;
await query.ExecuteNonQueryAsync();
}
await query.ExecuteNonQueryAsync();
}
});
}
/// <inheritdoc />
public async Task UpdateState<T>(Guid flowID, T state)
{
using (var connection = await GetConnection())
await SqlRetryHelper.Execute(async () =>
{
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
using (var connection = await GetConnection())
{
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
var stateJsonParam = query.Parameters.Add("@StateJson", SqlDbType.NVarChar);
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state);
await query.ExecuteNonQueryAsync();
}
await query.ExecuteNonQueryAsync();
}
});
}
/// <inheritdoc />
public async Task DeleteState(Guid flowID)
{
using (var connection = await GetConnection())
await SqlRetryHelper.Execute(async () =>
{
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
using (var connection = await GetConnection())
{
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
flowIDParam.Value = flowID;
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
flowIDParam.Value = flowID;
await query.ExecuteNonQueryAsync();
}
await query.ExecuteNonQueryAsync();
}
});
}

View File

@ -0,0 +1,199 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Text;
namespace Tapeti.Flow.SQL
{
internal static class SqlExceptionHelper
{
// 2601: Cannot insert duplicate key row in object '%.*ls' with unique index '%.*ls'. The duplicate key value is %ls.
// 2627: Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls.
public static bool IsDuplicateKey(SqlException e)
{
return e != null && (e.Number == 2601 || e.Number == 2627);
}
public static bool IsTransientError(Exception e)
{
switch (e)
{
case TimeoutException _:
return true;
case Exception exception:
{
var sqlExceptions = ExtractSqlExceptions(e);
foreach (var sqlException in sqlExceptions)
{
var sqlErrors = UnwrapSqlErrors(sqlException);
if (IsRecoverableSQLError(sqlErrors))
return true;
}
return false;
}
default:
return false;
}
}
/// <summary>
/// Extracts alls SqlExceptions from the main and inner or aggregate exceptions
/// </summary>
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception e)
{
while (e != null)
{
switch (e)
{
case AggregateException aggregateException:
foreach (var innerException in aggregateException.InnerExceptions)
{
foreach (var extractedSqlException in ExtractSqlExceptions(innerException))
yield return extractedSqlException;
}
break;
case SqlException sqlException:
yield return sqlException;
break;
}
e = e.InnerException;
}
}
/// <summary>
/// Goes through all errors in a SqlException, and any InnerException that is also a SqlException.
/// </summary>
public static IEnumerable<SqlError> UnwrapSqlErrors(SqlException exception)
{
while (exception != null)
{
foreach (SqlError error in exception.Errors)
yield return error;
exception = exception.InnerException as SqlException;
}
}
// Mostly borrowed from EF:
// https://github.com/aspnet/EntityFrameworkCore/blob/d8b7ebbfabff3d2e8560c24b1ff14d1f4244ca6a/src/EFCore.SqlServer/Storage/Internal/SqlServerTransientExceptionDetector.cs
private static bool IsRecoverableSQLError(IEnumerable<SqlError> sqlErrors)
{
foreach (var err in sqlErrors)
{
switch (err.Number)
{
// This exception can be thrown even if the operation completed succesfully, so it's safer to let the application fail.
// DBNETLIB Error Code: -2
// Timeout expired. The timeout period elapsed prior to completion of the operation or the server is not responding. The statement has been terminated.
// In onze situatie mag elke repository operation nogmaals uitgevoerd worden dus gaan we deze retry-en, ook al vind EF van niet.
case -2:
// EF doesn't recognize this one for some reason, but it occurs when I stop the SQL Server service
case 2:
// I had this occur once as well, even though all credentials were valid, so assume it's transient as well
case 4060:
// SQL Error Code: 49920
// Cannot process request. Too many operations in progress for subscription "%ld".
// The service is busy processing multiple requests for this subscription.
// Requests are currently blocked for resource optimization. Query sys.dm_operation_status for operation status.
// Wait until pending requests are complete or delete one of your pending requests and retry your request later.
case 49920:
// SQL Error Code: 49919
// Cannot process create or update request. Too many create or update operations in progress for subscription "%ld".
// The service is busy processing multiple create or update requests for your subscription or server.
// Requests are currently blocked for resource optimization. Query sys.dm_operation_status for pending operations.
// Wait till pending create or update requests are complete or delete one of your pending requests and
// retry your request later.
case 49919:
// SQL Error Code: 49918
// Cannot process request. Not enough resources to process request.
// The service is currently busy.Please retry the request later.
case 49918:
// SQL Error Code: 41839
// Transaction exceeded the maximum number of commit dependencies.
case 41839:
// SQL Error Code: 41325
// The current transaction failed to commit due to a serializable validation failure.
case 41325:
// SQL Error Code: 41305
// The current transaction failed to commit due to a repeatable read validation failure.
case 41305:
// SQL Error Code: 41302
// The current transaction attempted to update a record that has been updated since the transaction started.
case 41302:
// SQL Error Code: 41301
// Dependency failure: a dependency was taken on another transaction that later failed to commit.
case 41301:
// SQL Error Code: 40613
// Database XXXX on server YYYY is not currently available. Please retry the connection later.
// If the problem persists, contact customer support, and provide them the session tracing ID of ZZZZZ.
case 40613:
// SQL Error Code: 40501
// The service is currently busy. Retry the request after 10 seconds. Code: (reason code to be decoded).
case 40501:
// SQL Error Code: 40197
// The service has encountered an error processing your request. Please try again.
case 40197:
// SQL Error Code: 10929
// Resource ID: %d. The %s minimum guarantee is %d, maximum limit is %d and the current usage for the database is %d.
// However, the server is currently too busy to support requests greater than %d for this database.
// For more information, see http://go.microsoft.com/fwlink/?LinkId=267637. Otherwise, please try again.
case 10929:
// SQL Error Code: 10928
// Resource ID: %d. The %s limit for the database is %d and has been reached. For more information,
// see http://go.microsoft.com/fwlink/?LinkId=267637.
case 10928:
// SQL Error Code: 10060
// A network-related or instance-specific error occurred while establishing a connection to SQL Server.
// The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server
// is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed
// because the connected party did not properly respond after a period of time, or established connection failed
// because connected host has failed to respond.)"}
case 10060:
// SQL Error Code: 10054
// A transport-level error has occurred when sending the request to the server.
// (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.)
case 10054:
// SQL Error Code: 10053
// A transport-level error has occurred when receiving results from the server.
// An established connection was aborted by the software in your host machine.
case 10053:
// SQL Error Code: 1205
// Deadlock
case 1205:
// SQL Error Code: 233
// The client was unable to establish a connection because of an error during connection initialization process before login.
// Possible causes include the following: the client tried to connect to an unsupported version of SQL Server;
// the server was too busy to accept new connections; or there was a resource limitation (insufficient memory or maximum
// allowed connections) on the server. (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by
// the remote host.)
case 233:
// SQL Error Code: 121
// The semaphore timeout period has expired
case 121:
// SQL Error Code: 64
// A connection was successfully established with the server, but then an error occurred during the login process.
// (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
case 64:
// DBNETLIB Error Code: 20
// The instance of SQL Server you attempted to connect to does not support encryption.
case 20:
return true;
}
}
return false;
}
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
namespace Tapeti.Flow.SQL
{
internal class SqlRetryHelper
{
public static readonly TimeSpan[] ExponentialBackoff = {
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(8),
TimeSpan.FromSeconds(13),
TimeSpan.FromSeconds(21),
TimeSpan.FromSeconds(34),
TimeSpan.FromSeconds(55)
};
public static async Task Execute(Func<Task> callback)
{
var retryAttempt = 0;
while (true)
{
try
{
await callback();
break;
}
catch (SqlException e)
{
if (SqlExceptionHelper.IsTransientError(e))
{
await Task.Delay(ExponentialBackoff[retryAttempt]);
if (retryAttempt < ExponentialBackoff.Length - 1)
retryAttempt++;
}
else
throw;
}
}
}
public static async Task<T> Execute<T>(Func<Task<T>> callback)
{
var returnValue = default(T);
await Execute(async () =>
{
returnValue = await callback();
});
return returnValue;
}
}
}

View File

@ -3,6 +3,7 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=API/@EntryIndexedValue">API</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>