From 1bd284166d39ab6fdd905df3a87ff4d0e738ee38 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 6 Apr 2023 06:57:32 +0200 Subject: [PATCH 1/3] Fixed #42: Tapeti.Flow.SQL depends on system.data.sql --- Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs | 2 +- Tapeti.Flow.SQL/SqlExceptionHelper.cs | 2 +- Tapeti.Flow.SQL/SqlRetryHelper.cs | 2 +- Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 3cc02af..8a387bf 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -1,8 +1,8 @@ using System; using System.Collections.Generic; using System.Data; -using System.Data.SqlClient; using System.Threading.Tasks; +using Microsoft.Data.SqlClient; using Newtonsoft.Json; // Neither of these are available in language version 7 required for .NET Standard 2.0 diff --git a/Tapeti.Flow.SQL/SqlExceptionHelper.cs b/Tapeti.Flow.SQL/SqlExceptionHelper.cs index dc00d31..693c9b2 100644 --- a/Tapeti.Flow.SQL/SqlExceptionHelper.cs +++ b/Tapeti.Flow.SQL/SqlExceptionHelper.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; -using System.Data.SqlClient; using System.Linq; +using Microsoft.Data.SqlClient; // ReSharper disable UnusedMember.Global diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs index 5db5b60..b38b498 100644 --- a/Tapeti.Flow.SQL/SqlRetryHelper.cs +++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs @@ -1,6 +1,6 @@ using System; -using System.Data.SqlClient; using System.Threading.Tasks; +using Microsoft.Data.SqlClient; namespace Tapeti.Flow.SQL { diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 0a4ca77..9650958 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -33,7 +33,7 @@ - + From ab2cc1c1bbb7fa2aa50083f38e237e4dc1ebceaa Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 6 Apr 2023 07:25:01 +0200 Subject: [PATCH 2/3] Reverted async changes as they caused message handlers to no longer run in parallel --- Tapeti/Connection/TapetiBasicConsumer.cs | 27 ++++++++++++++---------- Tapeti/Connection/TapetiClient.cs | 3 +-- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/Tapeti/Connection/TapetiBasicConsumer.cs b/Tapeti/Connection/TapetiBasicConsumer.cs index 24badec..42e3181 100644 --- a/Tapeti/Connection/TapetiBasicConsumer.cs +++ b/Tapeti/Connection/TapetiBasicConsumer.cs @@ -17,7 +17,7 @@ namespace Tapeti.Connection /// /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// - internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer + internal class TapetiBasicConsumer : DefaultBasicConsumer { private readonly IConsumer consumer; private readonly long connectionReference; @@ -34,7 +34,7 @@ namespace Tapeti.Connection /// - public override async Task HandleBasicDeliver(string consumerTag, + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, @@ -49,16 +49,21 @@ namespace Tapeti.Connection // // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 var bodyArray = body.ToArray(); - - try + + // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would + // still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer. + Task.Run(async () => { - var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); - await onRespond(connectionReference, deliveryTag, response); - } - catch - { - await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); - } + try + { + var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); + await onRespond(connectionReference, deliveryTag, response); + } + catch + { + await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); + } + }); } } } diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index b23038e..1c4b016 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -777,8 +777,7 @@ namespace Tapeti.Connection Password = connectionParams.Password, AutomaticRecoveryEnabled = false, TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30), - DispatchConsumersAsync = true + RequestedHeartbeat = TimeSpan.FromSeconds(30) }; if (connectionParams.ClientProperties != null) From fde278228dc7c52e48ea27f732e2b2e23ab1fd33 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Thu, 6 Apr 2023 07:44:45 +0200 Subject: [PATCH 3/3] Prevent possible concurrency issues in ParallelRequestBuilder --- Tapeti.Flow/Default/FlowProvider.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 0ef8aa3..91ffb39 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -407,14 +407,16 @@ namespace Tapeti.Flow.Default if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); - await Task.WhenAll(requests.Select(requestInfo => - flowProvider.SendRequest( - context, + foreach (var requestInfo in requests) + { + await flowProvider.SendRequest( + context, requestInfo.Message, requestInfo.ResponseHandlerInfo, convergeMethod.Method.Name, convergeMethodSync, - false))); + false); + } await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); });