diff --git a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj index bb77e7f..a4a9a3c 100644 --- a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj +++ b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj @@ -4,6 +4,7 @@ Exe net6.0 _01_PublishSubscribe + enable diff --git a/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj index 7fafac3..816a9c6 100644 --- a/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj +++ b/Examples/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj @@ -4,6 +4,7 @@ Exe net6.0 _02_DeclareDurableQueues + enable diff --git a/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj index 3dbc0e7..193404a 100644 --- a/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj +++ b/Examples/03-FlowRequestResponse/03-FlowRequestResponse.csproj @@ -4,6 +4,7 @@ Exe net6.0 _03_FlowRequestResponse + enable diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs index 9a8dc7a..00f9687 100644 --- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs +++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs @@ -15,9 +15,9 @@ namespace _03_FlowRequestResponse private readonly IFlowProvider flowProvider; private readonly IExampleState exampleState; - public string FirstQuote; - public string SecondQuote; - public string ThirdQuote; + public string? FirstQuote; + public string? SecondQuote; + public string? ThirdQuote; public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState) diff --git a/Examples/04-Transient/04-Transient.csproj b/Examples/04-Transient/04-Transient.csproj index 2a9a0d4..5d94ffb 100644 --- a/Examples/04-Transient/04-Transient.csproj +++ b/Examples/04-Transient/04-Transient.csproj @@ -4,6 +4,7 @@ Exe net6.0 _04_Transient + enable diff --git a/Examples/05-SpeedTest/05-SpeedTest.csproj b/Examples/05-SpeedTest/05-SpeedTest.csproj index 12506dc..a53241c 100644 --- a/Examples/05-SpeedTest/05-SpeedTest.csproj +++ b/Examples/05-SpeedTest/05-SpeedTest.csproj @@ -4,6 +4,7 @@ Exe net6.0 _05_SpeedTest + enable diff --git a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj index 654499b..2ab68a4 100644 --- a/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj +++ b/Examples/06-StatelessRequestResponse/06-StatelessRequestResponse.csproj @@ -4,6 +4,7 @@ Exe net6.0 _06_StatelessRequestResponse + enable diff --git a/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj b/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj index 47c3eb4..f799228 100644 --- a/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj +++ b/Examples/07-ParallelizationTest/07-ParallelizationTest.csproj @@ -4,6 +4,7 @@ Exe net6.0 _07_ParallelizationTest + enable diff --git a/Examples/07-ParallelizationTest/Program.cs b/Examples/07-ParallelizationTest/Program.cs index 4464a55..4011782 100644 --- a/Examples/07-ParallelizationTest/Program.cs +++ b/Examples/07-ParallelizationTest/Program.cs @@ -107,7 +107,7 @@ namespace _07_ParallelizationTest private int count; private readonly object waitLock = new(); private TaskCompletionSource batchReachedTask = new(); - private Timer messageExpectedTimer; + private Timer? messageExpectedTimer; private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000); diff --git a/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj b/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj index d541909..cc1f063 100644 --- a/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj +++ b/Examples/08-MessageHandlerLogging/08-MessageHandlerLogging.csproj @@ -4,6 +4,7 @@ Exe net6.0 _08_MessageHandlerLogging + enable diff --git a/Examples/ExampleLib/ExampleLib.csproj b/Examples/ExampleLib/ExampleLib.csproj index 1f4adc0..5d5c22e 100644 --- a/Examples/ExampleLib/ExampleLib.csproj +++ b/Examples/ExampleLib/ExampleLib.csproj @@ -3,6 +3,7 @@ net6.0 true + enable diff --git a/Tapeti.Autofac/AutofacDependencyResolver.cs b/Tapeti.Autofac/AutofacDependencyResolver.cs index 9346674..a2c48ae 100644 --- a/Tapeti.Autofac/AutofacDependencyResolver.cs +++ b/Tapeti.Autofac/AutofacDependencyResolver.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics.CodeAnalysis; using Autofac; using Autofac.Builder; @@ -14,22 +15,21 @@ namespace Tapeti.Autofac /// public class AutofacDependencyResolver : IDependencyContainer { - private ContainerBuilder containerBuilder; - private IContainer container; + private ContainerBuilder? containerBuilder; + private IContainer? container; /// - /// The built container. Either set directly, or use the Build method to built the + /// The built container. Either set directly, or use the Build method to build the /// update this reference. /// public IContainer Container { - get => container; + get => container ?? throw new ArgumentNullException(nameof(container)); set { container = value; - if (value != null) - containerBuilder = null; + containerBuilder = null; } } @@ -50,7 +50,7 @@ namespace Tapeti.Autofac CheckContainerBuilder(); Container = containerBuilder.Build(options); - return container; + return Container; } @@ -141,6 +141,7 @@ namespace Tapeti.Autofac } + [MemberNotNull(nameof(containerBuilder))] private void CheckContainerBuilder() { if (containerBuilder == null) diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj index 57996d9..10e4b50 100644 --- a/Tapeti.Autofac/Tapeti.Autofac.csproj +++ b/Tapeti.Autofac/Tapeti.Autofac.csproj @@ -12,6 +12,7 @@ Tapeti.SimpleInjector.png 2.0.0 9 + enable diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj index 21b5dfb..bd1af38 100644 --- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj +++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj @@ -12,6 +12,7 @@ Tapeti.SimpleInjector.png 2.0.0 9 + enable diff --git a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs index 6b3a723..63dbd0d 100644 --- a/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs +++ b/Tapeti.DataAnnotations/DataAnnotationsMessageMiddleware.cs @@ -13,6 +13,9 @@ namespace Tapeti.DataAnnotations /// public ValueTask Handle(IMessageContext context, Func next) { + if (context.Message == null) + return next(); + var validationContext = new ValidationContext(context.Message); Validator.ValidateObject(context.Message, validationContext, true); diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj index 38a9699..713493f 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj @@ -12,6 +12,7 @@ Tapeti.DataAnnotations.png 2.0.0 9 + enable diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index ef285d9..a70cc22 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using Tapeti.Config; // ReSharper disable UnusedMember.Global @@ -45,7 +46,7 @@ namespace Tapeti.Flow.SQL public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return null; + return Enumerable.Empty(); } } } diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index afe5611..3cc02af 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -58,7 +58,8 @@ namespace Tapeti.Flow.SQL var stateJson = flowReader.GetString(2); var state = JsonConvert.DeserializeObject(stateJson); - result.Add(new FlowRecord(flowID, creationTime, state)); + if (state != null) + result.Add(new FlowRecord(flowID, creationTime, state)); } return result; diff --git a/Tapeti.Flow.SQL/SqlExceptionHelper.cs b/Tapeti.Flow.SQL/SqlExceptionHelper.cs index 5237a6e..dc00d31 100644 --- a/Tapeti.Flow.SQL/SqlExceptionHelper.cs +++ b/Tapeti.Flow.SQL/SqlExceptionHelper.cs @@ -38,11 +38,13 @@ namespace Tapeti.Flow.SQL /// /// Extracts alls SqlExceptions from the main and inner or aggregate exceptions /// - public static IEnumerable ExtractSqlExceptions(Exception e) + public static IEnumerable ExtractSqlExceptions(Exception exception) { - while (e != null) + var exceptionHead = exception; + + while (exceptionHead != null) { - switch (e) + switch (exceptionHead) { case AggregateException aggregateException: foreach (var innerException in aggregateException.InnerExceptions) @@ -56,7 +58,8 @@ namespace Tapeti.Flow.SQL yield return sqlException; break; } - e = e.InnerException; + + exceptionHead = exceptionHead.InnerException; } } @@ -66,12 +69,14 @@ namespace Tapeti.Flow.SQL /// public static IEnumerable UnwrapSqlErrors(SqlException exception) { - while (exception != null) + var exceptionHead = exception; + + while (exceptionHead != null) { - foreach (SqlError error in exception.Errors) + foreach (SqlError error in exceptionHead.Errors) yield return error; - exception = exception.InnerException as SqlException; + exceptionHead = exceptionHead.InnerException as SqlException; } } diff --git a/Tapeti.Flow.SQL/SqlRetryHelper.cs b/Tapeti.Flow.SQL/SqlRetryHelper.cs index 51068d7..5db5b60 100644 --- a/Tapeti.Flow.SQL/SqlRetryHelper.cs +++ b/Tapeti.Flow.SQL/SqlRetryHelper.cs @@ -54,7 +54,7 @@ namespace Tapeti.Flow.SQL returnValue = await callback(); }); - return returnValue; + return returnValue!; } } } diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 8385eb5..0a4ca77 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -12,6 +12,7 @@ Tapeti.Flow.SQL.png 2.0.0 9 + enable diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs index 5bc973d..cfc72a4 100644 --- a/Tapeti.Flow/ConfigExtensions.cs +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -15,7 +15,7 @@ namespace Tapeti.Flow /// /// An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts. /// - public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null) + public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository? flowRepository = null) { config.Use(new FlowExtension(flowRepository)); return config; diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 1cb9d6d..8fd3d20 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading.Tasks; @@ -46,6 +47,9 @@ namespace Tapeti.Flow.Default { context.Result.SetHandler(async (messageContext, value) => { + if (value == null) + throw new InvalidOperationException("Return value should be a Task, not null"); + await (Task)value; await HandleParallelResponse(messageContext); }); @@ -54,6 +58,10 @@ namespace Tapeti.Flow.Default { context.Result.SetHandler(async (messageContext, value) => { + if (value == null) + // ValueTask is a struct and should never be null + throw new UnreachableException("Return value should be a ValueTask, not null"); + await (ValueTask)value; await HandleParallelResponse(messageContext); }); @@ -82,24 +90,35 @@ namespace Tapeti.Flow.Default switch (taskType) { case TaskType.None: - context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); + context.Result.SetHandler((messageContext, value) => + { + if (value == null) + throw new InvalidOperationException("Return value should be an IYieldPoint, not null"); + + return HandleYieldPoint(messageContext, (IYieldPoint)value); + }); break; case TaskType.Task: context.Result.SetHandler(async (messageContext, value) => { + if (value == null) + throw new InvalidOperationException("Return value should be a Task, not null"); + var yieldPoint = await (Task)value; - if (yieldPoint != null) - await HandleYieldPoint(messageContext, yieldPoint); + await HandleYieldPoint(messageContext, yieldPoint); }); break; case TaskType.ValueTask: context.Result.SetHandler(async (messageContext, value) => { + if (value == null) + // ValueTask is a struct and should never be null + throw new UnreachableException("Return value should be a ValueTask, not null"); + var yieldPoint = await (ValueTask)value; - if (yieldPoint != null) - await HandleYieldPoint(messageContext, yieldPoint); + await HandleYieldPoint(messageContext, yieldPoint); }); break; @@ -140,7 +159,7 @@ namespace Tapeti.Flow.Default } - private static object ParallelRequestParameterFactory(IMessageContext context) + private static object? ParallelRequestParameterFactory(IMessageContext context) { var flowHandler = context.Config.DependencyResolver.Resolve(); return flowHandler.GetParallelRequest(new FlowHandlerContext(context)); diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 8de2047..1c23143 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -6,25 +6,49 @@ namespace Tapeti.Flow.Default { internal class FlowContext : IDisposable { - public IFlowHandlerContext HandlerContext { get; set; } - public IFlowStateLock FlowStateLock { get; set; } - public FlowState FlowState { get; set; } + private readonly IFlowHandlerContext? handlerContext; + private IFlowStateLock? flowStateLock; + private FlowState? flowState; + + + public IFlowHandlerContext HandlerContext => handlerContext ?? throw new InvalidOperationException("FlowContext does not have a HandlerContext"); + public IFlowStateLock FlowStateLock => flowStateLock ?? throw new InvalidOperationException("FlowContext does not have a FlowStateLock"); + public FlowState FlowState => flowState ?? throw new InvalidOperationException("FlowContext does not have a FlowState"); + + public bool HasFlowStateAndLock => flowState != null && flowStateLock != null; public Guid ContinuationID { get; set; } - public ContinuationMetadata ContinuationMetadata { get; set; } + public ContinuationMetadata? ContinuationMetadata { get; set; } private int storeCalled; private int deleteCalled; + public FlowContext(IFlowHandlerContext handlerContext, FlowState flowState, IFlowStateLock flowStateLock) + { + this.flowState = flowState; + this.flowStateLock = flowStateLock; + this.handlerContext = handlerContext; + } + + + public FlowContext(IFlowHandlerContext handlerContext) + { + this.handlerContext = handlerContext; + } + + + public void SetFlowState(FlowState newFlowState, IFlowStateLock newFlowStateLock) + { + flowState = newFlowState; + flowStateLock = newFlowStateLock; + } + + public ValueTask Store(bool persistent) { storeCalled++; - if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); - if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); - if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock)); - FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller); return FlowStateLock.StoreFlowState(FlowState, persistent); } @@ -32,7 +56,7 @@ namespace Tapeti.Flow.Default public ValueTask Delete() { deleteCalled++; - return FlowStateLock?.DeleteFlowState() ?? default; + return flowStateLock?.DeleteFlowState() ?? default; } public bool IsStoredOrDeleted() @@ -43,7 +67,7 @@ namespace Tapeti.Flow.Default public void EnsureStoreOrDeleteIsCalled() { if (!IsStoredOrDeleted()) - throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); + throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + flowStateLock?.FlowID); Debug.Assert(storeCalled <= 1, "Store called more than once!"); Debug.Assert(deleteCalled <= 1, "Delete called more than once!"); @@ -51,7 +75,7 @@ namespace Tapeti.Flow.Default public void Dispose() { - FlowStateLock?.Dispose(); + flowStateLock?.Dispose(); } } } diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs index f2de064..51098d3 100644 --- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs +++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs @@ -34,8 +34,12 @@ namespace Tapeti.Flow.Default if (context.TryGet(out var flowPayload)) { + if (controllerPayload.Controller == null) + throw new InvalidOperationException("Controller is not available (method is static?)"); + var flowContext = flowPayload.FlowContext; - Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller); + if (!string.IsNullOrEmpty(flowContext.FlowState.Data)) + Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller); // Remove Continuation now because the IYieldPoint result handler will store the new state flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); @@ -65,11 +69,11 @@ namespace Tapeti.Flow.Default var flowContext = flowPayload.FlowContext; - if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) + if (flowContext.ContinuationMetadata == null || flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) // Do not call when the controller method was filtered, if the same message has two methods return; - if (flowContext.FlowStateLock != null) + if (flowContext.HasFlowStateAndLock) { if (!flowContext.IsStoredOrDeleted()) // The exception strategy can set the consume result to Success. Instead, check if the yield point @@ -82,7 +86,7 @@ namespace Tapeti.Flow.Default - private static async ValueTask EnrichWithFlowContext(IMessageContext context) + private static async ValueTask EnrichWithFlowContext(IMessageContext context) { if (context.TryGet(out var flowPayload)) return flowPayload.FlowContext; @@ -106,13 +110,8 @@ namespace Tapeti.Flow.Default if (flowState == null) return null; - var flowContext = new FlowContext + var flowContext = new FlowContext(new FlowHandlerContext(context), flowState, flowStateLock) { - HandlerContext = new FlowHandlerContext(context), - - FlowStateLock = flowStateLock, - FlowState = flowState, - ContinuationID = continuationID, ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null }; diff --git a/Tapeti.Flow/Default/FlowHandlerContext.cs b/Tapeti.Flow/Default/FlowHandlerContext.cs index c533c4b..da816c6 100644 --- a/Tapeti.Flow/Default/FlowHandlerContext.cs +++ b/Tapeti.Flow/Default/FlowHandlerContext.cs @@ -10,8 +10,11 @@ namespace Tapeti.Flow.Default { /// /// - public FlowHandlerContext() + public FlowHandlerContext(ITapetiConfig config, object? controller, MethodInfo method) { + Config = config; + Controller = controller; + Method = method; } @@ -19,11 +22,7 @@ namespace Tapeti.Flow.Default /// public FlowHandlerContext(IMessageContext source) { - if (source == null) - return; - - if (!source.TryGet(out var controllerPayload)) - return; + var controllerPayload = source.Get(); Config = source.Config; Controller = controllerPayload.Controller; @@ -38,15 +37,15 @@ namespace Tapeti.Flow.Default } /// - public ITapetiConfig Config { get; set; } + public ITapetiConfig Config { get; } /// - public object Controller { get; set; } + public object? Controller { get; } /// - public MethodInfo Method { get; set; } + public MethodInfo Method { get; } /// - public IMessageContext MessageContext { get; set; } + public IMessageContext? MessageContext { get; } } } diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index eb99d18..b203961 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -32,21 +32,21 @@ namespace Tapeti.Flow.Default /// - public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) + public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } /// - public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) + public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } /// - public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) + public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); @@ -59,7 +59,7 @@ namespace Tapeti.Flow.Default } /// - public IYieldPoint EndWithResponse(TResponse message) + public IYieldPoint EndWithResponse(TResponse message) where TResponse : class { return new DelegateYieldPoint(context => SendResponse(context, message)); } @@ -72,7 +72,7 @@ namespace Tapeti.Flow.Default internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) + string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true) { if (context.FlowState == null) { @@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default private async Task SendResponse(FlowContext context, object message) { - var reply = context.FlowState == null - ? GetReply(context.HandlerContext) - : context.FlowState.Metadata.Reply; + var reply = context.HasFlowStateAndLock + ? context.FlowState.Metadata.Reply + : GetReply(context.HandlerContext); if (reply == null) throw new YieldPointException("No response is required"); @@ -134,7 +134,7 @@ namespace Tapeti.Flow.Default { await context.Delete(); - if (context.FlowState?.Metadata.Reply != null) + if (context.HasFlowStateAndLock && context.FlowState.Metadata.Reply != null) throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); } @@ -159,16 +159,15 @@ namespace Tapeti.Flow.Default if (binding.QueueName == null) throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow", nameof(responseHandler)); - return new ResponseHandlerInfo - { - MethodName = MethodSerializer.Serialize(responseHandler.Method), - ReplyToQueue = binding.QueueName, - IsDurableQueue = binding.QueueType == QueueType.Durable - }; + return new ResponseHandlerInfo( + MethodSerializer.Serialize(responseHandler.Method), + binding.QueueName, + binding.QueueType == QueueType.Durable + ); } - private static ReplyMetadata GetReply(IFlowHandlerContext context) + private static ReplyMetadata? GetReply(IFlowHandlerContext context) { var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute(); if (requestAttribute?.Response == null) @@ -176,7 +175,7 @@ namespace Tapeti.Flow.Default return new ReplyMetadata { - CorrelationId = context.MessageContext.Properties.CorrelationId, + CorrelationId = context.MessageContext!.Properties.CorrelationId, ReplyTo = context.MessageContext.Properties.ReplyTo, ResponseTypeName = requestAttribute.Response.FullName, Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true) @@ -188,18 +187,17 @@ namespace Tapeti.Flow.Default var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve(); var flowID = Guid.NewGuid(); - flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); + var flowStateLock = await flowStore.LockFlowState(flowID); - if (flowContext.FlowStateLock == null) + if (flowStateLock == null) throw new InvalidOperationException("Unable to lock a new flow"); - flowContext.FlowState = new FlowState + var flowState = new FlowState { - Metadata = new FlowMetadata - { - Reply = GetReply(flowContext.HandlerContext) - } + Metadata = new FlowMetadata(GetReply(flowContext.HandlerContext)) }; + + flowContext.SetFlowState(flowState, flowStateLock); } @@ -207,20 +205,16 @@ namespace Tapeti.Flow.Default public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) { if (yieldPoint is not DelegateYieldPoint executableYieldPoint) - throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}"); + throw new YieldPointException($"Yield point is required in controller {context.Controller?.GetType().Name} for method {context.Method.Name}"); - FlowContext flowContext = null; + FlowContext? flowContext = null; var disposeFlowContext = false; try { - var messageContext = context.MessageContext; - if (messageContext == null || !messageContext.TryGet(out var flowPayload)) + if (context.MessageContext == null || !context.MessageContext.TryGet(out var flowPayload)) { - flowContext = new FlowContext - { - HandlerContext = context - }; + flowContext = new FlowContext(context); // If we ended up here it is because of a Start. No point in storing the new FlowContext // in the messageContext as the yield point is the last to execute. @@ -236,7 +230,7 @@ namespace Tapeti.Flow.Default catch (YieldPointException e) { // Useful for debugging - e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName; + e.Data["Tapeti.Controller.Name"] = context.Controller?.GetType().FullName; e.Data["Tapeti.Controller.Method"] = context.Method.Name; throw; } @@ -246,15 +240,15 @@ namespace Tapeti.Flow.Default finally { if (disposeFlowContext) - flowContext.Dispose(); + flowContext?.Dispose(); } } /// - public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) + public IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context) { - return context.MessageContext.TryGet(out var flowPayload) + return context.MessageContext != null && context.MessageContext.TryGet(out var flowPayload) ? new ParallelRequest(config, this, flowPayload.FlowContext) : null; } @@ -263,27 +257,49 @@ namespace Tapeti.Flow.Default /// public ValueTask Converge(IFlowHandlerContext context) { - return Execute(context, new DelegateYieldPoint(flowContext => - Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); + return Execute(context, new DelegateYieldPoint(async flowContext => + { + if (flowContext.ContinuationMetadata == null) + throw new InvalidOperationException("Missing ContinuationMetadata in FlowContext"); + + if (flowContext.ContinuationMetadata.ConvergeMethodName == null) + throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata"); + + await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); + })); } internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync) { - IYieldPoint yieldPoint; + IYieldPoint? yieldPoint; + + if (flowContext.HandlerContext == null) + throw new InvalidOperationException($"Missing HandleContext in FlowContext for converge method {convergeMethodName}"); + + if (flowContext.HandlerContext.MessageContext == null) + throw new InvalidOperationException($"Missing MessageContext in FlowContext for converge method {convergeMethodName}"); if (!flowContext.HandlerContext.MessageContext.TryGet(out var controllerPayload)) throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext)); + if (controllerPayload.Controller == null) + throw new InvalidOperationException($"Controller is not available for converge method {convergeMethodName} (method is static?)"); var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance); if (method == null) throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}"); if (convergeMethodSync) - yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { }); + yieldPoint = (IYieldPoint?)method.Invoke(controllerPayload.Controller, new object[] { }); else - yieldPoint = await(Task)method.Invoke(controllerPayload.Controller, new object[] { }); + { + var yieldPointTask = method.Invoke(controllerPayload.Controller, new object[] { }); + if (yieldPointTask == null) + throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); + + yieldPoint = await (Task)yieldPointTask; + } if (yieldPoint == null) throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); @@ -297,8 +313,15 @@ namespace Tapeti.Flow.Default { private class RequestInfo { - public object Message { get; init; } - public ResponseHandlerInfo ResponseHandlerInfo { get; init; } + public object Message { get; } + public ResponseHandlerInfo ResponseHandlerInfo { get; } + + + public RequestInfo(object message, ResponseHandlerInfo responseHandlerInfo) + { + Message = message; + ResponseHandlerInfo = responseHandlerInfo; + } } @@ -314,32 +337,32 @@ namespace Tapeti.Flow.Default } - public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) + public IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) + public IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } @@ -347,12 +370,7 @@ namespace Tapeti.Flow.Default private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) { - requests.Add(new RequestInfo - { - Message = message, - ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler) - }); - + requests.Add(new RequestInfo(message, GetResponseHandlerInfo(config, message, responseHandler))); return this; } @@ -381,12 +399,12 @@ namespace Tapeti.Flow.Default }; } - if (convergeMethod?.Method == null) + if (convergeMethod.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); return new DelegateYieldPoint(async context => { - if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) + if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); await Task.WhenAll(requests.Select(requestInfo => @@ -419,19 +437,19 @@ namespace Tapeti.Flow.Default } - public Task AddRequest(TRequest message, Func responseHandler) + public Task AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public Task AddRequest(TRequest message, Func responseHandler) + public Task AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } - public Task AddRequestSync(TRequest message, Action responseHandler) + public Task AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class { return InternalAddRequest(message, responseHandler); } @@ -441,6 +459,9 @@ namespace Tapeti.Flow.Default { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); + if (flowContext.ContinuationMetadata == null) + throw new InvalidOperationException("No ContinuationMetadata in FlowContext"); + return flowProvider.SendRequest( flowContext, message, @@ -454,9 +475,17 @@ namespace Tapeti.Flow.Default internal class ResponseHandlerInfo { - public string MethodName { get; set; } - public string ReplyToQueue { get; set; } - public bool IsDurableQueue { get; set; } + public string MethodName { get; } + public string ReplyToQueue { get; } + public bool IsDurableQueue { get; } + + + public ResponseHandlerInfo(string methodName, string replyToQueue, bool isDurableQueue) + { + MethodName = methodName; + ReplyToQueue = replyToQueue; + IsDurableQueue = isDurableQueue; + } } } } diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index 238947e..b8952cc 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -25,39 +25,38 @@ namespace Tapeti.Flow.Default /// public async Task Start(Expression>> methodSelector) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { }); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty()); } /// public async Task Start(Expression>>> methodSelector) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, Array.Empty()); } /// public async Task Start(Expression>> methodSelector, TParameter parameter) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter}); } /// public async Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class { - await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {parameter}); + await CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object?[] {parameter}); } - private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult, object[] parameters) where TController : class + private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult, object?[] parameters) where TController : class { var controller = config.DependencyResolver.Resolve(); - var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); + var result = method.Invoke(controller, parameters); + if (result == null) + throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task, got null"); - var context = new FlowHandlerContext - { - Config = config, - Controller = controller, - Method = method - }; + var yieldPoint = await getYieldPointResult(result); + + var context = new FlowHandlerContext(config, controller, method); var flowHandler = config.DependencyResolver.Resolve(); await flowHandler.Execute(context, yieldPoint); diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index a6ce743..763e371 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -9,8 +9,8 @@ namespace Tapeti.Flow.Default /// public class FlowState { - private FlowMetadata metadata; - private Dictionary continuations; + private FlowMetadata? metadata; + private Dictionary? continuations; /// @@ -18,7 +18,7 @@ namespace Tapeti.Flow.Default /// public FlowMetadata Metadata { - get => metadata ??= new FlowMetadata(); + get => metadata ??= new FlowMetadata(null); set => metadata = value; } @@ -26,7 +26,7 @@ namespace Tapeti.Flow.Default /// /// Contains the serialized state which is restored when a flow continues. /// - public string Data { get; set; } + public string? Data { get; set; } /// @@ -45,7 +45,7 @@ namespace Tapeti.Flow.Default public FlowState Clone() { return new FlowState { - metadata = metadata.Clone(), + metadata = metadata?.Clone(), Data = Data, continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()) }; @@ -61,18 +61,22 @@ namespace Tapeti.Flow.Default /// /// Contains information about the expected response for this flow. /// - public ReplyMetadata Reply { get; set; } + public ReplyMetadata? Reply { get; } + /// + public FlowMetadata(ReplyMetadata? reply) + { + Reply = reply; + } + + /// /// Creates a deep clone of this FlowMetadata. /// public FlowMetadata Clone() { - return new FlowMetadata - { - Reply = Reply?.Clone() - }; + return new FlowMetadata(Reply); } } @@ -85,17 +89,17 @@ namespace Tapeti.Flow.Default /// /// The queue to which the response should be sent. /// - public string ReplyTo { get; set; } + public string? ReplyTo { get; set; } /// /// The correlation ID included in the original request. /// - public string CorrelationId { get; set; } + public string? CorrelationId { get; set; } /// /// The expected response message class. /// - public string ResponseTypeName { get; set; } + public string? ResponseTypeName { get; set; } /// /// Indicates whether the response should be sent a mandatory. @@ -128,12 +132,12 @@ namespace Tapeti.Flow.Default /// /// The name of the method which will handle the response. /// - public string MethodName { get; set; } + public string? MethodName { get; set; } /// /// The name of the method which is called when all responses have been processed. /// - public string ConvergeMethodName { get; set; } + public string? ConvergeMethodName { get; set; } /// /// Determines if the converge method is synchronous or asynchronous. diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 324f121..3d26ca5 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -16,11 +16,11 @@ namespace Tapeti.Flow.Default { private class CachedFlowState { - public readonly FlowState FlowState; + public readonly FlowState? FlowState; public readonly DateTime CreationTime; public readonly bool IsPersistent; - public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent) + public CachedFlowState(FlowState? flowState, DateTime creationTime, bool isPersistent) { FlowState = flowState; CreationTime = creationTime; @@ -31,7 +31,7 @@ namespace Tapeti.Flow.Default private readonly ConcurrentDictionary flowStates = new(); private readonly ConcurrentDictionary continuationLookup = new(); private readonly LockCollection locks = new(EqualityComparer.Default); - private HashSet validatedMethods; + private HashSet? validatedMethods; private readonly IFlowRepository repository; private readonly ITapetiConfig config; @@ -85,10 +85,13 @@ namespace Tapeti.Flow.Default private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata) { + if (string.IsNullOrEmpty(metadata.MethodName)) + return; + // We could check all the things that are required for a continuation or converge method, but this should suffice // for the common scenario where you change code without realizing that it's signature has been persisted // ReSharper disable once InvertIf - if (validatedMethods.Add(metadata.MethodName)) + if (validatedMethods!.Add(metadata.MethodName)) { var methodInfo = MethodSerializer.Deserialize(metadata.MethodName); if (methodInfo == null) @@ -150,8 +153,8 @@ namespace Tapeti.Flow.Default private class FlowStateLock : IFlowStateLock { private readonly FlowStore owner; - private volatile IDisposable flowLock; - private CachedFlowState cachedFlowState; + private volatile IDisposable? flowLock; + private CachedFlowState? cachedFlowState; public Guid FlowID { get; } @@ -172,12 +175,12 @@ namespace Tapeti.Flow.Default l?.Dispose(); } - public ValueTask GetFlowState() + public ValueTask GetFlowState() { if (flowLock == null) throw new ObjectDisposedException("FlowStateLock"); - return new ValueTask(cachedFlowState?.FlowState?.Clone()); + return new ValueTask(cachedFlowState?.FlowState?.Clone()); } public async ValueTask StoreFlowState(FlowState newFlowState, bool persistent) @@ -189,13 +192,13 @@ namespace Tapeti.Flow.Default newFlowState = newFlowState.Clone(); // Update the lookup dictionary for the ContinuationIDs - if (cachedFlowState != null) + if (cachedFlowState?.FlowState != null) { foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) owner.continuationLookup.TryRemove(removedContinuation, out _); } - foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key))) + foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState?.FlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key))) { owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID); } @@ -203,7 +206,7 @@ namespace Tapeti.Flow.Default var isNew = cachedFlowState == null; var wasPersistent = cachedFlowState?.IsPersistent ?? false; - cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent); + cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState!.CreationTime, persistent); owner.flowStates[FlowID] = cachedFlowState; if (persistent) @@ -231,7 +234,7 @@ namespace Tapeti.Flow.Default if (flowLock == null) throw new ObjectDisposedException("FlowStateLock"); - if (cachedFlowState != null) + if (cachedFlowState?.FlowState != null) { foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys) owner.continuationLookup.TryRemove(removedContinuation, out _); @@ -239,7 +242,7 @@ namespace Tapeti.Flow.Default owner.flowStates.TryRemove(FlowID, out var removedFlowState); cachedFlowState = null; - if (removedFlowState.IsPersistent) + if (removedFlowState is { IsPersistent: true }) await owner.repository.DeleteState(FlowID); } } diff --git a/Tapeti.Flow/FlowExtension.cs b/Tapeti.Flow/FlowExtension.cs index cc28e1e..e7610aa 100644 --- a/Tapeti.Flow/FlowExtension.cs +++ b/Tapeti.Flow/FlowExtension.cs @@ -9,11 +9,11 @@ namespace Tapeti.Flow /// public class FlowExtension : ITapetiExtension { - private readonly IFlowRepository flowRepository; + private readonly IFlowRepository? flowRepository; /// /// - public FlowExtension(IFlowRepository flowRepository) + public FlowExtension(IFlowRepository? flowRepository) { this.flowRepository = flowRepository; } diff --git a/Tapeti.Flow/FlowHelpers/LockCollection.cs b/Tapeti.Flow/FlowHelpers/LockCollection.cs index 2c0372e..df39769 100644 --- a/Tapeti.Flow/FlowHelpers/LockCollection.cs +++ b/Tapeti.Flow/FlowHelpers/LockCollection.cs @@ -7,7 +7,7 @@ namespace Tapeti.Flow.FlowHelpers /// /// Implementation of an asynchronous locking mechanism. /// - public class LockCollection + public class LockCollection where T : notnull { private readonly Dictionary locks; @@ -57,7 +57,7 @@ namespace Tapeti.Flow.FlowHelpers private class LockItem : IDisposable { - internal volatile LockItem Next; + internal volatile LockItem? Next; private readonly Dictionary locks; private readonly TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs index c11c6eb..38c6f93 100644 --- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs +++ b/Tapeti.Flow/FlowHelpers/MethodSerializer.cs @@ -25,7 +25,7 @@ namespace Tapeti.Flow.FlowHelpers /// Deserializes the serialized method representation back into it's MethodInfo, or null if not found. /// /// - public static MethodInfo Deserialize(string serializedMethod) + public static MethodInfo? Deserialize(string serializedMethod) { var match = DeserializeRegex.Match(serializedMethod); if (!match.Success) diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs index f0cdadb..07476d6 100644 --- a/Tapeti.Flow/FlowMessageContextPayload.cs +++ b/Tapeti.Flow/FlowMessageContextPayload.cs @@ -16,9 +16,8 @@ namespace Tapeti.Flow /// parallel flow is done and the convergeMethod will be called. /// Temporarily disables storing the flow state. /// - public bool FlowIsConverging => FlowContext != null && - FlowContext.FlowState.Continuations.Count == 0 && - FlowContext.ContinuationMetadata.ConvergeMethodName != null; + public bool FlowIsConverging => FlowContext.FlowState.Continuations.Count == 0 && + FlowContext.ContinuationMetadata?.ConvergeMethodName != null; public FlowMessageContextPayload(FlowContext flowContext) @@ -29,7 +28,7 @@ namespace Tapeti.Flow public void Dispose() { - FlowContext?.Dispose(); + FlowContext.Dispose(); } } } diff --git a/Tapeti.Flow/IFlowHandlerContext.cs b/Tapeti.Flow/IFlowHandlerContext.cs index 0b99556..d0420c5 100644 --- a/Tapeti.Flow/IFlowHandlerContext.cs +++ b/Tapeti.Flow/IFlowHandlerContext.cs @@ -18,7 +18,7 @@ namespace Tapeti.Flow /// /// An instance of the controller which starts or continues the flow. /// - object Controller { get; } + object? Controller { get; } /// @@ -31,6 +31,6 @@ namespace Tapeti.Flow /// Access to the message context if this is a continuated flow. /// Will be null when in a starting flow. /// - IMessageContext MessageContext { get; } + IMessageContext? MessageContext { get; } } } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index b7e18df..613e82d 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -20,7 +20,7 @@ namespace Tapeti.Flow /// /// /// - IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class; /// @@ -32,7 +32,7 @@ namespace Tapeti.Flow /// /// /// - IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler); + IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) where TRequest : class where TResponse : class; /// @@ -51,7 +51,7 @@ namespace Tapeti.Flow /// /// /// - IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler); + IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// @@ -67,7 +67,7 @@ namespace Tapeti.Flow /// /// /// - IYieldPoint EndWithResponse(TResponse message); + IYieldPoint EndWithResponse(TResponse message) where TResponse : class; /// @@ -126,7 +126,7 @@ namespace Tapeti.Flow /// /// Returns the parallel request for the given message context. /// - IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); + IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context); /// @@ -174,7 +174,7 @@ namespace Tapeti.Flow /// /// /// - IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// Publish a request message and continue the flow when the response arrives. @@ -184,21 +184,21 @@ namespace Tapeti.Flow /// /// /// - IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler); + IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// Publish a request message and continue the flow when the response arrives. @@ -208,14 +208,14 @@ namespace Tapeti.Flow /// /// /// - IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class; /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler); + IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class; /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// async, so you should always await them. @@ -260,14 +260,14 @@ namespace Tapeti.Flow /// /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// This overload allows the response handler access to the IFlowParallelRequest interface, which /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// /// - Task AddRequest(TRequest message, Func responseHandler); + Task AddRequest(TRequest message, Func responseHandler) where TRequest : class where TResponse : class; /// /// Publish a request message and continue the flow when the response arrives. @@ -277,7 +277,7 @@ namespace Tapeti.Flow /// /// /// - Task AddRequestSync(TRequest message, Action responseHandler); + Task AddRequestSync(TRequest message, Action responseHandler) where TRequest : class where TResponse : class; } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index 38dc328..4c68a8f 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -55,7 +55,7 @@ namespace Tapeti.Flow /// /// Acquires a copy of the flow state. /// - ValueTask GetFlowState(); + ValueTask GetFlowState(); /// /// Stores the new flow state. diff --git a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs b/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs index ef47d40..39940b0 100644 --- a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs +++ b/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs @@ -170,11 +170,11 @@ namespace JetBrains.Annotations { public PublicAPIAttribute() { } - public PublicAPIAttribute([NotNull] string comment) + public PublicAPIAttribute(string comment) { Comment = comment; } - [CanBeNull] public string Comment { get; } + public string? Comment { get; } } } \ No newline at end of file diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 6fc172c..b772e71 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -12,6 +12,7 @@ Tapeti.Flow.png 2.0.0 9 + enable diff --git a/Tapeti.Ninject/Tapeti.Ninject.csproj b/Tapeti.Ninject/Tapeti.Ninject.csproj index 84d56eb..d31c98b 100644 --- a/Tapeti.Ninject/Tapeti.Ninject.csproj +++ b/Tapeti.Ninject/Tapeti.Ninject.csproj @@ -12,6 +12,7 @@ Tapeti.SimpleInjector.png 2.0.0 9 + enable diff --git a/Tapeti.Serilog/Middleware/MessageHandlerLoggingBindingMiddleware.cs b/Tapeti.Serilog/Middleware/MessageHandlerLoggingBindingMiddleware.cs index d3ba51d..17a1c28 100644 --- a/Tapeti.Serilog/Middleware/MessageHandlerLoggingBindingMiddleware.cs +++ b/Tapeti.Serilog/Middleware/MessageHandlerLoggingBindingMiddleware.cs @@ -43,7 +43,7 @@ namespace Tapeti.Serilog.Middleware } - private static object DiagnosticContextFactory(IMessageContext context) + private static object? DiagnosticContextFactory(IMessageContext context) { return context.TryGet(out var diagnosticContextPayload) ? diagnosticContextPayload.DiagnosticContext diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj index 2b520d3..ddef97e 100644 --- a/Tapeti.Serilog/Tapeti.Serilog.csproj +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -12,6 +12,7 @@ Tapeti.Serilog.png 2.0.0 9 + enable diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs index af3de98..dfdd830 100644 --- a/Tapeti.Serilog/TapetiSeriLogger.cs +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -87,7 +87,7 @@ namespace Tapeti.Serilog public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult) { var message = new StringBuilder("Tapeti: exception in message handler"); - var messageParams = new List(); + var messageParams = new List(); var contextLogger = seriLogger .ForContext("consumeResult", consumeResult) @@ -130,7 +130,7 @@ namespace Tapeti.Serilog } /// - public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments) + public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments) { seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({existingArguments} vs. {arguments}) and will not be redeclared, queue will be consumed as-is", queueName, diff --git a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs index 0c96418..0960843 100644 --- a/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs +++ b/Tapeti.SimpleInjector/SimpleInjectorDependencyResolver.cs @@ -10,12 +10,12 @@ namespace Tapeti.SimpleInjector public class SimpleInjectorDependencyResolver : IDependencyContainer { private readonly Container container; - private readonly Lifestyle defaultsLifestyle; - private readonly Lifestyle controllersLifestyle; + private readonly Lifestyle? defaultsLifestyle; + private readonly Lifestyle? controllersLifestyle; /// /// - public SimpleInjectorDependencyResolver(Container container, Lifestyle defaultsLifestyle = null, Lifestyle controllersLifestyle = null) + public SimpleInjectorDependencyResolver(Container container, Lifestyle? defaultsLifestyle = null, Lifestyle? controllersLifestyle = null) { this.container = container; this.defaultsLifestyle = defaultsLifestyle; diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj index c60b753..c0c5614 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj @@ -12,6 +12,7 @@ Tapeti.SimpleInjector.png 2.0.0 9 + enable diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs index 5c8d473..240bac0 100644 --- a/Tapeti.Tests/Client/RabbitMQFixture.cs +++ b/Tapeti.Tests/Client/RabbitMQFixture.cs @@ -27,7 +27,7 @@ namespace Tapeti.Tests.Client public ushort RabbitMQManagementPort { get; private set; } - private TestcontainerMessageBroker testcontainers; + private TestcontainerMessageBroker? testcontainers; private const int DefaultRabbitMQPort = 5672; private const int DefaultRabbitMQManagementPort = 15672; diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index c9157ba..49c3bc0 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -19,7 +19,7 @@ namespace Tapeti.Tests.Client private readonly RabbitMQFixture fixture; private readonly MockDependencyResolver dependencyResolver = new(); - private TapetiClient client; + private TapetiClient client = null!; public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) @@ -41,7 +41,6 @@ namespace Tapeti.Tests.Client public async Task DisposeAsync() { await client.Close(); - client = null; } diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs index ebb8cbc..98a74c2 100644 --- a/Tapeti.Tests/Config/QueueArgumentsTest.cs +++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using FluentAssertions; @@ -13,6 +14,16 @@ using Xunit; namespace Tapeti.Tests.Config { + internal static class UTF8StringExtensions + { + public static string AsUTF8String(this object value) + { + value.Should().BeOfType(); + return Encoding.UTF8.GetString((byte[])value); + } + } + + public class QueueArgumentsTest : BaseControllerTest { private static readonly MockRepository MoqRepository = new(MockBehavior.Strict); @@ -88,12 +99,12 @@ namespace Tapeti.Tests.Config declaredQueues.Should().HaveCount(1); var arguments = declaredQueues["queue-1"]; - arguments.Should().ContainKey("x-custom").WhoseValue.Should().Be("custom value"); + arguments.Should().ContainKey("x-custom").WhoseValue.AsUTF8String().Should().Be("custom value"); arguments.Should().ContainKey("x-another").WhoseValue.Should().Be(true); arguments.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100); arguments.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000); arguments.Should().ContainKey("x-message-ttl").WhoseValue.Should().Be(4269); - arguments.Should().ContainKey("x-overflow").WhoseValue.Should().Be("reject-publish"); + arguments.Should().ContainKey("x-overflow").WhoseValue.AsUTF8String().Should().Be("reject-publish"); } @@ -133,7 +144,7 @@ namespace Tapeti.Tests.Config } } - + // ReSharper disable all #pragma warning disable diff --git a/Tapeti.Tests/Mock/MockDependencyResolver.cs b/Tapeti.Tests/Mock/MockDependencyResolver.cs index 51d8698..111ebac 100644 --- a/Tapeti.Tests/Mock/MockDependencyResolver.cs +++ b/Tapeti.Tests/Mock/MockDependencyResolver.cs @@ -8,7 +8,7 @@ namespace Tapeti.Tests.Mock private readonly Dictionary container = new(); - public void Set(TInterface instance) + public void Set(TInterface instance) where TInterface : class { container.Add(typeof(TInterface), instance); } diff --git a/Tapeti.Tests/Mock/MockLogger.cs b/Tapeti.Tests/Mock/MockLogger.cs index 19c41fb..bc02a26 100644 --- a/Tapeti.Tests/Mock/MockLogger.cs +++ b/Tapeti.Tests/Mock/MockLogger.cs @@ -49,8 +49,17 @@ namespace Tapeti.Tests.Mock : $"Declaring {(durable ? "durable" : "dynamic")} queue {queueName}"); } - public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments) + public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments) { + testOutputHelper.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is"); + } + + + private static string GetArgumentsText(IRabbitMQArguments? arguments) + { + if (arguments == null || arguments.Count == 0) + return "empty"; + var argumentsText = new StringBuilder(); foreach (var pair in arguments) { @@ -60,9 +69,10 @@ namespace Tapeti.Tests.Mock argumentsText.Append($"{pair.Key} = {pair.Value}"); } - testOutputHelper.WriteLine($"Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is"); + return argumentsText.ToString(); } + public void QueueBind(string queueName, bool durable, string exchange, string routingKey) { testOutputHelper.WriteLine($"Binding {queueName} to exchange {exchange} with routing key {routingKey}"); diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 0a2b345..09b729d 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -2,6 +2,7 @@ net6.0 + enable diff --git a/Tapeti.Transient/ITransientPublisher.cs b/Tapeti.Transient/ITransientPublisher.cs index 7c01409..9aea93d 100644 --- a/Tapeti.Transient/ITransientPublisher.cs +++ b/Tapeti.Transient/ITransientPublisher.cs @@ -15,6 +15,6 @@ namespace Tapeti.Transient /// /// /// - Task RequestResponse(TRequest request); + Task RequestResponse(TRequest request) where TRequest : class where TResponse : class; } } \ No newline at end of file diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj index 7a26c25..27af569 100644 --- a/Tapeti.Transient/Tapeti.Transient.csproj +++ b/Tapeti.Transient/Tapeti.Transient.csproj @@ -12,6 +12,7 @@ Tapeti.Flow.png 2.0.0 9 + enable diff --git a/Tapeti.Transient/TransientExtension.cs b/Tapeti.Transient/TransientExtension.cs index ec642e5..4b8d76b 100644 --- a/Tapeti.Transient/TransientExtension.cs +++ b/Tapeti.Transient/TransientExtension.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using Tapeti.Config; namespace Tapeti.Transient @@ -31,7 +32,7 @@ namespace Tapeti.Transient /// public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return null; + return Enumerable.Empty(); } diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index 0617c62..10c633f 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -14,10 +14,10 @@ namespace Tapeti.Transient private readonly string dynamicQueuePrefix; /// - public string QueueName { get; private set; } + public string? QueueName { get; private set; } /// - public QueueType QueueType => QueueType.Dynamic; + public QueueType? QueueType => Config.QueueType.Dynamic; /// diff --git a/Tapeti.Transient/TransientPublisher.cs b/Tapeti.Transient/TransientPublisher.cs index fb1f713..25880f7 100644 --- a/Tapeti.Transient/TransientPublisher.cs +++ b/Tapeti.Transient/TransientPublisher.cs @@ -21,7 +21,7 @@ namespace Tapeti.Transient /// - public async Task RequestResponse(TRequest request) + public async Task RequestResponse(TRequest request) where TRequest : class where TResponse : class { return (TResponse)await router.RequestResponse(publisher, request); } diff --git a/Tapeti.Transient/TransientRouter.cs b/Tapeti.Transient/TransientRouter.cs index 96c27af..5425e4c 100644 --- a/Tapeti.Transient/TransientRouter.cs +++ b/Tapeti.Transient/TransientRouter.cs @@ -18,7 +18,7 @@ namespace Tapeti.Transient /// /// The generated name of the dynamic queue to which responses should be sent. /// - public string TransientResponseQueueName { get; set; } + public string? TransientResponseQueueName { get; set; } /// @@ -41,8 +41,13 @@ namespace Tapeti.Transient if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) return; - if (map.TryRemove(continuationID, out var tcs)) - tcs.TrySetResult(context.Message); + if (!map.TryRemove(continuationID, out var tcs)) + return; + + if (context.Message == null) + throw new InvalidOperationException(); + + tcs.TrySetResult(context.Message); } @@ -72,7 +77,7 @@ namespace Tapeti.Transient { // Simple cleanup of the task and map dictionary. if (map.TryRemove(correlation, out tcs)) - tcs.TrySetResult(null); + tcs.TrySetResult(null!); throw; } @@ -84,8 +89,10 @@ namespace Tapeti.Transient } - private void TimeoutResponse(object tcs) + private void TimeoutResponse(object? tcs) { + ArgumentNullException.ThrowIfNull(tcs, nameof(tcs)); + ((TaskCompletionSource)tcs).TrySetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs)); } } diff --git a/Tapeti/Config/ControllerMessageContextPayload.cs b/Tapeti/Config/ControllerMessageContextPayload.cs index 992b053..8bd3f0a 100644 --- a/Tapeti/Config/ControllerMessageContextPayload.cs +++ b/Tapeti/Config/ControllerMessageContextPayload.cs @@ -8,7 +8,7 @@ /// /// An instance of the controller referenced by the binding. Note: can be null during Cleanup or when bound to static methods. /// - public object Controller { get; } + public object? Controller { get; } /// @@ -22,7 +22,7 @@ /// /// An instance of the controller referenced by the binding /// The binding which is currently processing the message - public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding) + public ControllerMessageContextPayload(object? controller, IControllerMethodBinding binding) { Controller = controller; Binding = binding; diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs index d7fa843..d67255a 100644 --- a/Tapeti/Config/IBinding.cs +++ b/Tapeti/Config/IBinding.cs @@ -29,13 +29,13 @@ namespace Tapeti.Config /// /// The name of the queue the binding is consuming. May change after a reconnect for dynamic queues. /// - string QueueName { get; } + string? QueueName { get; } /// /// Determines the type of queue the binding registers /// - QueueType QueueType { get; } + QueueType? QueueType { get; } /// @@ -82,7 +82,7 @@ namespace Tapeti.Config /// The message class to be bound to the queue /// The name of the durable queue /// Optional arguments - ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments); + ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments); /// /// Binds the messageClass to a dynamic auto-delete queue. @@ -95,7 +95,7 @@ namespace Tapeti.Config /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamic(Type messageClass, string queuePrefix, IRabbitMQArguments arguments); + ValueTask BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments); /// /// Declares a durable queue but does not add a binding for a messageClass' routing key. @@ -103,7 +103,7 @@ namespace Tapeti.Config /// /// The name of the durable queue /// Optional arguments - ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments); + ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. @@ -113,7 +113,7 @@ namespace Tapeti.Config /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamicDirect(Type messageClass, string queuePrefix, IRabbitMQArguments arguments); + ValueTask BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments); /// /// Declares a dynamic queue but does not add a binding for a messageClass' routing key. @@ -122,7 +122,7 @@ namespace Tapeti.Config /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// Optional arguments /// The generated name of the dynamic queue - ValueTask BindDynamicDirect(string queuePrefix, IRabbitMQArguments arguments); + ValueTask BindDynamicDirect(string? queuePrefix, IRabbitMQArguments? arguments); /// /// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs index 2023e22..3d3e6d2 100644 --- a/Tapeti/Config/IControllerBindingContext.cs +++ b/Tapeti/Config/IControllerBindingContext.cs @@ -11,7 +11,7 @@ namespace Tapeti.Config /// Injects a value for a controller method parameter. /// /// - public delegate object ValueFactory(IMessageContext context); + public delegate object? ValueFactory(IMessageContext context); /// @@ -19,7 +19,7 @@ namespace Tapeti.Config /// /// /// - public delegate ValueTask ResultHandler(IMessageContext context, object value); + public delegate ValueTask ResultHandler(IMessageContext context, object? value); /// @@ -48,7 +48,7 @@ namespace Tapeti.Config /// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware. /// If required, call next first to ensure it is available. /// - Type MessageClass { get; } + Type? MessageClass { get; } /// /// Determines if SetMessageClass has already been called. diff --git a/Tapeti/Config/IControllerBindingMiddleware.cs b/Tapeti/Config/IControllerBindingMiddleware.cs index cf878b2..4fdf341 100644 --- a/Tapeti/Config/IControllerBindingMiddleware.cs +++ b/Tapeti/Config/IControllerBindingMiddleware.cs @@ -7,12 +7,12 @@ namespace Tapeti.Config /// public interface IControllerBindingMiddleware : IControllerMiddlewareBase { - /// - /// Called before a Controller method is registered. Can change the way parameters and return values are handled, - /// and can inject message middleware specific to a method. - /// - /// - /// Must be called to activate the new layer of middleware. - void Handle(IControllerBindingContext context, Action next); + /// + /// Called before a Controller method is registered. Can change the way parameters and return values are handled, + /// and can inject message middleware specific to a method. + /// + /// + /// Must be called to activate the new layer of middleware. + void Handle(IControllerBindingContext context, Action next); } } diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index 6fea4cc..2d6c77e 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics.CodeAnalysis; using System.Threading; // ReSharper disable UnusedMemberInSuper.Global - public API @@ -34,12 +35,12 @@ namespace Tapeti.Config /// /// Contains the raw body of the message. /// - byte[] RawBody { get; } + byte[]? RawBody { get; } /// /// Contains the decoded message instance. /// - object Message { get; } + object? Message { get; } /// /// Provides access to the message metadata. @@ -55,7 +56,7 @@ namespace Tapeti.Config /// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed. /// Note that this token is cancelled regardless of whether the connection will be reestablished, as any /// messages still in the queue will be redelivered with a new token. - /// + /// CancellationToken ConnectionClosed { get; } /// @@ -87,7 +88,7 @@ namespace Tapeti.Config /// Returns true and the payload value if this message context was previously enriched with the payload T. /// /// The payload type as passed to Enrich - bool TryGet(out T payload) where T : IMessageContextPayload; + bool TryGet([NotNullWhen(true)] out T? payload) where T : IMessageContextPayload; /// /// Stores a key-value pair in the context for passing information between the various @@ -105,7 +106,7 @@ namespace Tapeti.Config /// /// True if the value was found, False otherwise [Obsolete("For backwards compatibility only. Use Get payload overload for typed properties instead")] - bool Get(string key, out T value) where T : class; + bool Get(string key, out T? value) where T : class; } diff --git a/Tapeti/Config/IMessageProperties.cs b/Tapeti/Config/IMessageProperties.cs index 31c203b..07e7eaf 100644 --- a/Tapeti/Config/IMessageProperties.cs +++ b/Tapeti/Config/IMessageProperties.cs @@ -9,13 +9,13 @@ namespace Tapeti.Config public interface IMessageProperties { /// - string ContentType { get; set; } + string? ContentType { get; set; } /// - string CorrelationId { get; set; } + string? CorrelationId { get; set; } /// - string ReplyTo { get; set; } + string? ReplyTo { get; set; } /// bool? Persistent { get; set; } @@ -37,7 +37,7 @@ namespace Tapeti.Config /// /// /// The value if found, null otherwise - string GetHeader(string name); + string? GetHeader(string name); /// diff --git a/Tapeti/Config/IPublishContext.cs b/Tapeti/Config/IPublishContext.cs index a5fb435..60c0859 100644 --- a/Tapeti/Config/IPublishContext.cs +++ b/Tapeti/Config/IPublishContext.cs @@ -16,7 +16,7 @@ namespace Tapeti.Config /// /// The exchange to which the message will be published. /// - string Exchange { get; set; } + string? Exchange { get; set; } /// /// The routing key which will be included with the message. @@ -31,6 +31,6 @@ namespace Tapeti.Config /// /// Provides access to the message metadata. /// - IMessageProperties Properties { get; } + IMessageProperties? Properties { get; } } } diff --git a/Tapeti/Config/ITapetiConfig.cs b/Tapeti/Config/ITapetiConfig.cs index 2b14a26..4bdad56 100644 --- a/Tapeti/Config/ITapetiConfig.cs +++ b/Tapeti/Config/ITapetiConfig.cs @@ -87,40 +87,13 @@ namespace Tapeti.Config /// /// /// The binding if found, null otherwise - IControllerMethodBinding ForMethod(Delegate method); + IControllerMethodBinding? ForMethod(Delegate method); /// /// Searches for a binding linked to the specified method. /// /// /// The binding if found, null otherwise - IControllerMethodBinding ForMethod(MethodInfo method); + IControllerMethodBinding? ForMethod(MethodInfo method); } - - - /* - public interface IBinding - { - Type Controller { get; } - MethodInfo Method { get; } - Type MessageClass { get; } - string QueueName { get; } - QueueBindingMode QueueBindingMode { get; set; } - - IReadOnlyList MessageFilterMiddleware { get; } - IReadOnlyList MessageMiddleware { get; } - - bool Accept(Type messageClass); - bool Accept(IMessageContext context, object message); - Task Invoke(IMessageContext context, object message); - } - */ - - - /* - public interface IBuildBinding : IBinding - { - void SetQueueName(string queueName); - } - */ } diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs index d9df591..eeda4f3 100644 --- a/Tapeti/Connection/ITapetiClient.cs +++ b/Tapeti/Connection/ITapetiClient.cs @@ -37,9 +37,9 @@ namespace Tapeti.Connection } /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { - if (ReferenceEquals(null, obj)) return false; + if (obj is null) return false; return obj is QueueBinding other && Equals(other); } @@ -79,7 +79,7 @@ namespace Tapeti.Connection /// The exchange to publish the message to, or empty to send it directly to a queue /// The routing key for the message, or queue name if exchange is empty /// If true, an exception will be raised if the message can not be delivered to at least one queue - Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory); + Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory); /// @@ -89,7 +89,7 @@ namespace Tapeti.Connection /// The consumer implementation which will receive the messages from the queue /// Cancelled when the connection is lost /// The consumer tag as returned by BasicConsume. - Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken); + Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken); /// /// Stops the consumer with the specified tag. @@ -104,7 +104,7 @@ namespace Tapeti.Connection /// A list of bindings. Any bindings already on the queue which are not in this list will be removed /// Optional arguments /// Cancelled when the connection is lost - Task DurableQueueDeclare(string queueName, IEnumerable bindings, IRabbitMQArguments arguments, CancellationToken cancellationToken); + Task DurableQueueDeclare(string queueName, IEnumerable bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken); /// /// Verifies a durable queue exists. Will raise an exception if it does not. @@ -112,7 +112,7 @@ namespace Tapeti.Connection /// The name of the queue to verify /// Optional arguments /// Cancelled when the connection is lost - Task DurableQueueVerify(string queueName, IRabbitMQArguments arguments, CancellationToken cancellationToken); + Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken); /// /// Deletes a durable queue. @@ -128,7 +128,7 @@ namespace Tapeti.Connection /// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue. /// Optional arguments /// Cancelled when the connection is lost - Task DynamicQueueDeclare(string queuePrefix, IRabbitMQArguments arguments, CancellationToken cancellationToken); + Task DynamicQueueDeclare(string? queuePrefix, IRabbitMQArguments? arguments, CancellationToken cancellationToken); /// /// Add a binding to a dynamic queue. diff --git a/Tapeti/Connection/RabbitMQArguments.cs b/Tapeti/Connection/RabbitMQArguments.cs index 5c0d7e9..9e0a576 100644 --- a/Tapeti/Connection/RabbitMQArguments.cs +++ b/Tapeti/Connection/RabbitMQArguments.cs @@ -15,17 +15,10 @@ namespace Tapeti.Connection { } - #if NETSTANDARD2_1_OR_GREATER + public RabbitMQArguments(IReadOnlyDictionary values) : base(values) { } - #else - public RabbitMQArguments(IReadOnlyDictionary values) - { - foreach (var pair in values) - Add(pair.Key, pair.Value); - } - #endif public void AddUTF8(string key, string value) diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs index 0efe5c3..ed39446 100644 --- a/Tapeti/Connection/TapetiChannel.cs +++ b/Tapeti/Connection/TapetiChannel.cs @@ -21,7 +21,7 @@ namespace Tapeti.Connection { private readonly Func modelFactory; private readonly object taskQueueLock = new(); - private SingleThreadTaskQueue taskQueue; + private SingleThreadTaskQueue? taskQueue; private readonly ModelProvider modelProvider; @@ -34,7 +34,7 @@ namespace Tapeti.Connection public async Task Reset() { - SingleThreadTaskQueue capturedTaskQueue; + SingleThreadTaskQueue? capturedTaskQueue; lock (taskQueueLock) { diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index aaa8c2a..ec1316b 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -43,7 +43,7 @@ namespace Tapeti.Connection /// /// Receives events when the connection state changes. /// - public IConnectionEventListener ConnectionEventListener { get; set; } + public IConnectionEventListener? ConnectionEventListener { get; set; } private readonly TapetiChannel consumeChannel; @@ -53,9 +53,9 @@ namespace Tapeti.Connection // These fields must be locked using connectionLock private readonly object connectionLock = new(); private long connectionReference; - private RabbitMQ.Client.IConnection connection; - private IModel consumeChannelModel; - private IModel publishChannelModel; + private RabbitMQ.Client.IConnection? connection; + private IModel? consumeChannelModel; + private IModel? publishChannelModel; private bool isClosing; private bool isReconnect; private DateTime connectedDateTime; @@ -72,8 +72,15 @@ namespace Tapeti.Connection private class ConfirmMessageInfo { - public string ReturnKey; - public TaskCompletionSource CompletionSource; + public string ReturnKey { get; } + public TaskCompletionSource CompletionSource { get; } + + + public ConfirmMessageInfo(string returnKey, TaskCompletionSource completionSource) + { + ReturnKey = returnKey; + CompletionSource = completionSource; + } } @@ -110,7 +117,7 @@ namespace Tapeti.Connection /// - public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory) + public async Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory) { if (string.IsNullOrEmpty(routingKey)) throw new ArgumentNullException(nameof(routingKey)); @@ -118,17 +125,14 @@ namespace Tapeti.Connection await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channelProvider => { - Task publishResultTask = null; - var messageInfo = new ConfirmMessageInfo - { - ReturnKey = GetReturnKey(exchange, routingKey), - CompletionSource = new TaskCompletionSource() - }; + Task? publishResultTask = null; + var messageInfo = new ConfirmMessageInfo(GetReturnKey(exchange ?? string.Empty, routingKey), new TaskCompletionSource()); channelProvider.WithRetryableChannel(channel => { - DeclareExchange(channel, exchange); + if (exchange != null) + DeclareExchange(channel, exchange); // The delivery tag is lost after a reconnect, register under the new tag if (config.Features.PublisherConfirms) @@ -153,7 +157,7 @@ namespace Tapeti.Connection try { var publishProperties = new RabbitMQMessageProperties(channel.CreateBasicProperties(), properties); - channel.BasicPublish(exchange ?? "", routingKey, mandatory, publishProperties.BasicProperties, body); + channel.BasicPublish(exchange ?? string.Empty, routingKey, mandatory, publishProperties.BasicProperties, body); } catch { @@ -202,7 +206,7 @@ namespace Tapeti.Connection /// - public async Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken) + public async Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken) { if (deletedQueues.Contains(queueName)) return null; @@ -212,7 +216,7 @@ namespace Tapeti.Connection long capturedConnectionReference = -1; - string consumerTag = null; + string? consumerTag = null; await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel => { @@ -224,7 +228,9 @@ namespace Tapeti.Connection consumerTag = channel.BasicConsume(queueName, false, basicConsumer); }); - return new TapetiConsumerTag(capturedConnectionReference, consumerTag); + return consumerTag == null + ? null + : new TapetiConsumerTag(capturedConnectionReference, consumerTag); } @@ -289,7 +295,7 @@ namespace Tapeti.Connection } - private async Task GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments arguments) + private async Task GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments? arguments) { var existingQueue = await GetQueueInfo(queueName); if (existingQueue == null) @@ -298,9 +304,6 @@ namespace Tapeti.Connection if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive) throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)"); - if (arguments == null && existingQueue.Arguments.Count == 0) - return true; - var existingArguments = ConvertJsonArguments(existingQueue.Arguments); if (existingArguments.NullSafeSameValues(arguments)) return true; @@ -310,7 +313,7 @@ namespace Tapeti.Connection } - private static RabbitMQArguments ConvertJsonArguments(IReadOnlyDictionary arguments) + private static RabbitMQArguments? ConvertJsonArguments(IReadOnlyDictionary? arguments) { if (arguments == null) return null; @@ -325,7 +328,6 @@ namespace Tapeti.Connection JTokenType.Float => pair.Value.Value(), JTokenType.String => Encoding.UTF8.GetBytes(pair.Value.Value() ?? string.Empty), JTokenType.Boolean => pair.Value.Value(), - JTokenType.Null => null, _ => throw new ArgumentOutOfRangeException(nameof(arguments)) }; @@ -338,7 +340,7 @@ namespace Tapeti.Connection /// - public async Task DurableQueueDeclare(string queueName, IEnumerable bindings, IRabbitMQArguments arguments, CancellationToken cancellationToken) + public async Task DurableQueueDeclare(string queueName, IEnumerable bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken) { var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments); @@ -373,7 +375,7 @@ namespace Tapeti.Connection } - private static IDictionary GetDeclareArguments(IRabbitMQArguments arguments) + private static IDictionary? GetDeclareArguments(IRabbitMQArguments? arguments) { return arguments == null || arguments.Count == 0 ? null @@ -382,7 +384,7 @@ namespace Tapeti.Connection /// - public async Task DurableQueueVerify(string queueName, IRabbitMQArguments arguments, CancellationToken cancellationToken) + public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken) { if (!await GetDurableQueueDeclareRequired(queueName, arguments)) return; @@ -484,9 +486,9 @@ namespace Tapeti.Connection /// - public async Task DynamicQueueDeclare(string queuePrefix, IRabbitMQArguments arguments, CancellationToken cancellationToken) + public async Task DynamicQueueDeclare(string? queuePrefix, IRabbitMQArguments? arguments, CancellationToken cancellationToken) { - string queueName = null; + string? queueName = null; var bindingLogger = logger as IBindingLogger; await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => @@ -507,6 +509,10 @@ namespace Tapeti.Connection } }); + cancellationToken.ThrowIfCancellationRequested(); + if (queueName == null) + throw new InvalidOperationException("Failed to declare dynamic queue"); + return queueName; } @@ -528,9 +534,9 @@ namespace Tapeti.Connection /// public async Task Close() { - IModel capturedConsumeModel; - IModel capturedPublishModel; - RabbitMQ.Client.IConnection capturedConnection; + IModel? capturedConsumeModel; + IModel? capturedPublishModel; + RabbitMQ.Client.IConnection? capturedConnection; lock (connectionLock) { @@ -578,10 +584,10 @@ namespace Tapeti.Connection private class ManagementQueueInfo { [JsonProperty("name")] - public string Name { get; set; } + public string? Name { get; set; } [JsonProperty("vhost")] - public string VHost { get; set; } + public string? VHost { get; set; } [JsonProperty("durable")] public bool Durable { get; set; } @@ -593,7 +599,7 @@ namespace Tapeti.Connection public bool Exclusive { get; set; } [JsonProperty("arguments")] - public Dictionary Arguments { get; set; } + public Dictionary? Arguments { get; set; } [JsonProperty("messages")] public uint Messages { get; set; } @@ -601,7 +607,7 @@ namespace Tapeti.Connection - private async Task GetQueueInfo(string queueName) + private async Task GetQueueInfo(string queueName) { var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost); var queuePath = Uri.EscapeDataString(queueName); @@ -622,25 +628,25 @@ namespace Tapeti.Connection private class ManagementBinding { [JsonProperty("source")] - public string Source { get; set; } + public string? Source { get; set; } [JsonProperty("vhost")] - public string Vhost { get; set; } + public string? Vhost { get; set; } [JsonProperty("destination")] - public string Destination { get; set; } + public string? Destination { get; set; } [JsonProperty("destination_type")] - public string DestinationType { get; set; } + public string? DestinationType { get; set; } [JsonProperty("routing_key")] - public string RoutingKey { get; set; } + public string? RoutingKey { get; set; } [JsonProperty("arguments")] - public Dictionary Arguments { get; set; } + public Dictionary? Arguments { get; set; } [JsonProperty("properties_key")] - public string PropertiesKey { get; set; } + public string? PropertiesKey { get; set; } } @@ -658,8 +664,8 @@ namespace Tapeti.Connection // Filter out the binding to an empty source, which is always present for direct-to-queue routing return bindings? - .Where(binding => !string.IsNullOrEmpty(binding.Source)) - .Select(binding => new QueueBinding(binding.Source, binding.RoutingKey)) + .Where(binding => !string.IsNullOrEmpty(binding.Source) && !string.IsNullOrEmpty(binding.RoutingKey)) + .Select(binding => new QueueBinding(binding.Source!, binding.RoutingKey!)) ?? Enumerable.Empty(); }); } @@ -723,9 +729,6 @@ namespace Tapeti.Connection private void DeclareExchange(IModel channel, string exchange) { - if (string.IsNullOrEmpty(exchange)) - return; - if (declaredExchanges.Contains(exchange)) return; @@ -791,9 +794,9 @@ namespace Tapeti.Connection { try { - RabbitMQ.Client.IConnection capturedConnection; - IModel capturedConsumeChannelModel; - IModel capturedPublishChannelModel; + RabbitMQ.Client.IConnection? capturedConnection; + IModel? capturedConsumeChannelModel; + IModel? capturedPublishChannelModel; lock (connectionLock) @@ -805,7 +808,7 @@ namespace Tapeti.Connection { try { - if (connection.IsOpen) + if (connection is { IsOpen: true }) connection.Close(); } catch (AlreadyClosedException) @@ -813,7 +816,7 @@ namespace Tapeti.Connection } finally { - connection.Dispose(); + connection?.Dispose(); } connection = null; @@ -873,12 +876,7 @@ namespace Tapeti.Connection consumeChannelModel = null; } - ConnectionEventListener?.Disconnected(new DisconnectedEventArgs - { - ReplyCode = e.ReplyCode, - ReplyText = e.ReplyText - }); - + ConnectionEventListener?.Disconnected(new DisconnectedEventArgs(e.ReplyCode, e.ReplyText)); logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText)); // Reconnect if the disconnect was unexpected @@ -906,11 +904,7 @@ namespace Tapeti.Connection connectedDateTime = DateTime.UtcNow; - var connectedEventArgs = new ConnectedEventArgs - { - ConnectionParams = connectionParams, - LocalPort = capturedConnection.LocalPort - }; + var connectedEventArgs = new ConnectedEventArgs(connectionParams, capturedConnection.LocalPort); if (isReconnect) ConnectionEventListener?.Reconnected(connectedEventArgs); @@ -938,7 +932,7 @@ namespace Tapeti.Connection } - private void HandleBasicReturn(object sender, BasicReturnEventArgs e) + private void HandleBasicReturn(object? sender, BasicReturnEventArgs e) { /* * "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack." @@ -968,7 +962,7 @@ namespace Tapeti.Connection } - private void HandleBasicAck(object sender, BasicAckEventArgs e) + private void HandleBasicAck(object? sender, BasicAckEventArgs e) { Monitor.Enter(confirmLock); try @@ -999,7 +993,7 @@ namespace Tapeti.Connection } - private void HandleBasicNack(object sender, BasicNackEventArgs e) + private void HandleBasicNack(object? sender, BasicNackEventArgs e) { Monitor.Enter(confirmLock); try @@ -1048,10 +1042,10 @@ namespace Tapeti.Connection public TapetiConnectionParams ConnectionParams { get; } public bool IsReconnect { get; } public int LocalPort { get; } - public Exception Exception { get; } + public Exception? Exception { get; } - public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception exception = null) + public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception? exception = null) { ConnectionParams = connectionParams; IsReconnect = isReconnect; diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index ceee5c6..fb0ddb8 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -40,7 +40,7 @@ namespace Tapeti.Connection /// public async Task Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body) { - object message = null; + object? message = null; try { try @@ -73,7 +73,7 @@ namespace Tapeti.Connection RawBody = body, Message = message, Properties = properties, - Binding = null, + Binding = new ExceptionContextBinding(queueName), ConnectionClosed = CancellationToken.None }; @@ -184,5 +184,42 @@ namespace Tapeti.Connection public string RoutingKey; public IMessageProperties Properties; } + + + private class ExceptionContextBinding : IBinding + { + public string? QueueName { get; } + public QueueType? QueueType => null; + + + public ExceptionContextBinding(string? queueName) + { + QueueName = queueName; + } + + + public ValueTask Apply(IBindingTarget target) + { + throw new InvalidOperationException("Apply method should not be called on a binding in an Exception context"); + } + + + public bool Accept(Type messageClass) + { + throw new InvalidOperationException("Accept method should not be called on a binding in an Exception context"); + } + + + public ValueTask Invoke(IMessageContext context) + { + throw new InvalidOperationException("Invoke method should not be called on a binding in an Exception context"); + } + + + public ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult) + { + throw new InvalidOperationException("Cleanup method should not be called on a binding in an Exception context"); + } + } } } diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs index f383b24..8f18b10 100644 --- a/Tapeti/Connection/TapetiPublisher.cs +++ b/Tapeti/Connection/TapetiPublisher.cs @@ -38,14 +38,14 @@ namespace Tapeti.Connection /// - public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class + public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class { await PublishRequest(message, responseMethodSelector.Body); } /// - public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class + public async Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class { await PublishRequest(message, responseMethodSelector.Body); } @@ -97,7 +97,7 @@ namespace Tapeti.Connection /// - public async Task Publish(object message, IMessageProperties properties, bool mandatory) + public async Task Publish(object message, IMessageProperties? properties, bool mandatory) { var messageClass = message.GetType(); var exchange = exchangeStrategy.GetExchange(messageClass); @@ -108,13 +108,13 @@ namespace Tapeti.Connection /// - public async Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory) + public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory) { await Publish(message, properties, null, queueName, mandatory); } - private async Task Publish(object message, IMessageProperties properties, string exchange, string routingKey, bool mandatory) + private async Task Publish(object message, IMessageProperties? properties, string? exchange, string routingKey, bool mandatory) { var writableProperties = new MessageProperties(properties); @@ -151,11 +151,11 @@ namespace Tapeti.Connection private class PublishContext : IPublishContext { - public ITapetiConfig Config { get; init; } - public string Exchange { get; set; } - public string RoutingKey { get; init; } - public object Message { get; init; } - public IMessageProperties Properties { get; init; } + public ITapetiConfig Config { get; init; } = null!; + public string? Exchange { get; set; } + public string RoutingKey { get; init; } = null!; + public object Message { get; init; } = null!; + public IMessageProperties? Properties { get; init; } } } } diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index 1e13cb9..ad3bab8 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -16,7 +16,7 @@ namespace Tapeti.Connection private bool consuming; private readonly List consumerTags = new(); - private CancellationTokenSource initializeCancellationTokenSource; + private CancellationTokenSource? initializeCancellationTokenSource; public TapetiSubscriber(Func clientFactory, ITapetiConfig config) @@ -141,15 +141,24 @@ namespace Tapeti.Connection private async Task ConsumeQueues(CancellationToken cancellationToken) { - var queues = config.Bindings.GroupBy(binding => binding.QueueName); - - consumerTags.AddRange((await Task.WhenAll(queues.Select(async group => + var queues = config.Bindings.GroupBy(binding => { - var queueName = group.Key; - var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); + if (string.IsNullOrEmpty(binding.QueueName)) + throw new InvalidOperationException("QueueName must not be empty"); - return await clientFactory().Consume(queueName, consumer, cancellationToken); - }))).Where(t => t != null)); + return binding.QueueName; + }); + + consumerTags.AddRange( + (await Task.WhenAll(queues.Select(async group => + { + var queueName = group.Key; + var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); + + return await clientFactory().Consume(queueName, consumer, cancellationToken); + }))) + .Where(t => t?.ConsumerTag != null) + .Cast()); } @@ -164,7 +173,7 @@ namespace Tapeti.Connection { public string QueueName; public List MessageClasses; - public IRabbitMQArguments Arguments; + public IRabbitMQArguments? Arguments; } private readonly Dictionary> dynamicQueues = new(); @@ -185,12 +194,12 @@ namespace Tapeti.Connection } - public abstract ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments); - public abstract ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments); + public abstract ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments); + public abstract ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments); public abstract ValueTask BindDurableObsolete(string queueName); - public async ValueTask BindDynamic(Type messageClass, string queuePrefix, IRabbitMQArguments arguments) + public async ValueTask BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) { var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); if (!result.IsNewMessageClass) @@ -205,14 +214,14 @@ namespace Tapeti.Connection } - public async ValueTask BindDynamicDirect(Type messageClass, string queuePrefix, IRabbitMQArguments arguments) + public async ValueTask BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) { var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); return result.QueueName; } - public async ValueTask BindDynamicDirect(string queuePrefix, IRabbitMQArguments arguments) + public async ValueTask BindDynamicDirect(string? queuePrefix, IRabbitMQArguments? arguments) { // If we don't know the routing key, always create a new queue to ensure there is no overlap. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. @@ -226,7 +235,7 @@ namespace Tapeti.Connection public bool IsNewMessageClass; } - private async Task DeclareDynamicQueue(Type messageClass, string queuePrefix, IRabbitMQArguments arguments) + private async Task DeclareDynamicQueue(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) { // Group by prefix var key = queuePrefix ?? ""; @@ -282,7 +291,7 @@ namespace Tapeti.Connection private struct DurableQueueInfo { public List MessageClasses; - public IRabbitMQArguments Arguments; + public IRabbitMQArguments? Arguments; } @@ -295,7 +304,7 @@ namespace Tapeti.Connection } - public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments) + public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments) { // Collect the message classes per queue so we can determine afterwards // if any of the bindings currently set on the durable queue are no @@ -324,7 +333,7 @@ namespace Tapeti.Connection } - public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments) + public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments) { if (!durableQueues.TryGetValue(queueName, out var durableQueueInfo)) { @@ -396,12 +405,12 @@ namespace Tapeti.Connection } - public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments) + public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments) { await VerifyDurableQueue(queueName, arguments); } - public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments) + public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments) { await VerifyDurableQueue(queueName, arguments); } @@ -412,7 +421,7 @@ namespace Tapeti.Connection } - private async Task VerifyDurableQueue(string queueName, IRabbitMQArguments arguments) + private async Task VerifyDurableQueue(string queueName, IRabbitMQArguments? arguments) { if (!durableQueues.Add(queueName)) return; @@ -429,12 +438,12 @@ namespace Tapeti.Connection } - public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments) + public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments) { return default; } - public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments) + public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments) { return default; } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index ed7aa15..e195c23 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -80,14 +80,17 @@ namespace Tapeti.Default } /// - public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments) + public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments) { Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is"); } - private static string GetArgumentsText(IRabbitMQArguments arguments) + private static string GetArgumentsText(IRabbitMQArguments? arguments) { + if (arguments == null || arguments.Count == 0) + return "empty"; + var argumentsText = new StringBuilder(); foreach (var pair in arguments) { diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs index 327cf6b..8d96345 100644 --- a/Tapeti/Default/ControllerBindingContext.cs +++ b/Tapeti/Default/ControllerBindingContext.cs @@ -26,7 +26,7 @@ namespace Tapeti.Default /// - public Type MessageClass { get; set; } + public Type? MessageClass { get; set; } /// public bool HasMessageClass => MessageClass != null; @@ -44,10 +44,12 @@ namespace Tapeti.Default public IBindingResult Result => result; - public ControllerBindingContext(IEnumerable parameters, ParameterInfo result) + public ControllerBindingContext(Type controller, MethodInfo method, IEnumerable parameters, ParameterInfo result) { - this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList(); + Controller = controller; + Method = method; + this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList(); this.result = new ControllerBindingResult(result); } @@ -84,7 +86,13 @@ namespace Tapeti.Default /// public IEnumerable GetParameterHandlers() { - return parameters.Select(p => p.Binding); + return parameters.Select(p => + { + if (p.Binding == null) + throw new TopologyConfigurationException($"No Binding for parameter {p.Info.Name}"); + + return p.Binding; + }); } @@ -92,7 +100,7 @@ namespace Tapeti.Default /// Returns the configured result handler. /// /// - public ResultHandler GetResultHandler() + public ResultHandler? GetResultHandler() { return result.Handler; } @@ -107,7 +115,7 @@ namespace Tapeti.Default /// /// Provides access to the configured binding. /// - public ValueFactory Binding { get; set; } + public ValueFactory? Binding { get; set; } /// @@ -146,7 +154,7 @@ namespace Tapeti.Default /// /// Provides access to the configured handler. /// - public ResultHandler Handler { get; set; } + public ResultHandler? Handler { get; set; } /// diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 4d9ba9c..10d8ae4 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -60,7 +60,7 @@ namespace Tapeti.Default /// /// The return value handler. /// - public ResultHandler ResultHandler; + public ResultHandler? ResultHandler; /// @@ -87,10 +87,10 @@ namespace Tapeti.Default /// - public string QueueName { get; private set; } + public string? QueueName { get; private set; } /// - public QueueType QueueType => bindingInfo.QueueInfo.QueueType; + public QueueType? QueueType => bindingInfo.QueueInfo.QueueType; /// public Type Controller => bindingInfo.ControllerType; @@ -116,7 +116,7 @@ namespace Tapeti.Default switch (bindingInfo.BindingTargetMode) { case BindingTargetMode.Default: - if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) + if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { @@ -127,7 +127,7 @@ namespace Tapeti.Default break; case BindingTargetMode.Direct: - if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic) + if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); else { @@ -141,7 +141,7 @@ namespace Tapeti.Default throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode"); } } - else if (bindingInfo.QueueInfo.QueueType == QueueType.Durable) + else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable) { await target.BindDurableObsolete(bindingInfo.QueueInfo.Name); QueueName = bindingInfo.QueueInfo.Name; @@ -159,8 +159,11 @@ namespace Tapeti.Default /// public async ValueTask Invoke(IMessageContext context) { + if (context.Binding == null) + throw new InvalidOperationException("Invoke should not be called on a context without a binding"); + var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType); - context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding)); + context.Store(new ControllerMessageContextPayload(controller, (IControllerMethodBinding)context.Binding)); if (!await FilterAllowed(context)) return; @@ -202,7 +205,7 @@ namespace Tapeti.Default private delegate ValueTask MessageHandlerFunc(IMessageContext context); - private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler resultHandler) + private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler? resultHandler) { if (resultHandler != null) return WrapResultHandlerMethod(method.CreateExpressionInvoke(), parameterFactories, resultHandler); @@ -245,7 +248,7 @@ namespace Tapeti.Default { var controllerPayload = context.Get(); try - { + { invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); return default; } @@ -296,8 +299,8 @@ namespace Tapeti.Default private void AddExceptionData(Exception exception) { - exception.Data["Tapeti.Controller.Name"] = bindingInfo.ControllerType?.FullName; - exception.Data["Tapeti.Controller.Method"] = bindingInfo.Method?.Name; + exception.Data["Tapeti.Controller.Name"] = bindingInfo.ControllerType.FullName; + exception.Data["Tapeti.Controller.Method"] = bindingInfo.Method.Name; } @@ -319,12 +322,19 @@ namespace Tapeti.Default /// /// Optional arguments (x-arguments) passed when declaring the queue. /// - public IRabbitMQArguments QueueArguments { get; set; } + public IRabbitMQArguments? QueueArguments { get; set; } /// /// Determines if the QueueInfo properties contain a valid combination. /// public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name); + + + public QueueInfo(QueueType queueType, string name) + { + QueueType = queueType; + Name = name; + } } } } diff --git a/Tapeti/Default/FallbackStringEnumConverter.cs b/Tapeti/Default/FallbackStringEnumConverter.cs index 9af984f..12a5ab3 100644 --- a/Tapeti/Default/FallbackStringEnumConverter.cs +++ b/Tapeti/Default/FallbackStringEnumConverter.cs @@ -24,7 +24,7 @@ namespace Tapeti.Default /// - public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) { if (value == null) { @@ -41,7 +41,7 @@ namespace Tapeti.Default /// - public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) { var isNullable = IsNullableType(objectType); diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index bceb42a..881260b 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -46,7 +46,7 @@ namespace Tapeti.Default /// - public object Deserialize(byte[] body, IMessageProperties properties) + public object? Deserialize(byte[] body, IMessageProperties properties) { if (properties.ContentType is not ContentType) throw new ArgumentException($"content_type must be {ContentType}"); diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 3b72e77..697ed30 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using Tapeti.Config; @@ -12,28 +13,28 @@ namespace Tapeti.Default /// - public ITapetiConfig Config { get; set; } + public ITapetiConfig Config { get; set; } = null!; /// - public string Queue { get; set; } + public string Queue { get; set; } = null!; /// - public string Exchange { get; set; } + public string Exchange { get; set; } = null!; /// - public string RoutingKey { get; set; } + public string RoutingKey { get; set; } = null!; /// - public byte[] RawBody { get; set; } + public byte[] RawBody { get; set; } = null!; /// - public object Message { get; set; } + public object? Message { get; set; } /// - public IMessageProperties Properties { get; set; } + public IMessageProperties Properties { get; set; } = null!; /// - public IBinding Binding { get; set; } + public IBinding Binding { get; set; } = null!; /// public CancellationToken ConnectionClosed { get; set; } @@ -57,7 +58,7 @@ namespace Tapeti.Default return (T)payloads[typeof(T)]; } - public bool TryGet(out T payload) where T : IMessageContextPayload + public bool TryGet([NotNullWhen(true)] out T? payload) where T : IMessageContextPayload { if (payloads.TryGetValue(typeof(T), out var payloadValue)) { @@ -100,7 +101,7 @@ namespace Tapeti.Default /// - public bool Get(string key, out T value) where T : class + public bool Get(string key, out T? value) where T : class { if (!TryGet(out var payload) || !payload.TryGetValue(key, out var objectValue)) @@ -109,7 +110,7 @@ namespace Tapeti.Default return false; } - value = (T)objectValue; + value = (T?)objectValue; return true; } @@ -132,7 +133,7 @@ namespace Tapeti.Default } - public bool TryGetValue(string key, out object value) + public bool TryGetValue(string key, out object? value) { return items.TryGetValue(key, out value); } diff --git a/Tapeti/Default/MessageProperties.cs b/Tapeti/Default/MessageProperties.cs index 6a4086b..6c3daab 100644 --- a/Tapeti/Default/MessageProperties.cs +++ b/Tapeti/Default/MessageProperties.cs @@ -13,13 +13,13 @@ namespace Tapeti.Default /// - public string ContentType { get; set; } + public string? ContentType { get; set; } /// - public string CorrelationId { get; set; } + public string? CorrelationId { get; set; } /// - public string ReplyTo { get; set; } + public string? ReplyTo { get; set; } /// public bool? Persistent { get; set; } @@ -37,7 +37,7 @@ namespace Tapeti.Default /// /// - public MessageProperties(IMessageProperties source) + public MessageProperties(IMessageProperties? source) { if (source == null) return; @@ -64,7 +64,7 @@ namespace Tapeti.Default } /// - public string GetHeader(string name) + public string? GetHeader(string name) { return headers.TryGetValue(name, out var value) ? value : null; } diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index b56dce0..efdc8a0 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -31,7 +31,7 @@ namespace Tapeti.Default // Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish. // ReSharper disable once ConvertIfStatementToSwitchStatement if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType) - throw new ArgumentException($"Message handler for non-request message type {context.MessageClass?.FullName} must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}"); + throw new ArgumentException($"Message handler for non-request message type {context.MessageClass?.FullName} must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType.FullName ?? "void"}"); if (!hasClassResult) return; @@ -48,14 +48,22 @@ namespace Tapeti.Default var handler = GetType().GetMethod(nameof(PublishGenericTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType); Debug.Assert(handler != null, nameof(handler) + " != null"); - context.Result.SetHandler((messageContext, value) => (ValueTask)handler.Invoke(null, new[] { messageContext, value })); + context.Result.SetHandler((messageContext, value) => + { + var result = handler.Invoke(null, new[] { messageContext, value }); + return result != null ? (ValueTask)result : ValueTask.CompletedTask; + }); break; case TaskType.ValueTask: var valueTaskHandler = GetType().GetMethod(nameof(PublishGenericValueTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType); Debug.Assert(valueTaskHandler != null, nameof(handler) + " != null"); - context.Result.SetHandler((messageContext, value) => (ValueTask)valueTaskHandler.Invoke(null, new[] { messageContext, value })); + context.Result.SetHandler((messageContext, value) => + { + var result = valueTaskHandler.Invoke(null, new[] { messageContext, value }); + return result != null ? (ValueTask)result : ValueTask.CompletedTask; + }); break; default: @@ -79,7 +87,7 @@ namespace Tapeti.Default } - private static async ValueTask Reply(object message, IMessageContext messageContext) + private static async ValueTask Reply(object? message, IMessageContext messageContext) { if (message == null) throw new ArgumentException("Return value of a request message handler must not be null"); diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs index cdce19b..d4d90e2 100644 --- a/Tapeti/Default/RabbitMQMessageProperties.cs +++ b/Tapeti/Default/RabbitMQMessageProperties.cs @@ -15,21 +15,21 @@ namespace Tapeti.Default /// - public string ContentType + public string? ContentType { get => BasicProperties.IsContentTypePresent() ? BasicProperties.ContentType : null; set { if (!string.IsNullOrEmpty(value)) BasicProperties.ContentType = value; else BasicProperties.ClearContentType(); } } /// - public string CorrelationId + public string? CorrelationId { get => BasicProperties.IsCorrelationIdPresent() ? BasicProperties.CorrelationId : null; set { if (!string.IsNullOrEmpty(value)) BasicProperties.CorrelationId = value; else BasicProperties.ClearCorrelationId(); } } /// - public string ReplyTo + public string? ReplyTo { get => BasicProperties.IsReplyToPresent() ? BasicProperties.ReplyTo : null; set { if (!string.IsNullOrEmpty(value)) BasicProperties.ReplyTo = value; else BasicProperties.ClearReplyTo(); } @@ -66,7 +66,7 @@ namespace Tapeti.Default /// /// - public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties source) + public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties? source) { BasicProperties = basicProperties; if (source == null) @@ -97,7 +97,7 @@ namespace Tapeti.Default /// - public string GetHeader(string name) + public string? GetHeader(string name) { if (BasicProperties.Headers == null) return null; diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs index 0074c61..5a6b2b2 100644 --- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs +++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs @@ -60,7 +60,7 @@ namespace Tapeti.Default } - private static List SplitPascalCase(string value) + private static List? SplitPascalCase(string value) { var split = SeparatorRegex.Split(value); if (split.Length == 0) diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index 906618a..c872a79 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -10,7 +10,7 @@ namespace Tapeti.Helpers /// public class ConnectionStringParser { - private readonly TapetiConnectionParams result = new TapetiAppSettingsConnectionParams(); + private readonly TapetiConnectionParams result = new(); private readonly string connectionstring; private int pos = -1; diff --git a/Tapeti/Helpers/DictionaryHelper.cs b/Tapeti/Helpers/DictionaryHelper.cs index ed22529..93af905 100644 --- a/Tapeti/Helpers/DictionaryHelper.cs +++ b/Tapeti/Helpers/DictionaryHelper.cs @@ -10,7 +10,7 @@ namespace Tapeti.Helpers /// /// Checks if two dictionaries are considered compatible. If either is null they are considered empty. /// - public static bool NullSafeSameValues(this IReadOnlyDictionary arguments1, IReadOnlyDictionary arguments2) + public static bool NullSafeSameValues(this IReadOnlyDictionary? arguments1, IReadOnlyDictionary? arguments2) { if (arguments1 == null || arguments2 == null) return (arguments1 == null || arguments1.Count == 0) && (arguments2 == null || arguments2.Count == 0); diff --git a/Tapeti/Helpers/ExpressionInvoker.cs b/Tapeti/Helpers/ExpressionInvoker.cs index b0d6a1b..4fa8ece 100644 --- a/Tapeti/Helpers/ExpressionInvoker.cs +++ b/Tapeti/Helpers/ExpressionInvoker.cs @@ -16,7 +16,7 @@ namespace Tapeti.Helpers /// /// The instance on which the method should be called. /// The arguments passed to the method. - public delegate object ExpressionInvoke(object target, params object[] args); + public delegate object ExpressionInvoke(object? target, params object?[] args); /// diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs index 0bdf9a0..cdd3446 100644 --- a/Tapeti/Helpers/MiddlewareHelper.cs +++ b/Tapeti/Helpers/MiddlewareHelper.cs @@ -16,7 +16,7 @@ namespace Tapeti.Helpers /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware. /// The action to execute when the innermost middleware calls next. /// - public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler) + public static void Go(IReadOnlyList? middleware, Action handle, Action lastHandler) { var handlerIndex = middleware?.Count - 1 ?? -1; if (middleware == null || handlerIndex == -1) @@ -45,7 +45,7 @@ namespace Tapeti.Helpers /// Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware. /// The action to execute when the innermost middleware calls next. /// - public static async ValueTask GoAsync(IReadOnlyList middleware, Func, ValueTask> handle, Func lastHandler) + public static async ValueTask GoAsync(IReadOnlyList? middleware, Func, ValueTask> handle, Func lastHandler) { var handlerIndex = middleware?.Count - 1 ?? -1; if (middleware == null || handlerIndex == -1) diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs index 8aab48a..da478e0 100644 --- a/Tapeti/IConnection.cs +++ b/Tapeti/IConnection.cs @@ -7,19 +7,27 @@ using System.Threading.Tasks; namespace Tapeti { /// - /// + /// Contains information about the established connection. /// public class ConnectedEventArgs { /// /// The connection parameters used to establish the connection. /// - public TapetiConnectionParams ConnectionParams; + public TapetiConnectionParams ConnectionParams { get; } /// /// The local port for the connection. Useful for identifying the connection in the management interface. /// - public int LocalPort; + public int LocalPort { get; } + + + /// + public ConnectedEventArgs(TapetiConnectionParams connectionParams, int localPort) + { + ConnectionParams = connectionParams; + LocalPort = localPort; + } } @@ -31,12 +39,20 @@ namespace Tapeti /// /// The ReplyCode as indicated by the client library /// - public ushort ReplyCode; + public ushort ReplyCode { get; } /// /// The ReplyText as indicated by the client library /// - public string ReplyText; + public string ReplyText { get; } + + + /// + public DisconnectedEventArgs(ushort replyCode, string replyText) + { + ReplyCode = replyCode; + ReplyText = replyText; + } } diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index cb99434..e57a7ec 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -32,7 +32,7 @@ namespace Tapeti /// /// The exception that caused the connection to fail. /// - Exception Exception { get; } + Exception? Exception { get; } } @@ -138,7 +138,7 @@ namespace Tapeti /// The name of the queue that is declared /// The x-arguments of the existing queue /// The x-arguments of the queue that would be declared - void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments); + void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments); /// /// Called before a binding is added to a queue. diff --git a/Tapeti/IMessageSerializer.cs b/Tapeti/IMessageSerializer.cs index b2bbfc4..679e7b2 100644 --- a/Tapeti/IMessageSerializer.cs +++ b/Tapeti/IMessageSerializer.cs @@ -21,6 +21,6 @@ namespace Tapeti /// The encoded message /// The properties as sent along with the message /// A decoded instance of the message - object Deserialize(byte[] body, IMessageProperties properties); + object? Deserialize(byte[] body, IMessageProperties properties); } } diff --git a/Tapeti/IPublisher.cs b/Tapeti/IPublisher.cs index 885133f..3e0ac58 100644 --- a/Tapeti/IPublisher.cs +++ b/Tapeti/IPublisher.cs @@ -29,7 +29,7 @@ namespace Tapeti /// /// An expression defining the method which handles the response. Example: c => c.HandleResponse /// The message to send - Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class; + Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class; /// @@ -42,7 +42,7 @@ namespace Tapeti /// /// An expression defining the method which handles the response. Example: c => c.HandleResponse /// The message to send - Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class; + Task PublishRequest(TRequest message, Expression>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class; /// @@ -69,7 +69,7 @@ namespace Tapeti /// An instance of a message class /// Metadata to include in the message /// If true, an exception will be raised if the message can not be delivered to at least one queue - Task Publish(object message, IMessageProperties properties, bool mandatory); + Task Publish(object message, IMessageProperties? properties, bool mandatory); /// @@ -80,6 +80,6 @@ namespace Tapeti /// Metadata to include in the message /// If true, an exception will be raised if the message can not be delivered to the queue /// - Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory); + Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory); } } diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 9991245..2d1bd84 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -12,6 +12,7 @@ https://github.com/MvRens/Tapeti Tapeti.png 9 + enable diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index ca451c4..ee48d3a 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -17,7 +17,7 @@ namespace Tapeti /// public class TapetiConfig : ITapetiConfigBuilderAccess { - private Config config; + private Config? config; private readonly List bindingMiddleware = new(); @@ -92,29 +92,25 @@ namespace Tapeti var configInstance = GetConfig(); - var middlewareBundle = extension.GetMiddleware(DependencyResolver); - if (middlewareBundle != null) + foreach (var middleware in extension.GetMiddleware(DependencyResolver)) { - foreach (var middleware in middlewareBundle) + switch (middleware) { - switch (middleware) - { - case IControllerBindingMiddleware bindingExtension: - Use(bindingExtension); - break; + case IControllerBindingMiddleware bindingExtension: + Use(bindingExtension); + break; - case IMessageMiddleware messageExtension: - configInstance.Use(messageExtension); - break; + case IMessageMiddleware messageExtension: + configInstance.Use(messageExtension); + break; - case IPublishMiddleware publishExtension: - configInstance.Use(publishExtension); - break; + case IPublishMiddleware publishExtension: + configInstance.Use(publishExtension); + break; - default: - throw new ArgumentException( - $"Unsupported middleware implementation: {middleware?.GetType().Name ?? "null"}"); - } + default: + throw new ArgumentException( + $"Unsupported middleware implementation: {middleware.GetType().Name}"); } } @@ -123,7 +119,7 @@ namespace Tapeti return this; foreach (var binding in bindingBundle) - config.RegisterBinding(binding); + GetConfig().RegisterBinding(binding); return this; } @@ -313,17 +309,23 @@ namespace Tapeti internal class ConfigBindings : List, ITapetiConfigBindings { - private Dictionary methodLookup; + private Dictionary? methodLookup; - public IControllerMethodBinding ForMethod(Delegate method) + public IControllerMethodBinding? ForMethod(Delegate method) { + if (methodLookup == null) + throw new InvalidOperationException("Lock must be called first"); + return methodLookup.TryGetValue(method.Method, out var binding) ? binding : null; } - public IControllerMethodBinding ForMethod(MethodInfo method) + public IControllerMethodBinding? ForMethod(MethodInfo method) { + if (methodLookup == null) + throw new InvalidOperationException("Lock must be called first"); + return methodLookup.TryGetValue(method, out var binding) ? binding : null; } diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index 4307171..8459445 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Reflection; +using System.Text; using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Connection; @@ -49,12 +50,7 @@ namespace Tapeti { var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null; - var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter) - { - Controller = controller, - Method = method - }; - + var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter); if (method.GetCustomAttribute() != null) context.SetBindingTargetMode(BindingTargetMode.Direct); @@ -124,11 +120,15 @@ namespace Tapeti /// public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder) { - return RegisterAllControllers(builder, Assembly.GetEntryAssembly()); + var assembly = Assembly.GetEntryAssembly(); + if (assembly == null) + throw new InvalidOperationException("No EntryAssembly"); + + return RegisterAllControllers(builder, assembly); } - private static ControllerMethodBinding.QueueInfo GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo fallbackQueueInfo) + private static ControllerMethodBinding.QueueInfo? GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo? fallbackQueueInfo) { var dynamicQueueAttribute = member.GetCustomAttribute(); var durableQueueAttribute = member.GetCustomAttribute(); @@ -157,26 +157,33 @@ namespace Tapeti } else { - queueType = fallbackQueueInfo.QueueType; + queueType = fallbackQueueInfo!.QueueType; name = fallbackQueueInfo.Name; } - return new ControllerMethodBinding.QueueInfo + return new ControllerMethodBinding.QueueInfo(queueType, name) { - QueueType = queueType, - Name = name, QueueArguments = GetQueueArguments(queueArgumentsAttribute) ?? fallbackQueueInfo?.QueueArguments }; } - private static IRabbitMQArguments GetQueueArguments(QueueArgumentsAttribute queueArgumentsAttribute) + private static IRabbitMQArguments? GetQueueArguments(QueueArgumentsAttribute? queueArgumentsAttribute) { if (queueArgumentsAttribute == null) return null; - var arguments = new RabbitMQArguments(queueArgumentsAttribute.CustomArguments); - + var arguments = new RabbitMQArguments(queueArgumentsAttribute.CustomArguments.ToDictionary( + p => p.Key, + p => p.Value switch + { + string stringValue => Encoding.UTF8.GetBytes(stringValue), + _ => p.Value + } + )) + { + + }; if (queueArgumentsAttribute.MaxLength > 0) arguments.Add(@"x-max-length", queueArgumentsAttribute.MaxLength); diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 1bd4bdf..8ab7802 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -24,10 +24,10 @@ namespace Tapeti /// This property must be set before first subscribing or publishing, otherwise it /// will use the default connection parameters. /// - public TapetiConnectionParams Params { get; set; } + public TapetiConnectionParams? Params { get; set; } private readonly Lazy client; - private TapetiSubscriber subscriber; + private TapetiSubscriber? subscriber; private bool disposed; @@ -48,13 +48,13 @@ namespace Tapeti } /// - public event ConnectedEventHandler Connected; + public event ConnectedEventHandler? Connected; /// - public event DisconnectedEventHandler Disconnected; + public event DisconnectedEventHandler? Disconnected; /// - public event ConnectedEventHandler Reconnected; + public event ConnectedEventHandler? Reconnected; /// diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 5e2258e..43f923f 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -10,7 +10,7 @@ namespace Tapeti /// public class TapetiConnectionParams { - private IDictionary clientProperties; + private IDictionary? clientProperties; /// @@ -59,7 +59,7 @@ namespace Tapeti /// If any of the default keys used by the RabbitMQ Client library (product, version) are specified their value /// will be overwritten. See DefaultClientProperties in Connection.cs in the RabbitMQ .NET client source for the default values. /// - public IDictionary ClientProperties { + public IDictionary? ClientProperties { get => clientProperties ??= new Dictionary(); set => clientProperties = value; } diff --git a/Tapeti/Tasks/SingleThreadTaskQueue.cs b/Tapeti/Tasks/SingleThreadTaskQueue.cs index 19303ae..30032bb 100644 --- a/Tapeti/Tasks/SingleThreadTaskQueue.cs +++ b/Tapeti/Tasks/SingleThreadTaskQueue.cs @@ -120,7 +120,7 @@ namespace Tapeti.Tasks { while (true) { - Task task; + Task? task; lock (scheduledTasks) { task = WaitAndDequeueTask(); @@ -133,7 +133,7 @@ namespace Tapeti.Tasks } } - private Task WaitAndDequeueTask() + private Task? WaitAndDequeueTask() { while (!scheduledTasks.Any() && !disposed) Monitor.Wait(scheduledTasks);