1
0
mirror of synced 2024-11-21 08:53:50 +00:00

Merge branch 'release/3.1.1'

This commit is contained in:
Mark van Renswoude 2023-04-06 08:00:44 +02:00
commit 320ccfa0e5
7 changed files with 27 additions and 21 deletions

View File

@ -1,8 +1,8 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Newtonsoft.Json; using Newtonsoft.Json;
// Neither of these are available in language version 7 required for .NET Standard 2.0 // Neither of these are available in language version 7 required for .NET Standard 2.0

View File

@ -1,7 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq; using System.Linq;
using Microsoft.Data.SqlClient;
// ReSharper disable UnusedMember.Global // ReSharper disable UnusedMember.Global

View File

@ -1,6 +1,6 @@
using System; using System;
using System.Data.SqlClient;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
namespace Tapeti.Flow.SQL namespace Tapeti.Flow.SQL
{ {

View File

@ -33,7 +33,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.8.5" /> <PackageReference Include="Microsoft.Data.SqlClient" Version="5.*" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -407,14 +407,16 @@ namespace Tapeti.Flow.Default
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType())
throw new YieldPointException("Converge method must be in the same controller class"); throw new YieldPointException("Converge method must be in the same controller class");
await Task.WhenAll(requests.Select(requestInfo => foreach (var requestInfo in requests)
flowProvider.SendRequest( {
context, await flowProvider.SendRequest(
context,
requestInfo.Message, requestInfo.Message,
requestInfo.ResponseHandlerInfo, requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name, convergeMethod.Method.Name,
convergeMethodSync, convergeMethodSync,
false))); false);
}
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
}); });

View File

@ -17,7 +17,7 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary> /// </summary>
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer internal class TapetiBasicConsumer : DefaultBasicConsumer
{ {
private readonly IConsumer consumer; private readonly IConsumer consumer;
private readonly long connectionReference; private readonly long connectionReference;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public override async Task HandleBasicDeliver(string consumerTag, public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag, ulong deliveryTag,
bool redelivered, bool redelivered,
string exchange, string exchange,
@ -49,16 +49,21 @@ namespace Tapeti.Connection
// //
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray(); var bodyArray = body.ToArray();
try // Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would
// still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer.
Task.Run(async () =>
{ {
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); try
await onRespond(connectionReference, deliveryTag, response); {
} var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray);
catch await onRespond(connectionReference, deliveryTag, response);
{ }
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); catch
} {
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error);
}
});
} }
} }
} }

View File

@ -777,8 +777,7 @@ namespace Tapeti.Connection
Password = connectionParams.Password, Password = connectionParams.Password,
AutomaticRecoveryEnabled = false, AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30), RequestedHeartbeat = TimeSpan.FromSeconds(30)
DispatchConsumersAsync = true
}; };
if (connectionParams.ClientProperties != null) if (connectionParams.ClientProperties != null)