diff --git a/07-ParallelizationTest/Program.cs b/07-ParallelizationTest/Program.cs index 437c737..99917d2 100644 --- a/07-ParallelizationTest/Program.cs +++ b/07-ParallelizationTest/Program.cs @@ -72,14 +72,16 @@ namespace _07_ParallelizationTest var publisher = dependencyResolver.Resolve(); - Console.WriteLine($"Publishing {MessageCount * RepeatBatch} messages..."); + Console.WriteLine($"Publishing first {MessageCount} of {MessageCount * RepeatBatch} messages..."); - await PublishMessages(publisher, MessageCount * RepeatBatch); + await PublishMessages(publisher, MessageCount); - - Console.WriteLine("Consuming messages..."); + Console.WriteLine("Consuming messages while publishing the rest..."); await subscriber.Resume(); + + await PublishMessages(publisher, MessageCount * (RepeatBatch - 1)); + await waitForDone(); } diff --git a/Examples/03-FlowRequestResponse/SimpleFlowController.cs b/Examples/03-FlowRequestResponse/SimpleFlowController.cs index a5f972b..e093ccd 100644 --- a/Examples/03-FlowRequestResponse/SimpleFlowController.cs +++ b/Examples/03-FlowRequestResponse/SimpleFlowController.cs @@ -31,6 +31,12 @@ namespace _03_FlowRequestResponse // Public fields will be stored. public DateTime RequestStartTime; + + // Be sure not to accidentally use any public fields that aren't serializable, for example: + //public TaskCompletionSource SerializationFail = new TaskCompletionSource(); + // + // In the Newtonsoft.Json version at the time of writing, this will not result in an exception but instead hang the flow! + public SimpleFlowController(IFlowProvider flowProvider, IExampleState exampleState) { diff --git a/Tapeti.Cmd/Program.cs b/Tapeti.Cmd/Program.cs index fc179c7..1171015 100644 --- a/Tapeti.Cmd/Program.cs +++ b/Tapeti.Cmd/Program.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.ComponentModel; using System.Diagnostics; using System.IO; using System.Linq; diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs new file mode 100644 index 0000000..0efe5c3 --- /dev/null +++ b/Tapeti/Connection/TapetiChannel.cs @@ -0,0 +1,125 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using Tapeti.Tasks; + +namespace Tapeti.Connection +{ + internal interface ITapetiChannelModelProvider + { + void WithChannel(Action operation); + void WithRetryableChannel(Action operation); + } + + + /// + /// Represents both a RabbitMQ Client Channel (IModel) as well as it's associated single-thread task queue. + /// Access to the IModel is limited by design to enforce this relationship. + /// + internal class TapetiChannel + { + private readonly Func modelFactory; + private readonly object taskQueueLock = new(); + private SingleThreadTaskQueue taskQueue; + private readonly ModelProvider modelProvider; + + + public TapetiChannel(Func modelFactory) + { + this.modelFactory = modelFactory; + modelProvider = new ModelProvider(this); + } + + + public async Task Reset() + { + SingleThreadTaskQueue capturedTaskQueue; + + lock (taskQueueLock) + { + capturedTaskQueue = taskQueue; + taskQueue = null; + } + + if (capturedTaskQueue == null) + return; + + await capturedTaskQueue.Add(() => { }); + capturedTaskQueue.Dispose(); + } + + + public Task Queue(Action operation) + { + return GetTaskQueue().Add(() => + { + modelProvider.WithChannel(operation); + }); + } + + + + public Task QueueRetryable(Action operation) + { + return GetTaskQueue().Add(() => + { + modelProvider.WithRetryableChannel(operation); + }); + } + + + + public Task QueueWithProvider(Func operation) + { + return GetTaskQueue().Add(async () => + { + await operation(modelProvider); + }); + } + + + + private SingleThreadTaskQueue GetTaskQueue() + { + lock (taskQueueLock) + { + return taskQueue ??= new SingleThreadTaskQueue(); + } + } + + + private class ModelProvider : ITapetiChannelModelProvider + { + private readonly TapetiChannel owner; + + + public ModelProvider(TapetiChannel owner) + { + this.owner = owner; + } + + + public void WithChannel(Action operation) + { + operation(owner.modelFactory()); + } + + + public void WithRetryableChannel(Action operation) + { + while (true) + { + try + { + operation(owner.modelFactory()); + break; + } + catch (AlreadyClosedException) + { + } + } + } + } + } +} diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index c41e7b4..eb8f282 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -13,10 +13,16 @@ using RabbitMQ.Client.Exceptions; using Tapeti.Config; using Tapeti.Default; using Tapeti.Exceptions; -using Tapeti.Tasks; namespace Tapeti.Connection { + internal enum TapetiChannelType + { + Consume, + Publish + } + + /// /// /// Implementation of ITapetiClient for the RabbitMQ Client library @@ -39,23 +45,27 @@ namespace Tapeti.Connection public IConnectionEventListener ConnectionEventListener { get; set; } - private readonly Lazy taskQueue = new Lazy(); + private readonly TapetiChannel consumeChannel; + private readonly TapetiChannel publishChannel; + private readonly HttpClient managementClient; - - // These fields are for use in the taskQueue only! + // These fields must be locked using connectionLock + private readonly object connectionLock = new(); private RabbitMQ.Client.IConnection connection; + private IModel consumeChannelModel; + private IModel publishChannelModel; private bool isClosing; private bool isReconnect; - private IModel channelInstance; - private ulong lastDeliveryTag; private DateTime connectedDateTime; - private readonly HttpClient managementClient; - private readonly HashSet deletedQueues = new HashSet(); - // These fields must be locked, since the callbacks for BasicAck/BasicReturn can run in a different thread - private readonly object confirmLock = new object(); - private readonly Dictionary confirmMessages = new Dictionary(); - private readonly Dictionary returnRoutingKeys = new Dictionary(); + // These fields are for use in a single TapetiChannel's queue only! + private ulong lastDeliveryTag; + private readonly HashSet deletedQueues = new(); + + // These fields must be locked using confirmLock, since the callbacks for BasicAck/BasicReturn can run in a different thread + private readonly object confirmLock = new(); + private readonly Dictionary confirmMessages = new(); + private readonly Dictionary returnRoutingKeys = new(); private class ConfirmMessageInfo @@ -79,6 +89,9 @@ namespace Tapeti.Connection logger = config.DependencyResolver.Resolve(); + consumeChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Consume)); + publishChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Publish)); + var handler = new HttpClientHandler { @@ -100,7 +113,8 @@ namespace Tapeti.Connection if (string.IsNullOrEmpty(routingKey)) throw new ArgumentNullException(nameof(routingKey)); - await taskQueue.Value.Add(async () => + + await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channelProvider => { Task publishResultTask = null; var messageInfo = new ConfirmMessageInfo @@ -110,7 +124,7 @@ namespace Tapeti.Connection }; - WithRetryableChannel(channel => + channelProvider.WithRetryableChannel(channel => { DeclareExchange(channel, exchange); @@ -169,15 +183,18 @@ namespace Tapeti.Connection var replyCode = publishResultTask.Result; - // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code - // at the time of writing... - if (replyCode == 312) - throw new NoRouteException( - $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' does not have a route"); - - if (replyCode > 0) - throw new NoRouteException( - $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}"); + switch (replyCode) + { + // There is no RabbitMQ.Client.Framing.Constants value for this "No route" reply code + // at the time of writing... + case 312: + throw new NoRouteException( + $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' does not have a route"); + + case > 0: + throw new NoRouteException( + $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}"); + } }); } @@ -194,7 +211,7 @@ namespace Tapeti.Connection string consumerTag = null; - await QueueWithRetryableChannel(channel => + await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -215,7 +232,7 @@ namespace Tapeti.Connection // No need for a retryable channel here, if the connection is lost // so is the consumer. - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { channel.BasicCancel(consumerTag); }); @@ -224,7 +241,7 @@ namespace Tapeti.Connection private async Task Respond(ulong deliveryTag, ConsumeResult result) { - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { // No need for a retryable channel here, if the connection is lost we can't // use the deliveryTag anymore. @@ -257,7 +274,7 @@ namespace Tapeti.Connection var currentBindings = bindings.ToList(); var bindingLogger = logger as IBindingLogger; - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -284,7 +301,7 @@ namespace Tapeti.Connection /// public async Task DurableQueueVerify(CancellationToken cancellationToken, string queueName) { - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -302,7 +319,7 @@ namespace Tapeti.Connection { uint deletedMessages = 0; - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -316,7 +333,7 @@ namespace Tapeti.Connection } - await taskQueue.Value.Add(async () => + await GetTapetiChannel(TapetiChannelType.Consume).QueueWithProvider(async channelProvider => { bool retry; do @@ -343,8 +360,10 @@ namespace Tapeti.Connection // includes the GetQueueInfo, the next time around it should have Messages > 0 try { - var channel = GetChannel(); - channel.QueueDelete(queueName, false, true); + channelProvider.WithChannel(channel => + { + channel.QueueDelete(queueName, false, true); + }); deletedQueues.Add(queueName); (logger as IBindingLogger)?.QueueObsolete(queueName, true, 0); @@ -364,10 +383,11 @@ namespace Tapeti.Connection if (existingBindings.Count > 0) { - var channel = GetChannel(); - - foreach (var binding in existingBindings) - channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey); + channelProvider.WithChannel(channel => + { + foreach (var binding in existingBindings) + channel.QueueUnbind(queueName, binding.Exchange, binding.RoutingKey); + }); } (logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages); @@ -383,7 +403,7 @@ namespace Tapeti.Connection string queueName = null; var bindingLogger = logger as IBindingLogger; - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -407,7 +427,7 @@ namespace Tapeti.Connection /// public async Task DynamicQueueBind(CancellationToken cancellationToken, string queueName, QueueBinding binding) { - await Queue(channel => + await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -422,32 +442,46 @@ namespace Tapeti.Connection /// public async Task Close() { - if (!taskQueue.IsValueCreated) - return; + IModel capturedConsumeModel; + IModel capturedPublishModel; + RabbitMQ.Client.IConnection capturedConnection; - await taskQueue.Value.Add(() => + lock (connectionLock) { isClosing = true; + capturedConsumeModel = consumeChannelModel; + capturedPublishModel = publishChannelModel; + capturedConnection = connection; - if (channelInstance != null) + consumeChannelModel = null; + publishChannelModel = null; + connection = null; + } + + // Empty the queue + await consumeChannel.Reset(); + await publishChannel.Reset(); + + // No need to close the channels as the connection will be closed + capturedConsumeModel.Dispose(); + capturedPublishModel.Dispose(); + + // ReSharper disable once InvertIf + if (capturedConnection != null) + { + try { - channelInstance.Dispose(); - channelInstance = null; + capturedConnection.Close(); } - - // ReSharper disable once InvertIf - if (connection != null) + finally { - connection.Dispose(); - connection = null; + capturedConnection.Dispose(); } - - taskQueue.Value.Dispose(); - }); + } } - private static readonly List TransientStatusCodes = new List + private static readonly List TransientStatusCodes = new() { HttpStatusCode.GatewayTimeout, HttpStatusCode.RequestTimeout, @@ -545,39 +579,37 @@ namespace Tapeti.Connection { var requestUri = new Uri($"http://{connectionParams.HostName}:{connectionParams.ManagementPort}/api/{path}"); - using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri)) + using var request = new HttpRequestMessage(HttpMethod.Get, requestUri); + var retryDelayIndex = 0; + + while (true) { - var retryDelayIndex = 0; - - while (true) + try { - try - { - var response = await managementClient.SendAsync(request); - return await handleResponse(response); - } - catch (TimeoutException) - { - } - catch (WebException e) - { - if (!(e.Response is HttpWebResponse response)) - throw; - - if (!TransientStatusCodes.Contains(response.StatusCode)) - throw; - } - - await Task.Delay(ExponentialBackoff[retryDelayIndex]); - - if (retryDelayIndex < ExponentialBackoff.Length - 1) - retryDelayIndex++; + var response = await managementClient.SendAsync(request); + return await handleResponse(response); } + catch (TimeoutException) + { + } + catch (WebException e) + { + if (!(e.Response is HttpWebResponse response)) + throw; + + if (!TransientStatusCodes.Contains(response.StatusCode)) + throw; + } + + await Task.Delay(ExponentialBackoff[retryDelayIndex]); + + if (retryDelayIndex < ExponentialBackoff.Length - 1) + retryDelayIndex++; } } - private readonly HashSet declaredExchanges = new HashSet(); + private readonly HashSet declaredExchanges = new(); private void DeclareExchange(IModel channel, string exchange) { @@ -593,163 +625,182 @@ namespace Tapeti.Connection } - private async Task Queue(Action operation) + private TapetiChannel GetTapetiChannel(TapetiChannelType channelType) { - await taskQueue.Value.Add(() => - { - var channel = GetChannel(); - operation(channel); - }); + return channelType == TapetiChannelType.Publish + ? publishChannel + : consumeChannel; } - - private async Task QueueWithRetryableChannel(Action operation) - { - await taskQueue.Value.Add(() => - { - WithRetryableChannel(operation); - }); - } - - + /// /// Only call this from a task in the taskQueue to ensure IModel is only used /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. /// - private void WithRetryableChannel(Action operation) + private IModel GetModel(TapetiChannelType channelType) { - while (true) + lock (connectionLock) { - try + var channel = channelType == TapetiChannelType.Publish + ? publishChannelModel + : consumeChannelModel; + + if (channel != null && channel.IsOpen) + return channel; + + // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ + // not related to the connection), wait for a bit to avoid spamming the connection + if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay) + Thread.Sleep(ReconnectDelay); + + + var connectionFactory = new ConnectionFactory { - operation(GetChannel()); - break; - } - catch (AlreadyClosedException) - { - } - } - } + HostName = connectionParams.HostName, + Port = connectionParams.Port, + VirtualHost = connectionParams.VirtualHost, + UserName = connectionParams.Username, + Password = connectionParams.Password, + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, + RequestedHeartbeat = TimeSpan.FromSeconds(30) + }; - - /// - /// Only call this from a task in the taskQueue to ensure IModel is only used - /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. - /// - private IModel GetChannel() - { - if (channelInstance != null && channelInstance.IsOpen) - return channelInstance; - - // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ - // not related to the connection), wait for a bit to avoid spamming the connection - if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay) - Thread.Sleep(ReconnectDelay); - - - var connectionFactory = new ConnectionFactory - { - HostName = connectionParams.HostName, - Port = connectionParams.Port, - VirtualHost = connectionParams.VirtualHost, - UserName = connectionParams.Username, - Password = connectionParams.Password, - AutomaticRecoveryEnabled = false, - TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30) - }; - - if (connectionParams.ClientProperties != null) - foreach (var pair in connectionParams.ClientProperties) - { - if (connectionFactory.ClientProperties.ContainsKey(pair.Key)) - connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value); - else - connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value)); - } - - - while (true) - { - try - { - logger.Connect(new ConnectContext(connectionParams, isReconnect)); - - connection = connectionFactory.CreateConnection(); - channelInstance = connection.CreateModel(); - - if (channelInstance == null) - throw new BrokerUnreachableException(null); - - if (config.Features.PublisherConfirms) + if (connectionParams.ClientProperties != null) + foreach (var pair in connectionParams.ClientProperties) { - lastDeliveryTag = 0; - - Monitor.Enter(confirmLock); - try - { - foreach (var pair in confirmMessages) - pair.Value.CompletionSource.SetCanceled(); - - confirmMessages.Clear(); - } - finally - { - Monitor.Exit(confirmLock); - } - - channelInstance.ConfirmSelect(); + if (connectionFactory.ClientProperties.ContainsKey(pair.Key)) + connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value); + else + connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value)); } - if (connectionParams.PrefetchCount > 0) - channelInstance.BasicQos(0, connectionParams.PrefetchCount, false); - channelInstance.ModelShutdown += (sender, e) => - { - ConnectionEventListener?.Disconnected(new DisconnectedEventArgs - { - ReplyCode = e.ReplyCode, - ReplyText = e.ReplyText - }); - - logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText)); - - channelInstance = null; - - if (!isClosing) - taskQueue.Value.Add(() => WithRetryableChannel(channel => { })); - }; - - channelInstance.BasicReturn += HandleBasicReturn; - channelInstance.BasicAcks += HandleBasicAck; - channelInstance.BasicNacks += HandleBasicNack; - - connectedDateTime = DateTime.UtcNow; - - var connectedEventArgs = new ConnectedEventArgs - { - ConnectionParams = connectionParams, - LocalPort = connection.LocalPort - }; - - if (isReconnect) - ConnectionEventListener?.Reconnected(connectedEventArgs); - else - ConnectionEventListener?.Connected(connectedEventArgs); - - logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, connection.LocalPort)); - isReconnect = true; - - break; - } - catch (BrokerUnreachableException e) + while (true) { - logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e)); - Thread.Sleep(ReconnectDelay); - } - } + try + { + if (connection != null) + { + try + { + connection.Close(); + } + finally + { + connection.Dispose(); + } - return channelInstance; + connection = null; + } + + logger.Connect(new ConnectContext(connectionParams, isReconnect)); + + connection = connectionFactory.CreateConnection(); + consumeChannelModel = connection.CreateModel(); + if (consumeChannel == null) + throw new BrokerUnreachableException(null); + + publishChannelModel = connection.CreateModel(); + if (publishChannel == null) + throw new BrokerUnreachableException(null); + + + if (config.Features.PublisherConfirms) + { + lastDeliveryTag = 0; + + Monitor.Enter(confirmLock); + try + { + foreach (var pair in confirmMessages) + pair.Value.CompletionSource.SetCanceled(); + + confirmMessages.Clear(); + } + finally + { + Monitor.Exit(confirmLock); + } + + publishChannelModel.ConfirmSelect(); + } + + if (connectionParams.PrefetchCount > 0) + consumeChannelModel.BasicQos(0, connectionParams.PrefetchCount, false); + + var capturedConsumeChannelModel = consumeChannelModel; + consumeChannelModel.ModelShutdown += (_, e) => + { + lock (connectionLock) + { + if (consumeChannelModel == null || consumeChannelModel != capturedConsumeChannelModel) + return; + + consumeChannelModel = null; + } + + ConnectionEventListener?.Disconnected(new DisconnectedEventArgs + { + ReplyCode = e.ReplyCode, + ReplyText = e.ReplyText + }); + + logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText)); + + // Reconnect if the disconnect was unexpected + if (!isClosing) + GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { }); + }; + + var capturedPublishChannelModel = publishChannelModel; + publishChannelModel.ModelShutdown += (_, _) => + { + lock (connectionLock) + { + if (publishChannelModel == null || publishChannelModel != capturedPublishChannelModel) + return; + + publishChannelModel = null; + } + + // No need to reconnect, the next Publish will + }; + + + publishChannelModel.BasicReturn += HandleBasicReturn; + publishChannelModel.BasicAcks += HandleBasicAck; + publishChannelModel.BasicNacks += HandleBasicNack; + + connectedDateTime = DateTime.UtcNow; + + var connectedEventArgs = new ConnectedEventArgs + { + ConnectionParams = connectionParams, + LocalPort = connection.LocalPort + }; + + if (isReconnect) + ConnectionEventListener?.Reconnected(connectedEventArgs); + else + ConnectionEventListener?.Connected(connectedEventArgs); + + logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, connection.LocalPort)); + isReconnect = true; + + break; + } + catch (BrokerUnreachableException e) + { + logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e)); + Thread.Sleep(ReconnectDelay); + } + } + + return channelType == TapetiChannelType.Publish + ? publishChannelModel + : consumeChannelModel; + } } diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 19fb870..6d97155 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -57,7 +57,7 @@ namespace Tapeti.Connection } catch (Exception dispatchException) { - using (var emptyContext = new MessageContext + await using var emptyContext = new MessageContext { Config = config, Queue = queueName, @@ -66,12 +66,11 @@ namespace Tapeti.Connection Message = message, Properties = properties, Binding = null - }) - { - var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); - HandleException(exceptionContext); - return exceptionContext.ConsumeResult; - } + }; + + var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); + HandleException(exceptionContext); + return exceptionContext.ConsumeResult; } } @@ -100,7 +99,7 @@ namespace Tapeti.Connection private async Task InvokeUsingBinding(object message, MessageContextData messageContextData, IBinding binding) { - using (var context = new MessageContext + await using var context = new MessageContext { Config = config, Queue = queueName, @@ -109,25 +108,24 @@ namespace Tapeti.Connection Message = message, Properties = messageContextData.Properties, Binding = binding - }) + }; + + try { - try - { - await MiddlewareHelper.GoAsync(config.Middleware.Message, - async (handler, next) => await handler.Handle(context, next), - async () => { await binding.Invoke(context); }); + await MiddlewareHelper.GoAsync(config.Middleware.Message, + async (handler, next) => await handler.Handle(context, next), + async () => { await binding.Invoke(context); }); - await binding.Cleanup(context, ConsumeResult.Success); - return ConsumeResult.Success; - } - catch (Exception invokeException) - { - var exceptionContext = new ExceptionStrategyContext(context, invokeException); - HandleException(exceptionContext); + await binding.Cleanup(context, ConsumeResult.Success); + return ConsumeResult.Success; + } + catch (Exception invokeException) + { + var exceptionContext = new ExceptionStrategyContext(context, invokeException); + HandleException(exceptionContext); - await binding.Cleanup(context, exceptionContext.ConsumeResult); - return exceptionContext.ConsumeResult; - } + await binding.Cleanup(context, exceptionContext.ConsumeResult); + return exceptionContext.ConsumeResult; } } @@ -158,18 +156,12 @@ namespace Tapeti.Connection private static bool IgnoreExceptionDuringShutdown(Exception e) { - switch (e) + return e switch { - case AggregateException aggregateException: - return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown); - - case TaskCanceledException _: - case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested - return true; - - default: - return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException); - } + AggregateException aggregateException => aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown), + TaskCanceledException or OperationCanceledException => true, + _ => e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException) + }; } diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index 9955aa4..c590528 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -118,9 +118,7 @@ namespace Tapeti.Connection { var writableProperties = new MessageProperties(properties); - if (!writableProperties.Timestamp.HasValue) - writableProperties.Timestamp = DateTime.UtcNow; - + writableProperties.Timestamp ??= DateTime.UtcNow; writableProperties.Persistent = true; diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 537a030..89c00da 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -13,7 +13,7 @@ namespace Tapeti.Connection private readonly Func clientFactory; private readonly ITapetiConfig config; private bool consuming; - private readonly List consumerTags = new List(); + private readonly List consumerTags = new(); private CancellationTokenSource initializeCancellationTokenSource; @@ -166,7 +166,7 @@ namespace Tapeti.Connection public List MessageClasses; } - private readonly Dictionary> dynamicQueues = new Dictionary>(); + private readonly Dictionary> dynamicQueues = new(); protected CustomBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) @@ -277,8 +277,8 @@ namespace Tapeti.Connection private class DeclareDurableQueuesBindingTarget : CustomBindingTarget { - private readonly Dictionary> durableQueues = new Dictionary>(); - private readonly HashSet obsoleteDurableQueues = new HashSet(); + private readonly Dictionary> durableQueues = new(); + private readonly HashSet obsoleteDurableQueues = new(); public DeclareDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) @@ -358,7 +358,7 @@ namespace Tapeti.Connection private class PassiveDurableQueuesBindingTarget : CustomBindingTarget { - private readonly List durableQueues = new List(); + private readonly List durableQueues = new(); public PassiveDurableQueuesBindingTarget(Func clientFactory, IRoutingKeyStrategy routingKeyStrategy, IExchangeStrategy exchangeStrategy, CancellationToken cancellationToken) : base(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken) diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs index 9318c98..57ee673 100644 --- a/Tapeti/Default/ControllerBindingContext.cs +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -9,7 +9,7 @@ namespace Tapeti.Default internal class ControllerBindingContext : IControllerBindingContext { private BindingTargetMode? bindingTargetMode; - private readonly List middleware = new List(); + private readonly List middleware = new(); private readonly List parameters; private readonly ControllerBindingResult result; diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 1d52cb4..66fe846 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -160,37 +160,35 @@ namespace Tapeti.Default public async Task Invoke(IMessageContext context) { var controller = dependencyResolver.Resolve(bindingInfo.ControllerType); - - using (var controllerContext = new ControllerMessageContext(context) + + await using var controllerContext = new ControllerMessageContext(context) { Controller = controller - }) - { - if (!await FilterAllowed(controllerContext)) - return; + }; + + if (!await FilterAllowed(controllerContext)) + return; - await MiddlewareHelper.GoAsync( - bindingInfo.MessageMiddleware, - async (handler, next) => await handler.Handle(controllerContext, next), - async () => await messageHandler(controllerContext)); - } + await MiddlewareHelper.GoAsync( + bindingInfo.MessageMiddleware, + async (handler, next) => await handler.Handle(controllerContext, next), + async () => await messageHandler(controllerContext)); } /// public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult) { - using (var controllerContext = new ControllerMessageContext(context) + await using var controllerContext = new ControllerMessageContext(context) { Controller = null - }) - { - await MiddlewareHelper.GoAsync( - bindingInfo.CleanupMiddleware, - async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next), - () => Task.CompletedTask); - } + }; + + await MiddlewareHelper.GoAsync( + bindingInfo.CleanupMiddleware, + async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next), + () => Task.CompletedTask); } diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index cebed7d..4ca8ae4 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -16,8 +16,8 @@ namespace Tapeti.Default private const string ClassTypeHeader = "classType"; - private readonly ConcurrentDictionary deserializedTypeNames = new ConcurrentDictionary(); - private readonly ConcurrentDictionary serializedTypeNames = new ConcurrentDictionary(); + private readonly ConcurrentDictionary deserializedTypeNames = new(); + private readonly ConcurrentDictionary serializedTypeNames = new(); private readonly JsonSerializerSettings serializerSettings; diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 9612990..1729dc3 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -7,7 +7,7 @@ namespace Tapeti.Default { internal class MessageContext : IMessageContext { - private readonly Dictionary items = new Dictionary(); + private readonly Dictionary items = new(); /// diff --git a/Tapeti/Default/MessageProperties.cs b/Tapeti/Default/MessageProperties.cs index 3184da9..8227934 100644 --- a/Tapeti/Default/MessageProperties.cs +++ b/Tapeti/Default/MessageProperties.cs @@ -10,7 +10,7 @@ namespace Tapeti.Default /// public class MessageProperties : IMessageProperties { - private readonly Dictionary headers = new Dictionary(); + private readonly Dictionary headers = new(); /// diff --git a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs index 3c4465e..54d16d0 100644 --- a/Tapeti/Default/NamespaceMatchExchangeStrategy.cs +++ b/Tapeti/Default/NamespaceMatchExchangeStrategy.cs @@ -13,7 +13,7 @@ namespace Tapeti.Default /// public class NamespaceMatchExchangeStrategy : IExchangeStrategy { - private static readonly Regex NamespaceRegex = new Regex("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); + private static readonly Regex NamespaceRegex = new("^(Messaging\\.)?(?[^\\.]+)", RegexOptions.Compiled | RegexOptions.Singleline); /// diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs index 0b524ae..cdce19b 100644 --- a/Tapeti/Default/RabbitMQMessageProperties.cs +++ b/Tapeti/Default/RabbitMQMessageProperties.cs @@ -87,8 +87,7 @@ namespace Tapeti.Default /// public void SetHeader(string name, string value) { - if (BasicProperties.Headers == null) - BasicProperties.Headers = new Dictionary(); + BasicProperties.Headers ??= new Dictionary(); if (BasicProperties.Headers.ContainsKey(name)) BasicProperties.Headers[name] = Encoding.UTF8.GetBytes(value); diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs index f31af08..35ee8d8 100644 --- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs +++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs @@ -28,9 +28,9 @@ namespace Tapeti.Default (?(?<=[A-Z])[A-Z](?=[a-z])|[A-Z]) )"; - private static readonly Regex SeparatorRegex = new Regex(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled); + private static readonly Regex SeparatorRegex = new(SeparatorPattern, RegexOptions.IgnorePatternWhitespace | RegexOptions.Compiled); - private static readonly ConcurrentDictionary RoutingKeyCache = new ConcurrentDictionary(); + private static readonly ConcurrentDictionary RoutingKeyCache = new(); /// diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index 4bab299..f49f35a 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -8,7 +8,7 @@ namespace Tapeti.Helpers /// public class ConnectionStringParser { - private readonly TapetiConnectionParams result = new TapetiConnectionParams(); + private readonly TapetiConnectionParams result = new(); private readonly string connectionstring; private int pos = -1; diff --git a/Tapeti/ISubscriber.cs b/Tapeti/ISubscriber.cs index a5e9975..b3128fe 100644 --- a/Tapeti/ISubscriber.cs +++ b/Tapeti/ISubscriber.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; // ReSharper disable UnusedMember.Global +// ReSharper disable UnusedMemberInSuper.Global namespace Tapeti { diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 51630ec..4751189 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -11,6 +11,7 @@ Unlicense https://github.com/MvRens/Tapeti Tapeti.png + latest diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 2746fbc..57abd6e 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -18,7 +18,7 @@ namespace Tapeti public class TapetiConfig : ITapetiConfigBuilder, ITapetiConfigBuilderAccess { private Config config; - private readonly List bindingMiddleware = new List(); + private readonly List bindingMiddleware = new(); /// @@ -112,7 +112,7 @@ namespace Tapeti default: throw new ArgumentException( - $"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}"); + $"Unsupported middleware implementation: {middleware?.GetType().Name ?? "null"}"); } } } @@ -224,9 +224,9 @@ namespace Tapeti /// internal class Config : ITapetiConfig { - private readonly ConfigFeatures features = new ConfigFeatures(); - private readonly ConfigMiddleware middleware = new ConfigMiddleware(); - private readonly ConfigBindings bindings = new ConfigBindings(); + private readonly ConfigFeatures features = new(); + private readonly ConfigMiddleware middleware = new(); + private readonly ConfigBindings bindings = new(); public IDependencyResolver DependencyResolver { get; } public ITapetiConfigFeatues Features => features; @@ -290,8 +290,8 @@ namespace Tapeti internal class ConfigMiddleware : ITapetiConfigMiddleware { - private readonly List messageMiddleware = new List(); - private readonly List publishMiddleware = new List(); + private readonly List messageMiddleware = new(); + private readonly List publishMiddleware = new(); public IReadOnlyList Message => messageMiddleware; diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index a3a6ea6..5e2258e 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -60,7 +60,7 @@ namespace Tapeti /// will be overwritten. See DefaultClientProperties in Connection.cs in the RabbitMQ .NET client source for the default values. /// public IDictionary ClientProperties { - get => clientProperties ?? (clientProperties = new Dictionary()); + get => clientProperties ??= new Dictionary(); set => clientProperties = value; } diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs index acee8c7..c08a1d4 100644 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs @@ -12,10 +12,10 @@ namespace Tapeti.Tasks /// public class SingleThreadTaskQueue : IDisposable { - private readonly object previousTaskLock = new object(); + private readonly object previousTaskLock = new(); private Task previousTask = Task.CompletedTask; - private readonly Lazy singleThreadScheduler = new Lazy(); + private readonly Lazy singleThreadScheduler = new(); /// @@ -26,7 +26,7 @@ namespace Tapeti.Tasks { lock (previousTaskLock) { - previousTask = previousTask.ContinueWith(t => action(), CancellationToken.None + previousTask = previousTask.ContinueWith(_ => action(), CancellationToken.None , TaskContinuationOptions.None , singleThreadScheduler.Value); @@ -43,7 +43,7 @@ namespace Tapeti.Tasks { lock (previousTaskLock) { - var task = previousTask.ContinueWith(t => func(), CancellationToken.None + var task = previousTask.ContinueWith(_ => func(), CancellationToken.None , TaskContinuationOptions.None , singleThreadScheduler.Value); @@ -70,7 +70,7 @@ namespace Tapeti.Tasks public override int MaximumConcurrencyLevel => 1; - private readonly Queue scheduledTasks = new Queue(); + private readonly Queue scheduledTasks = new(); private bool disposed;