Implemented nullable reference types support

This commit is contained in:
Mark van Renswoude 2022-11-23 09:13:38 +01:00
parent bcdb376256
commit 97672f4321
96 changed files with 706 additions and 510 deletions

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_01_PublishSubscribe</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_02_DeclareDurableQueues</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_03_FlowRequestResponse</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -15,9 +15,9 @@ namespace _03_FlowRequestResponse
private readonly IFlowProvider flowProvider;
private readonly IExampleState exampleState;
public string FirstQuote;
public string SecondQuote;
public string ThirdQuote;
public string? FirstQuote;
public string? SecondQuote;
public string? ThirdQuote;
public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState)

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_04_Transient</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_05_SpeedTest</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_07_ParallelizationTest</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -107,7 +107,7 @@ namespace _07_ParallelizationTest
private int count;
private readonly object waitLock = new();
private TaskCompletionSource<bool> batchReachedTask = new();
private Timer messageExpectedTimer;
private Timer? messageExpectedTimer;
private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000);

View File

@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_08_MessageHandlerLogging</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -3,6 +3,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Autofac;
using Autofac.Builder;
@ -14,22 +15,21 @@ namespace Tapeti.Autofac
/// </summary>
public class AutofacDependencyResolver : IDependencyContainer
{
private ContainerBuilder containerBuilder;
private IContainer container;
private ContainerBuilder? containerBuilder;
private IContainer? container;
/// <summary>
/// The built container. Either set directly, or use the Build method to built the
/// The built container. Either set directly, or use the Build method to build the
/// update this reference.
/// </summary>
public IContainer Container
{
get => container;
get => container ?? throw new ArgumentNullException(nameof(container));
set
{
container = value;
if (value != null)
containerBuilder = null;
containerBuilder = null;
}
}
@ -50,7 +50,7 @@ namespace Tapeti.Autofac
CheckContainerBuilder();
Container = containerBuilder.Build(options);
return container;
return Container;
}
@ -141,6 +141,7 @@ namespace Tapeti.Autofac
}
[MemberNotNull(nameof(containerBuilder))]
private void CheckContainerBuilder()
{
if (containerBuilder == null)

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -13,6 +13,9 @@ namespace Tapeti.DataAnnotations
/// <inheritdoc />
public ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{
if (context.Message == null)
return next();
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.DataAnnotations.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
@ -45,7 +46,7 @@ namespace Tapeti.Flow.SQL
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return null;
return Enumerable.Empty<object>();
}
}
}

View File

@ -58,7 +58,8 @@ namespace Tapeti.Flow.SQL
var stateJson = flowReader.GetString(2);
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new FlowRecord<T>(flowID, creationTime, state));
if (state != null)
result.Add(new FlowRecord<T>(flowID, creationTime, state));
}
return result;

View File

@ -38,11 +38,13 @@ namespace Tapeti.Flow.SQL
/// <summary>
/// Extracts alls SqlExceptions from the main and inner or aggregate exceptions
/// </summary>
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception e)
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception exception)
{
while (e != null)
var exceptionHead = exception;
while (exceptionHead != null)
{
switch (e)
switch (exceptionHead)
{
case AggregateException aggregateException:
foreach (var innerException in aggregateException.InnerExceptions)
@ -56,7 +58,8 @@ namespace Tapeti.Flow.SQL
yield return sqlException;
break;
}
e = e.InnerException;
exceptionHead = exceptionHead.InnerException;
}
}
@ -66,12 +69,14 @@ namespace Tapeti.Flow.SQL
/// </summary>
public static IEnumerable<SqlError> UnwrapSqlErrors(SqlException exception)
{
while (exception != null)
var exceptionHead = exception;
while (exceptionHead != null)
{
foreach (SqlError error in exception.Errors)
foreach (SqlError error in exceptionHead.Errors)
yield return error;
exception = exception.InnerException as SqlException;
exceptionHead = exceptionHead.InnerException as SqlException;
}
}

View File

@ -54,7 +54,7 @@ namespace Tapeti.Flow.SQL
returnValue = await callback();
});
return returnValue;
return returnValue!;
}
}
}

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.Flow.SQL.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -15,7 +15,7 @@ namespace Tapeti.Flow
/// <param name="config"></param>
/// <param name="flowRepository">An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts.</param>
/// <returns></returns>
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null)
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository? flowRepository = null)
{
config.Use(new FlowExtension(flowRepository));
return config;

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
@ -46,6 +47,9 @@ namespace Tapeti.Flow.Default
{
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be a Task, not null");
await (Task)value;
await HandleParallelResponse(messageContext);
});
@ -54,6 +58,10 @@ namespace Tapeti.Flow.Default
{
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
// ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask, not null");
await (ValueTask)value;
await HandleParallelResponse(messageContext);
});
@ -82,24 +90,35 @@ namespace Tapeti.Flow.Default
switch (taskType)
{
case TaskType.None:
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
context.Result.SetHandler((messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be an IYieldPoint, not null");
return HandleYieldPoint(messageContext, (IYieldPoint)value);
});
break;
case TaskType.Task:
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be a Task<IYieldPoint>, not null");
var yieldPoint = await (Task<IYieldPoint>)value;
if (yieldPoint != null)
await HandleYieldPoint(messageContext, yieldPoint);
await HandleYieldPoint(messageContext, yieldPoint);
});
break;
case TaskType.ValueTask:
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
// ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask<IYieldPoint>, not null");
var yieldPoint = await (ValueTask<IYieldPoint>)value;
if (yieldPoint != null)
await HandleYieldPoint(messageContext, yieldPoint);
await HandleYieldPoint(messageContext, yieldPoint);
});
break;
@ -140,7 +159,7 @@ namespace Tapeti.Flow.Default
}
private static object ParallelRequestParameterFactory(IMessageContext context)
private static object? ParallelRequestParameterFactory(IMessageContext context)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.GetParallelRequest(new FlowHandlerContext(context));

View File

@ -6,25 +6,49 @@ namespace Tapeti.Flow.Default
{
internal class FlowContext : IDisposable
{
public IFlowHandlerContext HandlerContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; }
private readonly IFlowHandlerContext? handlerContext;
private IFlowStateLock? flowStateLock;
private FlowState? flowState;
public IFlowHandlerContext HandlerContext => handlerContext ?? throw new InvalidOperationException("FlowContext does not have a HandlerContext");
public IFlowStateLock FlowStateLock => flowStateLock ?? throw new InvalidOperationException("FlowContext does not have a FlowStateLock");
public FlowState FlowState => flowState ?? throw new InvalidOperationException("FlowContext does not have a FlowState");
public bool HasFlowStateAndLock => flowState != null && flowStateLock != null;
public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
public ContinuationMetadata? ContinuationMetadata { get; set; }
private int storeCalled;
private int deleteCalled;
public FlowContext(IFlowHandlerContext handlerContext, FlowState flowState, IFlowStateLock flowStateLock)
{
this.flowState = flowState;
this.flowStateLock = flowStateLock;
this.handlerContext = handlerContext;
}
public FlowContext(IFlowHandlerContext handlerContext)
{
this.handlerContext = handlerContext;
}
public void SetFlowState(FlowState newFlowState, IFlowStateLock newFlowStateLock)
{
flowState = newFlowState;
flowStateLock = newFlowStateLock;
}
public ValueTask Store(bool persistent)
{
storeCalled++;
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
return FlowStateLock.StoreFlowState(FlowState, persistent);
}
@ -32,7 +56,7 @@ namespace Tapeti.Flow.Default
public ValueTask Delete()
{
deleteCalled++;
return FlowStateLock?.DeleteFlowState() ?? default;
return flowStateLock?.DeleteFlowState() ?? default;
}
public bool IsStoredOrDeleted()
@ -43,7 +67,7 @@ namespace Tapeti.Flow.Default
public void EnsureStoreOrDeleteIsCalled()
{
if (!IsStoredOrDeleted())
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + flowStateLock?.FlowID);
Debug.Assert(storeCalled <= 1, "Store called more than once!");
Debug.Assert(deleteCalled <= 1, "Delete called more than once!");
@ -51,7 +75,7 @@ namespace Tapeti.Flow.Default
public void Dispose()
{
FlowStateLock?.Dispose();
flowStateLock?.Dispose();
}
}
}

View File

@ -34,8 +34,12 @@ namespace Tapeti.Flow.Default
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
if (controllerPayload.Controller == null)
throw new InvalidOperationException("Controller is not available (method is static?)");
var flowContext = flowPayload.FlowContext;
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
if (!string.IsNullOrEmpty(flowContext.FlowState.Data))
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
@ -65,11 +69,11 @@ namespace Tapeti.Flow.Default
var flowContext = flowPayload.FlowContext;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
if (flowContext.ContinuationMetadata == null || flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
// Do not call when the controller method was filtered, if the same message has two methods
return;
if (flowContext.FlowStateLock != null)
if (flowContext.HasFlowStateAndLock)
{
if (!flowContext.IsStoredOrDeleted())
// The exception strategy can set the consume result to Success. Instead, check if the yield point
@ -82,7 +86,7 @@ namespace Tapeti.Flow.Default
private static async ValueTask<FlowContext> EnrichWithFlowContext(IMessageContext context)
private static async ValueTask<FlowContext?> EnrichWithFlowContext(IMessageContext context)
{
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return flowPayload.FlowContext;
@ -106,13 +110,8 @@ namespace Tapeti.Flow.Default
if (flowState == null)
return null;
var flowContext = new FlowContext
var flowContext = new FlowContext(new FlowHandlerContext(context), flowState, flowStateLock)
{
HandlerContext = new FlowHandlerContext(context),
FlowStateLock = flowStateLock,
FlowState = flowState,
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
};

View File

@ -10,8 +10,11 @@ namespace Tapeti.Flow.Default
{
/// <summary>
/// </summary>
public FlowHandlerContext()
public FlowHandlerContext(ITapetiConfig config, object? controller, MethodInfo method)
{
Config = config;
Controller = controller;
Method = method;
}
@ -19,11 +22,7 @@ namespace Tapeti.Flow.Default
/// </summary>
public FlowHandlerContext(IMessageContext source)
{
if (source == null)
return;
if (!source.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
var controllerPayload = source.Get<ControllerMessageContextPayload>();
Config = source.Config;
Controller = controllerPayload.Controller;
@ -38,15 +37,15 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public ITapetiConfig Config { get; set; }
public ITapetiConfig Config { get; }
/// <inheritdoc />
public object Controller { get; set; }
public object? Controller { get; }
/// <inheritdoc />
public MethodInfo Method { get; set; }
public MethodInfo Method { get; }
/// <inheritdoc />
public IMessageContext MessageContext { get; set; }
public IMessageContext? MessageContext { get; }
}
}

View File

@ -32,21 +32,21 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler)
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
@ -59,7 +59,7 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
public IYieldPoint EndWithResponse<TResponse>(TResponse message) where TResponse : class
{
return new DelegateYieldPoint(context => SendResponse(context, message));
}
@ -72,7 +72,7 @@ namespace Tapeti.Flow.Default
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
{
if (context.FlowState == null)
{
@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.FlowState == null
? GetReply(context.HandlerContext)
: context.FlowState.Metadata.Reply;
var reply = context.HasFlowStateAndLock
? context.FlowState.Metadata.Reply
: GetReply(context.HandlerContext);
if (reply == null)
throw new YieldPointException("No response is required");
@ -134,7 +134,7 @@ namespace Tapeti.Flow.Default
{
await context.Delete();
if (context.FlowState?.Metadata.Reply != null)
if (context.HasFlowStateAndLock && context.FlowState.Metadata.Reply != null)
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
}
@ -159,16 +159,15 @@ namespace Tapeti.Flow.Default
if (binding.QueueName == null)
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow", nameof(responseHandler));
return new ResponseHandlerInfo
{
MethodName = MethodSerializer.Serialize(responseHandler.Method),
ReplyToQueue = binding.QueueName,
IsDurableQueue = binding.QueueType == QueueType.Durable
};
return new ResponseHandlerInfo(
MethodSerializer.Serialize(responseHandler.Method),
binding.QueueName,
binding.QueueType == QueueType.Durable
);
}
private static ReplyMetadata GetReply(IFlowHandlerContext context)
private static ReplyMetadata? GetReply(IFlowHandlerContext context)
{
var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
@ -176,7 +175,7 @@ namespace Tapeti.Flow.Default
return new ReplyMetadata
{
CorrelationId = context.MessageContext.Properties.CorrelationId,
CorrelationId = context.MessageContext!.Properties.CorrelationId,
ReplyTo = context.MessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true)
@ -188,18 +187,17 @@ namespace Tapeti.Flow.Default
var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
var flowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
if (flowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = new FlowState
var flowState = new FlowState
{
Metadata = new FlowMetadata
{
Reply = GetReply(flowContext.HandlerContext)
}
Metadata = new FlowMetadata(GetReply(flowContext.HandlerContext))
};
flowContext.SetFlowState(flowState, flowStateLock);
}
@ -207,20 +205,16 @@ namespace Tapeti.Flow.Default
public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
if (yieldPoint is not DelegateYieldPoint executableYieldPoint)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
throw new YieldPointException($"Yield point is required in controller {context.Controller?.GetType().Name} for method {context.Method.Name}");
FlowContext flowContext = null;
FlowContext? flowContext = null;
var disposeFlowContext = false;
try
{
var messageContext = context.MessageContext;
if (messageContext == null || !messageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
if (context.MessageContext == null || !context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
flowContext = new FlowContext
{
HandlerContext = context
};
flowContext = new FlowContext(context);
// If we ended up here it is because of a Start. No point in storing the new FlowContext
// in the messageContext as the yield point is the last to execute.
@ -236,7 +230,7 @@ namespace Tapeti.Flow.Default
catch (YieldPointException e)
{
// Useful for debugging
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Name"] = context.Controller?.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Method.Name;
throw;
}
@ -246,15 +240,15 @@ namespace Tapeti.Flow.Default
finally
{
if (disposeFlowContext)
flowContext.Dispose();
flowContext?.Dispose();
}
}
/// <inheritdoc />
public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context)
public IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context)
{
return context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
return context.MessageContext != null && context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
? new ParallelRequest(config, this, flowPayload.FlowContext)
: null;
}
@ -263,27 +257,49 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public ValueTask Converge(IFlowHandlerContext context)
{
return Execute(context, new DelegateYieldPoint(flowContext =>
Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));
return Execute(context, new DelegateYieldPoint(async flowContext =>
{
if (flowContext.ContinuationMetadata == null)
throw new InvalidOperationException("Missing ContinuationMetadata in FlowContext");
if (flowContext.ContinuationMetadata.ConvergeMethodName == null)
throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata");
await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync);
}));
}
internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync)
{
IYieldPoint yieldPoint;
IYieldPoint? yieldPoint;
if (flowContext.HandlerContext == null)
throw new InvalidOperationException($"Missing HandleContext in FlowContext for converge method {convergeMethodName}");
if (flowContext.HandlerContext.MessageContext == null)
throw new InvalidOperationException($"Missing MessageContext in FlowContext for converge method {convergeMethodName}");
if (!flowContext.HandlerContext.MessageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext));
if (controllerPayload.Controller == null)
throw new InvalidOperationException($"Controller is not available for converge method {convergeMethodName} (method is static?)");
var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}");
if (convergeMethodSync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { });
yieldPoint = (IYieldPoint?)method.Invoke(controllerPayload.Controller, new object[] { });
else
yieldPoint = await(Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
{
var yieldPointTask = method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPointTask == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
yieldPoint = await (Task<IYieldPoint>)yieldPointTask;
}
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
@ -297,8 +313,15 @@ namespace Tapeti.Flow.Default
{
private class RequestInfo
{
public object Message { get; init; }
public ResponseHandlerInfo ResponseHandlerInfo { get; init; }
public object Message { get; }
public ResponseHandlerInfo ResponseHandlerInfo { get; }
public RequestInfo(object message, ResponseHandlerInfo responseHandlerInfo)
{
Message = message;
ResponseHandlerInfo = responseHandlerInfo;
}
}
@ -314,32 +337,32 @@ namespace Tapeti.Flow.Default
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler)
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler)
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler)
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
@ -347,12 +370,7 @@ namespace Tapeti.Flow.Default
private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
requests.Add(new RequestInfo(message, GetResponseHandlerInfo(config, message, responseHandler)));
return this;
}
@ -381,12 +399,12 @@ namespace Tapeti.Flow.Default
};
}
if (convergeMethod?.Method == null)
if (convergeMethod.Method == null)
throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(async context =>
{
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
await Task.WhenAll(requests.Select(requestInfo =>
@ -419,19 +437,19 @@ namespace Tapeti.Flow.Default
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
public Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
@ -441,6 +459,9 @@ namespace Tapeti.Flow.Default
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
if (flowContext.ContinuationMetadata == null)
throw new InvalidOperationException("No ContinuationMetadata in FlowContext");
return flowProvider.SendRequest(
flowContext,
message,
@ -454,9 +475,17 @@ namespace Tapeti.Flow.Default
internal class ResponseHandlerInfo
{
public string MethodName { get; set; }
public string ReplyToQueue { get; set; }
public bool IsDurableQueue { get; set; }
public string MethodName { get; }
public string ReplyToQueue { get; }
public bool IsDurableQueue { get; }
public ResponseHandlerInfo(string methodName, string replyToQueue, bool isDurableQueue)
{
MethodName = methodName;
ReplyToQueue = replyToQueue;
IsDurableQueue = isDurableQueue;
}
}
}
}

View File

@ -25,39 +25,38 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty<object?>());
}
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, Array.Empty<object?>());
}
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter});
}
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object?[] {parameter});
}
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object[] parameters) where TController : class
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object?[] parameters) where TController : class
{
var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
var result = method.Invoke(controller, parameters);
if (result == null)
throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task<IYieldPoint>, got null");
var context = new FlowHandlerContext
{
Config = config,
Controller = controller,
Method = method
};
var yieldPoint = await getYieldPointResult(result);
var context = new FlowHandlerContext(config, controller, method);
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);

View File

@ -9,8 +9,8 @@ namespace Tapeti.Flow.Default
/// </summary>
public class FlowState
{
private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations;
private FlowMetadata? metadata;
private Dictionary<Guid, ContinuationMetadata>? continuations;
/// <summary>
@ -18,7 +18,7 @@ namespace Tapeti.Flow.Default
/// </summary>
public FlowMetadata Metadata
{
get => metadata ??= new FlowMetadata();
get => metadata ??= new FlowMetadata(null);
set => metadata = value;
}
@ -26,7 +26,7 @@ namespace Tapeti.Flow.Default
/// <summary>
/// Contains the serialized state which is restored when a flow continues.
/// </summary>
public string Data { get; set; }
public string? Data { get; set; }
/// <summary>
@ -45,7 +45,7 @@ namespace Tapeti.Flow.Default
public FlowState Clone()
{
return new FlowState {
metadata = metadata.Clone(),
metadata = metadata?.Clone(),
Data = Data,
continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone())
};
@ -61,18 +61,22 @@ namespace Tapeti.Flow.Default
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public ReplyMetadata Reply { get; set; }
public ReplyMetadata? Reply { get; }
/// <inheritdoc cref="FlowMetadata"/>
public FlowMetadata(ReplyMetadata? reply)
{
Reply = reply;
}
/// <summary>
/// Creates a deep clone of this FlowMetadata.
/// </summary>
public FlowMetadata Clone()
{
return new FlowMetadata
{
Reply = Reply?.Clone()
};
return new FlowMetadata(Reply);
}
}
@ -85,17 +89,17 @@ namespace Tapeti.Flow.Default
/// <summary>
/// The queue to which the response should be sent.
/// </summary>
public string ReplyTo { get; set; }
public string? ReplyTo { get; set; }
/// <summary>
/// The correlation ID included in the original request.
/// </summary>
public string CorrelationId { get; set; }
public string? CorrelationId { get; set; }
/// <summary>
/// The expected response message class.
/// </summary>
public string ResponseTypeName { get; set; }
public string? ResponseTypeName { get; set; }
/// <summary>
/// Indicates whether the response should be sent a mandatory.
@ -128,12 +132,12 @@ namespace Tapeti.Flow.Default
/// <summary>
/// The name of the method which will handle the response.
/// </summary>
public string MethodName { get; set; }
public string? MethodName { get; set; }
/// <summary>
/// The name of the method which is called when all responses have been processed.
/// </summary>
public string ConvergeMethodName { get; set; }
public string? ConvergeMethodName { get; set; }
/// <summary>
/// Determines if the converge method is synchronous or asynchronous.

View File

@ -16,11 +16,11 @@ namespace Tapeti.Flow.Default
{
private class CachedFlowState
{
public readonly FlowState FlowState;
public readonly FlowState? FlowState;
public readonly DateTime CreationTime;
public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent)
public CachedFlowState(FlowState? flowState, DateTime creationTime, bool isPersistent)
{
FlowState = flowState;
CreationTime = creationTime;
@ -31,7 +31,7 @@ namespace Tapeti.Flow.Default
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new();
private readonly LockCollection<Guid> locks = new(EqualityComparer<Guid>.Default);
private HashSet<string> validatedMethods;
private HashSet<string>? validatedMethods;
private readonly IFlowRepository repository;
private readonly ITapetiConfig config;
@ -85,10 +85,13 @@ namespace Tapeti.Flow.Default
private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata)
{
if (string.IsNullOrEmpty(metadata.MethodName))
return;
// We could check all the things that are required for a continuation or converge method, but this should suffice
// for the common scenario where you change code without realizing that it's signature has been persisted
// ReSharper disable once InvertIf
if (validatedMethods.Add(metadata.MethodName))
if (validatedMethods!.Add(metadata.MethodName))
{
var methodInfo = MethodSerializer.Deserialize(metadata.MethodName);
if (methodInfo == null)
@ -150,8 +153,8 @@ namespace Tapeti.Flow.Default
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner;
private volatile IDisposable flowLock;
private CachedFlowState cachedFlowState;
private volatile IDisposable? flowLock;
private CachedFlowState? cachedFlowState;
public Guid FlowID { get; }
@ -172,12 +175,12 @@ namespace Tapeti.Flow.Default
l?.Dispose();
}
public ValueTask<FlowState> GetFlowState()
public ValueTask<FlowState?> GetFlowState()
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
return new ValueTask<FlowState>(cachedFlowState?.FlowState?.Clone());
return new ValueTask<FlowState?>(cachedFlowState?.FlowState?.Clone());
}
public async ValueTask StoreFlowState(FlowState newFlowState, bool persistent)
@ -189,13 +192,13 @@ namespace Tapeti.Flow.Default
newFlowState = newFlowState.Clone();
// Update the lookup dictionary for the ContinuationIDs
if (cachedFlowState != null)
if (cachedFlowState?.FlowState != null)
{
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
owner.continuationLookup.TryRemove(removedContinuation, out _);
}
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState?.FlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
{
owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID);
}
@ -203,7 +206,7 @@ namespace Tapeti.Flow.Default
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent);
cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState!.CreationTime, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
@ -231,7 +234,7 @@ namespace Tapeti.Flow.Default
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
if (cachedFlowState != null)
if (cachedFlowState?.FlowState != null)
{
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys)
owner.continuationLookup.TryRemove(removedContinuation, out _);
@ -239,7 +242,7 @@ namespace Tapeti.Flow.Default
owner.flowStates.TryRemove(FlowID, out var removedFlowState);
cachedFlowState = null;
if (removedFlowState.IsPersistent)
if (removedFlowState is { IsPersistent: true })
await owner.repository.DeleteState(FlowID);
}
}

View File

@ -9,11 +9,11 @@ namespace Tapeti.Flow
/// </summary>
public class FlowExtension : ITapetiExtension
{
private readonly IFlowRepository flowRepository;
private readonly IFlowRepository? flowRepository;
/// <summary>
/// </summary>
public FlowExtension(IFlowRepository flowRepository)
public FlowExtension(IFlowRepository? flowRepository)
{
this.flowRepository = flowRepository;
}

View File

@ -7,7 +7,7 @@ namespace Tapeti.Flow.FlowHelpers
/// <summary>
/// Implementation of an asynchronous locking mechanism.
/// </summary>
public class LockCollection<T>
public class LockCollection<T> where T : notnull
{
private readonly Dictionary<T, LockItem> locks;
@ -57,7 +57,7 @@ namespace Tapeti.Flow.FlowHelpers
private class LockItem : IDisposable
{
internal volatile LockItem Next;
internal volatile LockItem? Next;
private readonly Dictionary<T, LockItem> locks;
private readonly TaskCompletionSource<IDisposable> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

View File

@ -25,7 +25,7 @@ namespace Tapeti.Flow.FlowHelpers
/// Deserializes the serialized method representation back into it's MethodInfo, or null if not found.
/// </summary>
/// <param name="serializedMethod"></param>
public static MethodInfo Deserialize(string serializedMethod)
public static MethodInfo? Deserialize(string serializedMethod)
{
var match = DeserializeRegex.Match(serializedMethod);
if (!match.Success)

View File

@ -16,9 +16,8 @@ namespace Tapeti.Flow
/// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state.
/// </summary>
public bool FlowIsConverging => FlowContext != null &&
FlowContext.FlowState.Continuations.Count == 0 &&
FlowContext.ContinuationMetadata.ConvergeMethodName != null;
public bool FlowIsConverging => FlowContext.FlowState.Continuations.Count == 0 &&
FlowContext.ContinuationMetadata?.ConvergeMethodName != null;
public FlowMessageContextPayload(FlowContext flowContext)
@ -29,7 +28,7 @@ namespace Tapeti.Flow
public void Dispose()
{
FlowContext?.Dispose();
FlowContext.Dispose();
}
}
}

View File

@ -18,7 +18,7 @@ namespace Tapeti.Flow
/// <summary>
/// An instance of the controller which starts or continues the flow.
/// </summary>
object Controller { get; }
object? Controller { get; }
/// <summary>
@ -31,6 +31,6 @@ namespace Tapeti.Flow
/// Access to the message context if this is a continuated flow.
/// Will be null when in a starting flow.
/// </summary>
IMessageContext MessageContext { get; }
IMessageContext? MessageContext { get; }
}
}

View File

@ -20,7 +20,7 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
@ -32,7 +32,7 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler);
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
@ -51,7 +51,7 @@ namespace Tapeti.Flow
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler);
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
@ -67,7 +67,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint EndWithResponse<TResponse>(TResponse message);
IYieldPoint EndWithResponse<TResponse>(TResponse message) where TResponse : class;
/// <summary>
@ -126,7 +126,7 @@ namespace Tapeti.Flow
/// <summary>
/// Returns the parallel request for the given message context.
/// </summary>
IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context);
IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context);
/// <summary>
@ -174,7 +174,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
@ -184,21 +184,21 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,ValueTask})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
@ -208,14 +208,14 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequestSync{TRequest,TResponse}(TRequest,Action{TResponse})"/>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler);
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler) where TRequest : class where TResponse : class;
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them.
@ -260,14 +260,14 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
@ -277,7 +277,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class;
}

View File

@ -55,7 +55,7 @@ namespace Tapeti.Flow
/// <summary>
/// Acquires a copy of the flow state.
/// </summary>
ValueTask<FlowState> GetFlowState();
ValueTask<FlowState?> GetFlowState();
/// <summary>
/// Stores the new flow state.

View File

@ -170,11 +170,11 @@ namespace JetBrains.Annotations
{
public PublicAPIAttribute() { }
public PublicAPIAttribute([NotNull] string comment)
public PublicAPIAttribute(string comment)
{
Comment = comment;
}
[CanBeNull] public string Comment { get; }
public string? Comment { get; }
}
}

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.Flow.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -43,7 +43,7 @@ namespace Tapeti.Serilog.Middleware
}
private static object DiagnosticContextFactory(IMessageContext context)
private static object? DiagnosticContextFactory(IMessageContext context)
{
return context.TryGet<DiagnosticContextPayload>(out var diagnosticContextPayload)
? diagnosticContextPayload.DiagnosticContext

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.Serilog.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -87,7 +87,7 @@ namespace Tapeti.Serilog
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
var message = new StringBuilder("Tapeti: exception in message handler");
var messageParams = new List<object>();
var messageParams = new List<object?>();
var contextLogger = seriLogger
.ForContext("consumeResult", consumeResult)
@ -130,7 +130,7 @@ namespace Tapeti.Serilog
}
/// <inheritdoc />
public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments)
public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments)
{
seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({existingArguments} vs. {arguments}) and will not be redeclared, queue will be consumed as-is",
queueName,

View File

@ -10,12 +10,12 @@ namespace Tapeti.SimpleInjector
public class SimpleInjectorDependencyResolver : IDependencyContainer
{
private readonly Container container;
private readonly Lifestyle defaultsLifestyle;
private readonly Lifestyle controllersLifestyle;
private readonly Lifestyle? defaultsLifestyle;
private readonly Lifestyle? controllersLifestyle;
/// <summary>
/// </summary>
public SimpleInjectorDependencyResolver(Container container, Lifestyle defaultsLifestyle = null, Lifestyle controllersLifestyle = null)
public SimpleInjectorDependencyResolver(Container container, Lifestyle? defaultsLifestyle = null, Lifestyle? controllersLifestyle = null)
{
this.container = container;
this.defaultsLifestyle = defaultsLifestyle;

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -27,7 +27,7 @@ namespace Tapeti.Tests.Client
public ushort RabbitMQManagementPort { get; private set; }
private TestcontainerMessageBroker testcontainers;
private TestcontainerMessageBroker? testcontainers;
private const int DefaultRabbitMQPort = 5672;
private const int DefaultRabbitMQManagementPort = 15672;

View File

@ -19,7 +19,7 @@ namespace Tapeti.Tests.Client
private readonly RabbitMQFixture fixture;
private readonly MockDependencyResolver dependencyResolver = new();
private TapetiClient client;
private TapetiClient client = null!;
public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
@ -41,7 +41,6 @@ namespace Tapeti.Tests.Client
public async Task DisposeAsync()
{
await client.Close();
client = null;
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
@ -13,6 +14,16 @@ using Xunit;
namespace Tapeti.Tests.Config
{
internal static class UTF8StringExtensions
{
public static string AsUTF8String(this object value)
{
value.Should().BeOfType<byte[]>();
return Encoding.UTF8.GetString((byte[])value);
}
}
public class QueueArgumentsTest : BaseControllerTest
{
private static readonly MockRepository MoqRepository = new(MockBehavior.Strict);
@ -88,12 +99,12 @@ namespace Tapeti.Tests.Config
declaredQueues.Should().HaveCount(1);
var arguments = declaredQueues["queue-1"];
arguments.Should().ContainKey("x-custom").WhoseValue.Should().Be("custom value");
arguments.Should().ContainKey("x-custom").WhoseValue.AsUTF8String().Should().Be("custom value");
arguments.Should().ContainKey("x-another").WhoseValue.Should().Be(true);
arguments.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100);
arguments.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000);
arguments.Should().ContainKey("x-message-ttl").WhoseValue.Should().Be(4269);
arguments.Should().ContainKey("x-overflow").WhoseValue.Should().Be("reject-publish");
arguments.Should().ContainKey("x-overflow").WhoseValue.AsUTF8String().Should().Be("reject-publish");
}
@ -133,7 +144,7 @@ namespace Tapeti.Tests.Config
}
}
// ReSharper disable all
#pragma warning disable

View File

@ -8,7 +8,7 @@ namespace Tapeti.Tests.Mock
private readonly Dictionary<Type, object> container = new();
public void Set<TInterface>(TInterface instance)
public void Set<TInterface>(TInterface instance) where TInterface : class
{
container.Add(typeof(TInterface), instance);
}

View File

@ -49,8 +49,17 @@ namespace Tapeti.Tests.Mock
: $"Declaring {(durable ? "durable" : "dynamic")} queue {queueName}");
}
public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments)
public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments)
{
testOutputHelper.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is");
}
private static string GetArgumentsText(IRabbitMQArguments? arguments)
{
if (arguments == null || arguments.Count == 0)
return "empty";
var argumentsText = new StringBuilder();
foreach (var pair in arguments)
{
@ -60,9 +69,10 @@ namespace Tapeti.Tests.Mock
argumentsText.Append($"{pair.Key} = {pair.Value}");
}
testOutputHelper.WriteLine($"Durable queue {queueName} exists with incompatible x-arguments ({argumentsText}) and will not be redeclared, queue will be consumed as-is");
return argumentsText.ToString();
}
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
{
testOutputHelper.WriteLine($"Binding {queueName} to exchange {exchange} with routing key {routingKey}");

View File

@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -15,6 +15,6 @@ namespace Tapeti.Transient
/// <param name="request"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request);
Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class;
}
}

View File

@ -12,6 +12,7 @@
<PackageIcon>Tapeti.Flow.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Tapeti.Config;
namespace Tapeti.Transient
@ -31,7 +32,7 @@ namespace Tapeti.Transient
/// <inheritdoc />
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return null;
return Enumerable.Empty<object>();
}

View File

@ -14,10 +14,10 @@ namespace Tapeti.Transient
private readonly string dynamicQueuePrefix;
/// <inheritdoc />
public string QueueName { get; private set; }
public string? QueueName { get; private set; }
/// <inheritdoc />
public QueueType QueueType => QueueType.Dynamic;
public QueueType? QueueType => Config.QueueType.Dynamic;
/// <summary>

View File

@ -21,7 +21,7 @@ namespace Tapeti.Transient
/// <inheritdoc />
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request)
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class
{
return (TResponse)await router.RequestResponse(publisher, request);
}

View File

@ -18,7 +18,7 @@ namespace Tapeti.Transient
/// <summary>
/// The generated name of the dynamic queue to which responses should be sent.
/// </summary>
public string TransientResponseQueueName { get; set; }
public string? TransientResponseQueueName { get; set; }
/// <summary>
@ -41,8 +41,13 @@ namespace Tapeti.Transient
if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
return;
if (map.TryRemove(continuationID, out var tcs))
tcs.TrySetResult(context.Message);
if (!map.TryRemove(continuationID, out var tcs))
return;
if (context.Message == null)
throw new InvalidOperationException();
tcs.TrySetResult(context.Message);
}
@ -72,7 +77,7 @@ namespace Tapeti.Transient
{
// Simple cleanup of the task and map dictionary.
if (map.TryRemove(correlation, out tcs))
tcs.TrySetResult(null);
tcs.TrySetResult(null!);
throw;
}
@ -84,8 +89,10 @@ namespace Tapeti.Transient
}
private void TimeoutResponse(object tcs)
private void TimeoutResponse(object? tcs)
{
ArgumentNullException.ThrowIfNull(tcs, nameof(tcs));
((TaskCompletionSource<object>)tcs).TrySetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs));
}
}

View File

@ -8,7 +8,7 @@
/// <summary>
/// An instance of the controller referenced by the binding. Note: can be null during Cleanup or when bound to static methods.
/// </summary>
public object Controller { get; }
public object? Controller { get; }
/// <remarks>
@ -22,7 +22,7 @@
/// </summary>
/// <param name="controller">An instance of the controller referenced by the binding</param>
/// <param name="binding">The binding which is currently processing the message</param>
public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding)
public ControllerMessageContextPayload(object? controller, IControllerMethodBinding binding)
{
Controller = controller;
Binding = binding;

View File

@ -29,13 +29,13 @@ namespace Tapeti.Config
/// <summary>
/// The name of the queue the binding is consuming. May change after a reconnect for dynamic queues.
/// </summary>
string QueueName { get; }
string? QueueName { get; }
/// <summary>
/// Determines the type of queue the binding registers
/// </summary>
QueueType QueueType { get; }
QueueType? QueueType { get; }
/// <summary>
@ -82,7 +82,7 @@ namespace Tapeti.Config
/// <param name="messageClass">The message class to be bound to the queue</param>
/// <param name="queueName">The name of the durable queue</param>
/// <param name="arguments">Optional arguments</param>
ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments);
ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments);
/// <summary>
/// Binds the messageClass to a dynamic auto-delete queue.
@ -95,7 +95,7 @@ namespace Tapeti.Config
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
ValueTask<string> BindDynamic(Type messageClass, string queuePrefix, IRabbitMQArguments arguments);
ValueTask<string> BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a durable queue but does not add a binding for a messageClass' routing key.
@ -103,7 +103,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="queueName">The name of the durable queue</param>
/// <param name="arguments">Optional arguments</param>
ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments);
ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -113,7 +113,7 @@ namespace Tapeti.Config
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
ValueTask<string> BindDynamicDirect(Type messageClass, string queuePrefix, IRabbitMQArguments arguments);
ValueTask<string> BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -122,7 +122,7 @@ namespace Tapeti.Config
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
ValueTask<string> BindDynamicDirect(string queuePrefix, IRabbitMQArguments arguments);
ValueTask<string> BindDynamicDirect(string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete

View File

@ -11,7 +11,7 @@ namespace Tapeti.Config
/// Injects a value for a controller method parameter.
/// </summary>
/// <param name="context"></param>
public delegate object ValueFactory(IMessageContext context);
public delegate object? ValueFactory(IMessageContext context);
/// <summary>
@ -19,7 +19,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="context"></param>
/// <param name="value"></param>
public delegate ValueTask ResultHandler(IMessageContext context, object value);
public delegate ValueTask ResultHandler(IMessageContext context, object? value);
/// <summary>
@ -48,7 +48,7 @@ namespace Tapeti.Config
/// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware.
/// If required, call next first to ensure it is available.
/// </summary>
Type MessageClass { get; }
Type? MessageClass { get; }
/// <summary>
/// Determines if SetMessageClass has already been called.

View File

@ -7,12 +7,12 @@ namespace Tapeti.Config
/// </summary>
public interface IControllerBindingMiddleware : IControllerMiddlewareBase
{
/// <summary>
/// Called before a Controller method is registered. Can change the way parameters and return values are handled,
/// and can inject message middleware specific to a method.
/// </summary>
/// <param name="context"></param>
/// <param name="next">Must be called to activate the new layer of middleware.</param>
void Handle(IControllerBindingContext context, Action next);
/// <summary>
/// Called before a Controller method is registered. Can change the way parameters and return values are handled,
/// and can inject message middleware specific to a method.
/// </summary>
/// <param name="context"></param>
/// <param name="next">Must be called to activate the new layer of middleware.</param>
void Handle(IControllerBindingContext context, Action next);
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
// ReSharper disable UnusedMemberInSuper.Global - public API
@ -34,12 +35,12 @@ namespace Tapeti.Config
/// <summary>
/// Contains the raw body of the message.
/// </summary>
byte[] RawBody { get; }
byte[]? RawBody { get; }
/// <summary>
/// Contains the decoded message instance.
/// </summary>
object Message { get; }
object? Message { get; }
/// <summary>
/// Provides access to the message metadata.
@ -55,7 +56,7 @@ namespace Tapeti.Config
/// Contains a CancellationToken which is cancelled when the connection to the RabbitMQ server is closed.
/// Note that this token is cancelled regardless of whether the connection will be reestablished, as any
/// messages still in the queue will be redelivered with a new token.
/// </summary>
/// </summary>
CancellationToken ConnectionClosed { get; }
/// <summary>
@ -87,7 +88,7 @@ namespace Tapeti.Config
/// Returns true and the payload value if this message context was previously enriched with the payload T.
/// </summary>
/// <typeparam name="T">The payload type as passed to Enrich</typeparam>
bool TryGet<T>(out T payload) where T : IMessageContextPayload;
bool TryGet<T>([NotNullWhen(true)] out T? payload) where T : IMessageContextPayload;
/// <summary>
/// Stores a key-value pair in the context for passing information between the various
@ -105,7 +106,7 @@ namespace Tapeti.Config
/// <param name="value"></param>
/// <returns>True if the value was found, False otherwise</returns>
[Obsolete("For backwards compatibility only. Use Get<T> payload overload for typed properties instead")]
bool Get<T>(string key, out T value) where T : class;
bool Get<T>(string key, out T? value) where T : class;
}

View File

@ -9,13 +9,13 @@ namespace Tapeti.Config
public interface IMessageProperties
{
/// <summary></summary>
string ContentType { get; set; }
string? ContentType { get; set; }
/// <summary></summary>
string CorrelationId { get; set; }
string? CorrelationId { get; set; }
/// <summary></summary>
string ReplyTo { get; set; }
string? ReplyTo { get; set; }
/// <summary></summary>
bool? Persistent { get; set; }
@ -37,7 +37,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="name"></param>
/// <returns>The value if found, null otherwise</returns>
string GetHeader(string name);
string? GetHeader(string name);
/// <summary>

View File

@ -16,7 +16,7 @@ namespace Tapeti.Config
/// <summary>
/// The exchange to which the message will be published.
/// </summary>
string Exchange { get; set; }
string? Exchange { get; set; }
/// <summary>
/// The routing key which will be included with the message.
@ -31,6 +31,6 @@ namespace Tapeti.Config
/// <summary>
/// Provides access to the message metadata.
/// </summary>
IMessageProperties Properties { get; }
IMessageProperties? Properties { get; }
}
}

View File

@ -87,40 +87,13 @@ namespace Tapeti.Config
/// </summary>
/// <param name="method"></param>
/// <returns>The binding if found, null otherwise</returns>
IControllerMethodBinding ForMethod(Delegate method);
IControllerMethodBinding? ForMethod(Delegate method);
/// <summary>
/// Searches for a binding linked to the specified method.
/// </summary>
/// <param name="method"></param>
/// <returns>The binding if found, null otherwise</returns>
IControllerMethodBinding ForMethod(MethodInfo method);
IControllerMethodBinding? ForMethod(MethodInfo method);
}
/*
public interface IBinding
{
Type Controller { get; }
MethodInfo Method { get; }
Type MessageClass { get; }
string QueueName { get; }
QueueBindingMode QueueBindingMode { get; set; }
IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
bool Accept(Type messageClass);
bool Accept(IMessageContext context, object message);
Task Invoke(IMessageContext context, object message);
}
*/
/*
public interface IBuildBinding : IBinding
{
void SetQueueName(string queueName);
}
*/
}

View File

@ -37,9 +37,9 @@ namespace Tapeti.Connection
}
/// <inheritdoc />
public override bool Equals(object obj)
public override bool Equals(object? obj)
{
if (ReferenceEquals(null, obj)) return false;
if (obj is null) return false;
return obj is QueueBinding other && Equals(other);
}
@ -79,7 +79,7 @@ namespace Tapeti.Connection
/// <param name="exchange">The exchange to publish the message to, or empty to send it directly to a queue</param>
/// <param name="routingKey">The routing key for the message, or queue name if exchange is empty</param>
/// <param name="mandatory">If true, an exception will be raised if the message can not be delivered to at least one queue</param>
Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory);
Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory);
/// <summary>
@ -89,7 +89,7 @@ namespace Tapeti.Connection
/// <param name="consumer">The consumer implementation which will receive the messages from the queue</param>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
/// <returns>The consumer tag as returned by BasicConsume.</returns>
Task<TapetiConsumerTag> Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken);
Task<TapetiConsumerTag?> Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken);
/// <summary>
/// Stops the consumer with the specified tag.
@ -104,7 +104,7 @@ namespace Tapeti.Connection
/// <param name="bindings">A list of bindings. Any bindings already on the queue which are not in this list will be removed</param>
/// <param name="arguments">Optional arguments</param>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments arguments, CancellationToken cancellationToken);
Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken);
/// <summary>
/// Verifies a durable queue exists. Will raise an exception if it does not.
@ -112,7 +112,7 @@ namespace Tapeti.Connection
/// <param name="queueName">The name of the queue to verify</param>
/// <param name="arguments">Optional arguments</param>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
Task DurableQueueVerify(string queueName, IRabbitMQArguments arguments, CancellationToken cancellationToken);
Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken);
/// <summary>
/// Deletes a durable queue.
@ -128,7 +128,7 @@ namespace Tapeti.Connection
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <param name="cancellationToken">Cancelled when the connection is lost</param>
Task<string> DynamicQueueDeclare(string queuePrefix, IRabbitMQArguments arguments, CancellationToken cancellationToken);
Task<string> DynamicQueueDeclare(string? queuePrefix, IRabbitMQArguments? arguments, CancellationToken cancellationToken);
/// <summary>
/// Add a binding to a dynamic queue.

View File

@ -15,17 +15,10 @@ namespace Tapeti.Connection
{
}
#if NETSTANDARD2_1_OR_GREATER
public RabbitMQArguments(IReadOnlyDictionary<string, object> values) : base(values)
{
}
#else
public RabbitMQArguments(IReadOnlyDictionary<string, object> values)
{
foreach (var pair in values)
Add(pair.Key, pair.Value);
}
#endif
public void AddUTF8(string key, string value)

View File

@ -21,7 +21,7 @@ namespace Tapeti.Connection
{
private readonly Func<IModel> modelFactory;
private readonly object taskQueueLock = new();
private SingleThreadTaskQueue taskQueue;
private SingleThreadTaskQueue? taskQueue;
private readonly ModelProvider modelProvider;
@ -34,7 +34,7 @@ namespace Tapeti.Connection
public async Task Reset()
{
SingleThreadTaskQueue capturedTaskQueue;
SingleThreadTaskQueue? capturedTaskQueue;
lock (taskQueueLock)
{

View File

@ -43,7 +43,7 @@ namespace Tapeti.Connection
/// <summary>
/// Receives events when the connection state changes.
/// </summary>
public IConnectionEventListener ConnectionEventListener { get; set; }
public IConnectionEventListener? ConnectionEventListener { get; set; }
private readonly TapetiChannel consumeChannel;
@ -53,9 +53,9 @@ namespace Tapeti.Connection
// These fields must be locked using connectionLock
private readonly object connectionLock = new();
private long connectionReference;
private RabbitMQ.Client.IConnection connection;
private IModel consumeChannelModel;
private IModel publishChannelModel;
private RabbitMQ.Client.IConnection? connection;
private IModel? consumeChannelModel;
private IModel? publishChannelModel;
private bool isClosing;
private bool isReconnect;
private DateTime connectedDateTime;
@ -72,8 +72,15 @@ namespace Tapeti.Connection
private class ConfirmMessageInfo
{
public string ReturnKey;
public TaskCompletionSource<int> CompletionSource;
public string ReturnKey { get; }
public TaskCompletionSource<int> CompletionSource { get; }
public ConfirmMessageInfo(string returnKey, TaskCompletionSource<int> completionSource)
{
ReturnKey = returnKey;
CompletionSource = completionSource;
}
}
@ -110,7 +117,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory)
public async Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory)
{
if (string.IsNullOrEmpty(routingKey))
throw new ArgumentNullException(nameof(routingKey));
@ -118,17 +125,14 @@ namespace Tapeti.Connection
await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channelProvider =>
{
Task<int> publishResultTask = null;
var messageInfo = new ConfirmMessageInfo
{
ReturnKey = GetReturnKey(exchange, routingKey),
CompletionSource = new TaskCompletionSource<int>()
};
Task<int>? publishResultTask = null;
var messageInfo = new ConfirmMessageInfo(GetReturnKey(exchange ?? string.Empty, routingKey), new TaskCompletionSource<int>());
channelProvider.WithRetryableChannel(channel =>
{
DeclareExchange(channel, exchange);
if (exchange != null)
DeclareExchange(channel, exchange);
// The delivery tag is lost after a reconnect, register under the new tag
if (config.Features.PublisherConfirms)
@ -153,7 +157,7 @@ namespace Tapeti.Connection
try
{
var publishProperties = new RabbitMQMessageProperties(channel.CreateBasicProperties(), properties);
channel.BasicPublish(exchange ?? "", routingKey, mandatory, publishProperties.BasicProperties, body);
channel.BasicPublish(exchange ?? string.Empty, routingKey, mandatory, publishProperties.BasicProperties, body);
}
catch
{
@ -202,7 +206,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task<TapetiConsumerTag> Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken)
public async Task<TapetiConsumerTag?> Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken)
{
if (deletedQueues.Contains(queueName))
return null;
@ -212,7 +216,7 @@ namespace Tapeti.Connection
long capturedConnectionReference = -1;
string consumerTag = null;
string? consumerTag = null;
await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel =>
{
@ -224,7 +228,9 @@ namespace Tapeti.Connection
consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
});
return new TapetiConsumerTag(capturedConnectionReference, consumerTag);
return consumerTag == null
? null
: new TapetiConsumerTag(capturedConnectionReference, consumerTag);
}
@ -289,7 +295,7 @@ namespace Tapeti.Connection
}
private async Task<bool> GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments arguments)
private async Task<bool> GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments? arguments)
{
var existingQueue = await GetQueueInfo(queueName);
if (existingQueue == null)
@ -298,9 +304,6 @@ namespace Tapeti.Connection
if (!existingQueue.Durable || existingQueue.AutoDelete || existingQueue.Exclusive)
throw new InvalidOperationException($"Durable queue {queueName} already exists with incompatible parameters, durable = {existingQueue.Durable} (expected True), autoDelete = {existingQueue.AutoDelete} (expected False), exclusive = {existingQueue.Exclusive} (expected False)");
if (arguments == null && existingQueue.Arguments.Count == 0)
return true;
var existingArguments = ConvertJsonArguments(existingQueue.Arguments);
if (existingArguments.NullSafeSameValues(arguments))
return true;
@ -310,7 +313,7 @@ namespace Tapeti.Connection
}
private static RabbitMQArguments ConvertJsonArguments(IReadOnlyDictionary<string, JObject> arguments)
private static RabbitMQArguments? ConvertJsonArguments(IReadOnlyDictionary<string, JObject>? arguments)
{
if (arguments == null)
return null;
@ -325,7 +328,6 @@ namespace Tapeti.Connection
JTokenType.Float => pair.Value.Value<double>(),
JTokenType.String => Encoding.UTF8.GetBytes(pair.Value.Value<string>() ?? string.Empty),
JTokenType.Boolean => pair.Value.Value<bool>(),
JTokenType.Null => null,
_ => throw new ArgumentOutOfRangeException(nameof(arguments))
};
@ -338,7 +340,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments arguments, CancellationToken cancellationToken)
public async Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken)
{
var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments);
@ -373,7 +375,7 @@ namespace Tapeti.Connection
}
private static IDictionary<string, object> GetDeclareArguments(IRabbitMQArguments arguments)
private static IDictionary<string, object>? GetDeclareArguments(IRabbitMQArguments? arguments)
{
return arguments == null || arguments.Count == 0
? null
@ -382,7 +384,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task DurableQueueVerify(string queueName, IRabbitMQArguments arguments, CancellationToken cancellationToken)
public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken)
{
if (!await GetDurableQueueDeclareRequired(queueName, arguments))
return;
@ -484,9 +486,9 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task<string> DynamicQueueDeclare(string queuePrefix, IRabbitMQArguments arguments, CancellationToken cancellationToken)
public async Task<string> DynamicQueueDeclare(string? queuePrefix, IRabbitMQArguments? arguments, CancellationToken cancellationToken)
{
string queueName = null;
string? queueName = null;
var bindingLogger = logger as IBindingLogger;
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
@ -507,6 +509,10 @@ namespace Tapeti.Connection
}
});
cancellationToken.ThrowIfCancellationRequested();
if (queueName == null)
throw new InvalidOperationException("Failed to declare dynamic queue");
return queueName;
}
@ -528,9 +534,9 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task Close()
{
IModel capturedConsumeModel;
IModel capturedPublishModel;
RabbitMQ.Client.IConnection capturedConnection;
IModel? capturedConsumeModel;
IModel? capturedPublishModel;
RabbitMQ.Client.IConnection? capturedConnection;
lock (connectionLock)
{
@ -578,10 +584,10 @@ namespace Tapeti.Connection
private class ManagementQueueInfo
{
[JsonProperty("name")]
public string Name { get; set; }
public string? Name { get; set; }
[JsonProperty("vhost")]
public string VHost { get; set; }
public string? VHost { get; set; }
[JsonProperty("durable")]
public bool Durable { get; set; }
@ -593,7 +599,7 @@ namespace Tapeti.Connection
public bool Exclusive { get; set; }
[JsonProperty("arguments")]
public Dictionary<string, JObject> Arguments { get; set; }
public Dictionary<string, JObject>? Arguments { get; set; }
[JsonProperty("messages")]
public uint Messages { get; set; }
@ -601,7 +607,7 @@ namespace Tapeti.Connection
private async Task<ManagementQueueInfo> GetQueueInfo(string queueName)
private async Task<ManagementQueueInfo?> GetQueueInfo(string queueName)
{
var virtualHostPath = Uri.EscapeDataString(connectionParams.VirtualHost);
var queuePath = Uri.EscapeDataString(queueName);
@ -622,25 +628,25 @@ namespace Tapeti.Connection
private class ManagementBinding
{
[JsonProperty("source")]
public string Source { get; set; }
public string? Source { get; set; }
[JsonProperty("vhost")]
public string Vhost { get; set; }
public string? Vhost { get; set; }
[JsonProperty("destination")]
public string Destination { get; set; }
public string? Destination { get; set; }
[JsonProperty("destination_type")]
public string DestinationType { get; set; }
public string? DestinationType { get; set; }
[JsonProperty("routing_key")]
public string RoutingKey { get; set; }
public string? RoutingKey { get; set; }
[JsonProperty("arguments")]
public Dictionary<string, string> Arguments { get; set; }
public Dictionary<string, string>? Arguments { get; set; }
[JsonProperty("properties_key")]
public string PropertiesKey { get; set; }
public string? PropertiesKey { get; set; }
}
@ -658,8 +664,8 @@ namespace Tapeti.Connection
// Filter out the binding to an empty source, which is always present for direct-to-queue routing
return bindings?
.Where(binding => !string.IsNullOrEmpty(binding.Source))
.Select(binding => new QueueBinding(binding.Source, binding.RoutingKey))
.Where(binding => !string.IsNullOrEmpty(binding.Source) && !string.IsNullOrEmpty(binding.RoutingKey))
.Select(binding => new QueueBinding(binding.Source!, binding.RoutingKey!))
?? Enumerable.Empty<QueueBinding>();
});
}
@ -723,9 +729,6 @@ namespace Tapeti.Connection
private void DeclareExchange(IModel channel, string exchange)
{
if (string.IsNullOrEmpty(exchange))
return;
if (declaredExchanges.Contains(exchange))
return;
@ -791,9 +794,9 @@ namespace Tapeti.Connection
{
try
{
RabbitMQ.Client.IConnection capturedConnection;
IModel capturedConsumeChannelModel;
IModel capturedPublishChannelModel;
RabbitMQ.Client.IConnection? capturedConnection;
IModel? capturedConsumeChannelModel;
IModel? capturedPublishChannelModel;
lock (connectionLock)
@ -805,7 +808,7 @@ namespace Tapeti.Connection
{
try
{
if (connection.IsOpen)
if (connection is { IsOpen: true })
connection.Close();
}
catch (AlreadyClosedException)
@ -813,7 +816,7 @@ namespace Tapeti.Connection
}
finally
{
connection.Dispose();
connection?.Dispose();
}
connection = null;
@ -873,12 +876,7 @@ namespace Tapeti.Connection
consumeChannelModel = null;
}
ConnectionEventListener?.Disconnected(new DisconnectedEventArgs
{
ReplyCode = e.ReplyCode,
ReplyText = e.ReplyText
});
ConnectionEventListener?.Disconnected(new DisconnectedEventArgs(e.ReplyCode, e.ReplyText));
logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText));
// Reconnect if the disconnect was unexpected
@ -906,11 +904,7 @@ namespace Tapeti.Connection
connectedDateTime = DateTime.UtcNow;
var connectedEventArgs = new ConnectedEventArgs
{
ConnectionParams = connectionParams,
LocalPort = capturedConnection.LocalPort
};
var connectedEventArgs = new ConnectedEventArgs(connectionParams, capturedConnection.LocalPort);
if (isReconnect)
ConnectionEventListener?.Reconnected(connectedEventArgs);
@ -938,7 +932,7 @@ namespace Tapeti.Connection
}
private void HandleBasicReturn(object sender, BasicReturnEventArgs e)
private void HandleBasicReturn(object? sender, BasicReturnEventArgs e)
{
/*
* "If the message is also published as mandatory, the basic.return is sent to the client before basic.ack."
@ -968,7 +962,7 @@ namespace Tapeti.Connection
}
private void HandleBasicAck(object sender, BasicAckEventArgs e)
private void HandleBasicAck(object? sender, BasicAckEventArgs e)
{
Monitor.Enter(confirmLock);
try
@ -999,7 +993,7 @@ namespace Tapeti.Connection
}
private void HandleBasicNack(object sender, BasicNackEventArgs e)
private void HandleBasicNack(object? sender, BasicNackEventArgs e)
{
Monitor.Enter(confirmLock);
try
@ -1048,10 +1042,10 @@ namespace Tapeti.Connection
public TapetiConnectionParams ConnectionParams { get; }
public bool IsReconnect { get; }
public int LocalPort { get; }
public Exception Exception { get; }
public Exception? Exception { get; }
public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception exception = null)
public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception? exception = null)
{
ConnectionParams = connectionParams;
IsReconnect = isReconnect;

View File

@ -40,7 +40,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
{
object message = null;
object? message = null;
try
{
try
@ -73,7 +73,7 @@ namespace Tapeti.Connection
RawBody = body,
Message = message,
Properties = properties,
Binding = null,
Binding = new ExceptionContextBinding(queueName),
ConnectionClosed = CancellationToken.None
};
@ -184,5 +184,42 @@ namespace Tapeti.Connection
public string RoutingKey;
public IMessageProperties Properties;
}
private class ExceptionContextBinding : IBinding
{
public string? QueueName { get; }
public QueueType? QueueType => null;
public ExceptionContextBinding(string? queueName)
{
QueueName = queueName;
}
public ValueTask Apply(IBindingTarget target)
{
throw new InvalidOperationException("Apply method should not be called on a binding in an Exception context");
}
public bool Accept(Type messageClass)
{
throw new InvalidOperationException("Accept method should not be called on a binding in an Exception context");
}
public ValueTask Invoke(IMessageContext context)
{
throw new InvalidOperationException("Invoke method should not be called on a binding in an Exception context");
}
public ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
throw new InvalidOperationException("Cleanup method should not be called on a binding in an Exception context");
}
}
}
}

View File

@ -38,14 +38,14 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
{
await PublishRequest(message, responseMethodSelector.Body);
}
/// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
{
await PublishRequest(message, responseMethodSelector.Body);
}
@ -97,7 +97,7 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task Publish(object message, IMessageProperties properties, bool mandatory)
public async Task Publish(object message, IMessageProperties? properties, bool mandatory)
{
var messageClass = message.GetType();
var exchange = exchangeStrategy.GetExchange(messageClass);
@ -108,13 +108,13 @@ namespace Tapeti.Connection
/// <inheritdoc />
public async Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory)
public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory)
{
await Publish(message, properties, null, queueName, mandatory);
}
private async Task Publish(object message, IMessageProperties properties, string exchange, string routingKey, bool mandatory)
private async Task Publish(object message, IMessageProperties? properties, string? exchange, string routingKey, bool mandatory)
{
var writableProperties = new MessageProperties(properties);
@ -151,11 +151,11 @@ namespace Tapeti.Connection
private class PublishContext : IPublishContext
{
public ITapetiConfig Config { get; init; }
public string Exchange { get; set; }
public string RoutingKey { get; init; }
public object Message { get; init; }
public IMessageProperties Properties { get; init; }
public ITapetiConfig Config { get; init; } = null!;
public string? Exchange { get; set; }
public string RoutingKey { get; init; } = null!;
public object Message { get; init; } = null!;
public IMessageProperties? Properties { get; init; }
}
}
}

View File

@ -16,7 +16,7 @@ namespace Tapeti.Connection
private bool consuming;
private readonly List<TapetiConsumerTag> consumerTags = new();
private CancellationTokenSource initializeCancellationTokenSource;
private CancellationTokenSource? initializeCancellationTokenSource;
public TapetiSubscriber(Func<ITapetiClient> clientFactory, ITapetiConfig config)
@ -141,15 +141,24 @@ namespace Tapeti.Connection
private async Task ConsumeQueues(CancellationToken cancellationToken)
{
var queues = config.Bindings.GroupBy(binding => binding.QueueName);
consumerTags.AddRange((await Task.WhenAll(queues.Select(async group =>
var queues = config.Bindings.GroupBy(binding =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(cancellationToken, config, queueName, group);
if (string.IsNullOrEmpty(binding.QueueName))
throw new InvalidOperationException("QueueName must not be empty");
return await clientFactory().Consume(queueName, consumer, cancellationToken);
}))).Where(t => t != null));
return binding.QueueName;
});
consumerTags.AddRange(
(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(cancellationToken, config, queueName, group);
return await clientFactory().Consume(queueName, consumer, cancellationToken);
})))
.Where(t => t?.ConsumerTag != null)
.Cast<TapetiConsumerTag>());
}
@ -164,7 +173,7 @@ namespace Tapeti.Connection
{
public string QueueName;
public List<Type> MessageClasses;
public IRabbitMQArguments Arguments;
public IRabbitMQArguments? Arguments;
}
private readonly Dictionary<string, List<DynamicQueueInfo>> dynamicQueues = new();
@ -185,12 +194,12 @@ namespace Tapeti.Connection
}
public abstract ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments);
public abstract ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments);
public abstract ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments);
public abstract ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments);
public abstract ValueTask BindDurableObsolete(string queueName);
public async ValueTask<string> BindDynamic(Type messageClass, string queuePrefix, IRabbitMQArguments arguments)
public async ValueTask<string> BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments);
if (!result.IsNewMessageClass)
@ -205,14 +214,14 @@ namespace Tapeti.Connection
}
public async ValueTask<string> BindDynamicDirect(Type messageClass, string queuePrefix, IRabbitMQArguments arguments)
public async ValueTask<string> BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments);
return result.QueueName;
}
public async ValueTask<string> BindDynamicDirect(string queuePrefix, IRabbitMQArguments arguments)
public async ValueTask<string> BindDynamicDirect(string? queuePrefix, IRabbitMQArguments? arguments)
{
// If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
@ -226,7 +235,7 @@ namespace Tapeti.Connection
public bool IsNewMessageClass;
}
private async Task<DeclareDynamicQueueResult> DeclareDynamicQueue(Type messageClass, string queuePrefix, IRabbitMQArguments arguments)
private async Task<DeclareDynamicQueueResult> DeclareDynamicQueue(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments)
{
// Group by prefix
var key = queuePrefix ?? "";
@ -282,7 +291,7 @@ namespace Tapeti.Connection
private struct DurableQueueInfo
{
public List<Type> MessageClasses;
public IRabbitMQArguments Arguments;
public IRabbitMQArguments? Arguments;
}
@ -295,7 +304,7 @@ namespace Tapeti.Connection
}
public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments)
public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments)
{
// Collect the message classes per queue so we can determine afterwards
// if any of the bindings currently set on the durable queue are no
@ -324,7 +333,7 @@ namespace Tapeti.Connection
}
public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments)
public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments)
{
if (!durableQueues.TryGetValue(queueName, out var durableQueueInfo))
{
@ -396,12 +405,12 @@ namespace Tapeti.Connection
}
public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments)
public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments)
{
await VerifyDurableQueue(queueName, arguments);
}
public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments)
public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments)
{
await VerifyDurableQueue(queueName, arguments);
}
@ -412,7 +421,7 @@ namespace Tapeti.Connection
}
private async Task VerifyDurableQueue(string queueName, IRabbitMQArguments arguments)
private async Task VerifyDurableQueue(string queueName, IRabbitMQArguments? arguments)
{
if (!durableQueues.Add(queueName))
return;
@ -429,12 +438,12 @@ namespace Tapeti.Connection
}
public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments arguments)
public override ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments)
{
return default;
}
public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments arguments)
public override ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments)
{
return default;
}

View File

@ -80,14 +80,17 @@ namespace Tapeti.Default
}
/// <inheritdoc />
public void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments)
public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments)
{
Console.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is");
}
private static string GetArgumentsText(IRabbitMQArguments arguments)
private static string GetArgumentsText(IRabbitMQArguments? arguments)
{
if (arguments == null || arguments.Count == 0)
return "empty";
var argumentsText = new StringBuilder();
foreach (var pair in arguments)
{

View File

@ -26,7 +26,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public Type MessageClass { get; set; }
public Type? MessageClass { get; set; }
/// <inheritdoc />
public bool HasMessageClass => MessageClass != null;
@ -44,10 +44,12 @@ namespace Tapeti.Default
public IBindingResult Result => result;
public ControllerBindingContext(IEnumerable<ParameterInfo> parameters, ParameterInfo result)
public ControllerBindingContext(Type controller, MethodInfo method, IEnumerable<ParameterInfo> parameters, ParameterInfo result)
{
this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();
Controller = controller;
Method = method;
this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();
this.result = new ControllerBindingResult(result);
}
@ -84,7 +86,13 @@ namespace Tapeti.Default
/// </summary>
public IEnumerable<ValueFactory> GetParameterHandlers()
{
return parameters.Select(p => p.Binding);
return parameters.Select(p =>
{
if (p.Binding == null)
throw new TopologyConfigurationException($"No Binding for parameter {p.Info.Name}");
return p.Binding;
});
}
@ -92,7 +100,7 @@ namespace Tapeti.Default
/// Returns the configured result handler.
/// </summary>
/// <returns></returns>
public ResultHandler GetResultHandler()
public ResultHandler? GetResultHandler()
{
return result.Handler;
}
@ -107,7 +115,7 @@ namespace Tapeti.Default
/// <summary>
/// Provides access to the configured binding.
/// </summary>
public ValueFactory Binding { get; set; }
public ValueFactory? Binding { get; set; }
/// <inheritdoc />
@ -146,7 +154,7 @@ namespace Tapeti.Default
/// <summary>
/// Provides access to the configured handler.
/// </summary>
public ResultHandler Handler { get; set; }
public ResultHandler? Handler { get; set; }
/// <inheritdoc />

View File

@ -60,7 +60,7 @@ namespace Tapeti.Default
/// <summary>
/// The return value handler.
/// </summary>
public ResultHandler ResultHandler;
public ResultHandler? ResultHandler;
/// <summary>
@ -87,10 +87,10 @@ namespace Tapeti.Default
/// <inheritdoc />
public string QueueName { get; private set; }
public string? QueueName { get; private set; }
/// <inheritdoc />
public QueueType QueueType => bindingInfo.QueueInfo.QueueType;
public QueueType? QueueType => bindingInfo.QueueInfo.QueueType;
/// <inheritdoc />
public Type Controller => bindingInfo.ControllerType;
@ -116,7 +116,7 @@ namespace Tapeti.Default
switch (bindingInfo.BindingTargetMode)
{
case BindingTargetMode.Default:
if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic)
QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
else
{
@ -127,7 +127,7 @@ namespace Tapeti.Default
break;
case BindingTargetMode.Direct:
if (bindingInfo.QueueInfo.QueueType == QueueType.Dynamic)
if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic)
QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
else
{
@ -141,7 +141,7 @@ namespace Tapeti.Default
throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode");
}
}
else if (bindingInfo.QueueInfo.QueueType == QueueType.Durable)
else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable)
{
await target.BindDurableObsolete(bindingInfo.QueueInfo.Name);
QueueName = bindingInfo.QueueInfo.Name;
@ -159,8 +159,11 @@ namespace Tapeti.Default
/// <inheritdoc />
public async ValueTask Invoke(IMessageContext context)
{
if (context.Binding == null)
throw new InvalidOperationException("Invoke should not be called on a context without a binding");
var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType);
context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
context.Store(new ControllerMessageContextPayload(controller, (IControllerMethodBinding)context.Binding));
if (!await FilterAllowed(context))
return;
@ -202,7 +205,7 @@ namespace Tapeti.Default
private delegate ValueTask MessageHandlerFunc(IMessageContext context);
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler? resultHandler)
{
if (resultHandler != null)
return WrapResultHandlerMethod(method.CreateExpressionInvoke(), parameterFactories, resultHandler);
@ -245,7 +248,7 @@ namespace Tapeti.Default
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
{
invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return default;
}
@ -296,8 +299,8 @@ namespace Tapeti.Default
private void AddExceptionData(Exception exception)
{
exception.Data["Tapeti.Controller.Name"] = bindingInfo.ControllerType?.FullName;
exception.Data["Tapeti.Controller.Method"] = bindingInfo.Method?.Name;
exception.Data["Tapeti.Controller.Name"] = bindingInfo.ControllerType.FullName;
exception.Data["Tapeti.Controller.Method"] = bindingInfo.Method.Name;
}
@ -319,12 +322,19 @@ namespace Tapeti.Default
/// <summary>
/// Optional arguments (x-arguments) passed when declaring the queue.
/// </summary>
public IRabbitMQArguments QueueArguments { get; set; }
public IRabbitMQArguments? QueueArguments { get; set; }
/// <summary>
/// Determines if the QueueInfo properties contain a valid combination.
/// </summary>
public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name);
public QueueInfo(QueueType queueType, string name)
{
QueueType = queueType;
Name = name;
}
}
}
}

View File

@ -24,7 +24,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
{
if (value == null)
{
@ -41,7 +41,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
{
var isNullable = IsNullableType(objectType);

View File

@ -46,7 +46,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public object Deserialize(byte[] body, IMessageProperties properties)
public object? Deserialize(byte[] body, IMessageProperties properties)
{
if (properties.ContentType is not ContentType)
throw new ArgumentException($"content_type must be {ContentType}");

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Tapeti.Config;
@ -12,28 +13,28 @@ namespace Tapeti.Default
/// <inheritdoc />
public ITapetiConfig Config { get; set; }
public ITapetiConfig Config { get; set; } = null!;
/// <inheritdoc />
public string Queue { get; set; }
public string Queue { get; set; } = null!;
/// <inheritdoc />
public string Exchange { get; set; }
public string Exchange { get; set; } = null!;
/// <inheritdoc />
public string RoutingKey { get; set; }
public string RoutingKey { get; set; } = null!;
/// <inheritdoc />
public byte[] RawBody { get; set; }
public byte[] RawBody { get; set; } = null!;
/// <inheritdoc />
public object Message { get; set; }
public object? Message { get; set; }
/// <inheritdoc />
public IMessageProperties Properties { get; set; }
public IMessageProperties Properties { get; set; } = null!;
/// <inheritdoc />
public IBinding Binding { get; set; }
public IBinding Binding { get; set; } = null!;
/// <inheritdoc />
public CancellationToken ConnectionClosed { get; set; }
@ -57,7 +58,7 @@ namespace Tapeti.Default
return (T)payloads[typeof(T)];
}
public bool TryGet<T>(out T payload) where T : IMessageContextPayload
public bool TryGet<T>([NotNullWhen(true)] out T? payload) where T : IMessageContextPayload
{
if (payloads.TryGetValue(typeof(T), out var payloadValue))
{
@ -100,7 +101,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public bool Get<T>(string key, out T value) where T : class
public bool Get<T>(string key, out T? value) where T : class
{
if (!TryGet<KeyValuePayload>(out var payload) ||
!payload.TryGetValue(key, out var objectValue))
@ -109,7 +110,7 @@ namespace Tapeti.Default
return false;
}
value = (T)objectValue;
value = (T?)objectValue;
return true;
}
@ -132,7 +133,7 @@ namespace Tapeti.Default
}
public bool TryGetValue(string key, out object value)
public bool TryGetValue(string key, out object? value)
{
return items.TryGetValue(key, out value);
}

View File

@ -13,13 +13,13 @@ namespace Tapeti.Default
/// <inheritdoc />
public string ContentType { get; set; }
public string? ContentType { get; set; }
/// <inheritdoc />
public string CorrelationId { get; set; }
public string? CorrelationId { get; set; }
/// <inheritdoc />
public string ReplyTo { get; set; }
public string? ReplyTo { get; set; }
/// <inheritdoc />
public bool? Persistent { get; set; }
@ -37,7 +37,7 @@ namespace Tapeti.Default
/// <summary>
/// </summary>
public MessageProperties(IMessageProperties source)
public MessageProperties(IMessageProperties? source)
{
if (source == null)
return;
@ -64,7 +64,7 @@ namespace Tapeti.Default
}
/// <inheritdoc />
public string GetHeader(string name)
public string? GetHeader(string name)
{
return headers.TryGetValue(name, out var value) ? value : null;
}

View File

@ -31,7 +31,7 @@ namespace Tapeti.Default
// Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish.
// ReSharper disable once ConvertIfStatementToSwitchStatement
if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType)
throw new ArgumentException($"Message handler for non-request message type {context.MessageClass?.FullName} must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}");
throw new ArgumentException($"Message handler for non-request message type {context.MessageClass?.FullName} must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType.FullName ?? "void"}");
if (!hasClassResult)
return;
@ -48,14 +48,22 @@ namespace Tapeti.Default
var handler = GetType().GetMethod(nameof(PublishGenericTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(handler != null, nameof(handler) + " != null");
context.Result.SetHandler((messageContext, value) => (ValueTask)handler.Invoke(null, new[] { messageContext, value }));
context.Result.SetHandler((messageContext, value) =>
{
var result = handler.Invoke(null, new[] { messageContext, value });
return result != null ? (ValueTask)result : ValueTask.CompletedTask;
});
break;
case TaskType.ValueTask:
var valueTaskHandler = GetType().GetMethod(nameof(PublishGenericValueTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(valueTaskHandler != null, nameof(handler) + " != null");
context.Result.SetHandler((messageContext, value) => (ValueTask)valueTaskHandler.Invoke(null, new[] { messageContext, value }));
context.Result.SetHandler((messageContext, value) =>
{
var result = valueTaskHandler.Invoke(null, new[] { messageContext, value });
return result != null ? (ValueTask)result : ValueTask.CompletedTask;
});
break;
default:
@ -79,7 +87,7 @@ namespace Tapeti.Default
}
private static async ValueTask Reply(object message, IMessageContext messageContext)
private static async ValueTask Reply(object? message, IMessageContext messageContext)
{
if (message == null)
throw new ArgumentException("Return value of a request message handler must not be null");

View File

@ -15,21 +15,21 @@ namespace Tapeti.Default
/// <inheritdoc />
public string ContentType
public string? ContentType
{
get => BasicProperties.IsContentTypePresent() ? BasicProperties.ContentType : null;
set { if (!string.IsNullOrEmpty(value)) BasicProperties.ContentType = value; else BasicProperties.ClearContentType(); }
}
/// <inheritdoc />
public string CorrelationId
public string? CorrelationId
{
get => BasicProperties.IsCorrelationIdPresent() ? BasicProperties.CorrelationId : null;
set { if (!string.IsNullOrEmpty(value)) BasicProperties.CorrelationId = value; else BasicProperties.ClearCorrelationId(); }
}
/// <inheritdoc />
public string ReplyTo
public string? ReplyTo
{
get => BasicProperties.IsReplyToPresent() ? BasicProperties.ReplyTo : null;
set { if (!string.IsNullOrEmpty(value)) BasicProperties.ReplyTo = value; else BasicProperties.ClearReplyTo(); }
@ -66,7 +66,7 @@ namespace Tapeti.Default
/// <summary>
/// </summary>
public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties source)
public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties? source)
{
BasicProperties = basicProperties;
if (source == null)
@ -97,7 +97,7 @@ namespace Tapeti.Default
/// <inheritdoc />
public string GetHeader(string name)
public string? GetHeader(string name)
{
if (BasicProperties.Headers == null)
return null;

View File

@ -60,7 +60,7 @@ namespace Tapeti.Default
}
private static List<string> SplitPascalCase(string value)
private static List<string>? SplitPascalCase(string value)
{
var split = SeparatorRegex.Split(value);
if (split.Length == 0)

View File

@ -10,7 +10,7 @@ namespace Tapeti.Helpers
/// </summary>
public class ConnectionStringParser
{
private readonly TapetiConnectionParams result = new TapetiAppSettingsConnectionParams();
private readonly TapetiConnectionParams result = new();
private readonly string connectionstring;
private int pos = -1;

View File

@ -10,7 +10,7 @@ namespace Tapeti.Helpers
/// <summary>
/// Checks if two dictionaries are considered compatible. If either is null they are considered empty.
/// </summary>
public static bool NullSafeSameValues(this IReadOnlyDictionary<string, object> arguments1, IReadOnlyDictionary<string, object> arguments2)
public static bool NullSafeSameValues(this IReadOnlyDictionary<string, object>? arguments1, IReadOnlyDictionary<string, object>? arguments2)
{
if (arguments1 == null || arguments2 == null)
return (arguments1 == null || arguments1.Count == 0) && (arguments2 == null || arguments2.Count == 0);

View File

@ -16,7 +16,7 @@ namespace Tapeti.Helpers
/// </summary>
/// <param name="target">The instance on which the method should be called.</param>
/// <param name="args">The arguments passed to the method.</param>
public delegate object ExpressionInvoke(object target, params object[] args);
public delegate object ExpressionInvoke(object? target, params object?[] args);
/// <summary>

View File

@ -16,7 +16,7 @@ namespace Tapeti.Helpers
/// <param name="handle">Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.</param>
/// <param name="lastHandler">The action to execute when the innermost middleware calls next.</param>
/// <typeparam name="T"></typeparam>
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
public static void Go<T>(IReadOnlyList<T>? middleware, Action<T, Action> handle, Action lastHandler)
{
var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1)
@ -45,7 +45,7 @@ namespace Tapeti.Helpers
/// <param name="handle">Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.</param>
/// <param name="lastHandler">The action to execute when the innermost middleware calls next.</param>
/// <typeparam name="T"></typeparam>
public static async ValueTask GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<ValueTask>, ValueTask> handle, Func<ValueTask> lastHandler)
public static async ValueTask GoAsync<T>(IReadOnlyList<T>? middleware, Func<T, Func<ValueTask>, ValueTask> handle, Func<ValueTask> lastHandler)
{
var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1)

View File

@ -7,19 +7,27 @@ using System.Threading.Tasks;
namespace Tapeti
{
/// <summary>
///
/// Contains information about the established connection.
/// </summary>
public class ConnectedEventArgs
{
/// <summary>
/// The connection parameters used to establish the connection.
/// </summary>
public TapetiConnectionParams ConnectionParams;
public TapetiConnectionParams ConnectionParams { get; }
/// <summary>
/// The local port for the connection. Useful for identifying the connection in the management interface.
/// </summary>
public int LocalPort;
public int LocalPort { get; }
/// <summary></summary>
public ConnectedEventArgs(TapetiConnectionParams connectionParams, int localPort)
{
ConnectionParams = connectionParams;
LocalPort = localPort;
}
}
@ -31,12 +39,20 @@ namespace Tapeti
/// <summary>
/// The ReplyCode as indicated by the client library
/// </summary>
public ushort ReplyCode;
public ushort ReplyCode { get; }
/// <summary>
/// The ReplyText as indicated by the client library
/// </summary>
public string ReplyText;
public string ReplyText { get; }
/// <summary></summary>
public DisconnectedEventArgs(ushort replyCode, string replyText)
{
ReplyCode = replyCode;
ReplyText = replyText;
}
}

View File

@ -32,7 +32,7 @@ namespace Tapeti
/// <summary>
/// The exception that caused the connection to fail.
/// </summary>
Exception Exception { get; }
Exception? Exception { get; }
}
@ -138,7 +138,7 @@ namespace Tapeti
/// <param name="queueName">The name of the queue that is declared</param>
/// <param name="existingArguments">The x-arguments of the existing queue</param>
/// <param name="arguments">The x-arguments of the queue that would be declared</param>
void QueueExistsWarning(string queueName, IRabbitMQArguments existingArguments, IRabbitMQArguments arguments);
void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments);
/// <summary>
/// Called before a binding is added to a queue.

View File

@ -21,6 +21,6 @@ namespace Tapeti
/// <param name="body">The encoded message</param>
/// <param name="properties">The properties as sent along with the message</param>
/// <returns>A decoded instance of the message</returns>
object Deserialize(byte[] body, IMessageProperties properties);
object? Deserialize(byte[] body, IMessageProperties properties);
}
}

View File

@ -29,7 +29,7 @@ namespace Tapeti
/// </remarks>
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
/// <param name="message">The message to send</param>
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class;
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class;
/// <summary>
@ -42,7 +42,7 @@ namespace Tapeti
/// </remarks>
/// <param name="responseMethodSelector">An expression defining the method which handles the response. Example: c => c.HandleResponse</param>
/// <param name="message">The message to send</param>
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class;
Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class;
/// <summary>
@ -69,7 +69,7 @@ namespace Tapeti
/// <param name="message">An instance of a message class</param>
/// <param name="properties">Metadata to include in the message</param>
/// <param name="mandatory">If true, an exception will be raised if the message can not be delivered to at least one queue</param>
Task Publish(object message, IMessageProperties properties, bool mandatory);
Task Publish(object message, IMessageProperties? properties, bool mandatory);
/// <summary>
@ -80,6 +80,6 @@ namespace Tapeti
/// <param name="properties">Metadata to include in the message</param>
/// <param name="mandatory">If true, an exception will be raised if the message can not be delivered to the queue</param>
/// <returns></returns>
Task PublishDirect(object message, string queueName, IMessageProperties properties, bool mandatory);
Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory);
}
}

View File

@ -12,6 +12,7 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.png</PackageIcon>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -17,7 +17,7 @@ namespace Tapeti
/// </summary>
public class TapetiConfig : ITapetiConfigBuilderAccess
{
private Config config;
private Config? config;
private readonly List<IControllerBindingMiddleware> bindingMiddleware = new();
@ -92,29 +92,25 @@ namespace Tapeti
var configInstance = GetConfig();
var middlewareBundle = extension.GetMiddleware(DependencyResolver);
if (middlewareBundle != null)
foreach (var middleware in extension.GetMiddleware(DependencyResolver))
{
foreach (var middleware in middlewareBundle)
switch (middleware)
{
switch (middleware)
{
case IControllerBindingMiddleware bindingExtension:
Use(bindingExtension);
break;
case IControllerBindingMiddleware bindingExtension:
Use(bindingExtension);
break;
case IMessageMiddleware messageExtension:
configInstance.Use(messageExtension);
break;
case IMessageMiddleware messageExtension:
configInstance.Use(messageExtension);
break;
case IPublishMiddleware publishExtension:
configInstance.Use(publishExtension);
break;
case IPublishMiddleware publishExtension:
configInstance.Use(publishExtension);
break;
default:
throw new ArgumentException(
$"Unsupported middleware implementation: {middleware?.GetType().Name ?? "null"}");
}
default:
throw new ArgumentException(
$"Unsupported middleware implementation: {middleware.GetType().Name}");
}
}
@ -123,7 +119,7 @@ namespace Tapeti
return this;
foreach (var binding in bindingBundle)
config.RegisterBinding(binding);
GetConfig().RegisterBinding(binding);
return this;
}
@ -313,17 +309,23 @@ namespace Tapeti
internal class ConfigBindings : List<IBinding>, ITapetiConfigBindings
{
private Dictionary<MethodInfo, IControllerMethodBinding> methodLookup;
private Dictionary<MethodInfo, IControllerMethodBinding>? methodLookup;
public IControllerMethodBinding ForMethod(Delegate method)
public IControllerMethodBinding? ForMethod(Delegate method)
{
if (methodLookup == null)
throw new InvalidOperationException("Lock must be called first");
return methodLookup.TryGetValue(method.Method, out var binding) ? binding : null;
}
public IControllerMethodBinding ForMethod(MethodInfo method)
public IControllerMethodBinding? ForMethod(MethodInfo method)
{
if (methodLookup == null)
throw new InvalidOperationException("Lock must be called first");
return methodLookup.TryGetValue(method, out var binding) ? binding : null;
}

View File

@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Reflection;
using System.Text;
using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Connection;
@ -49,12 +50,7 @@ namespace Tapeti
{
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null;
var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
{
Controller = controller,
Method = method
};
var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter);
if (method.GetCustomAttribute<ResponseHandlerAttribute>() != null)
context.SetBindingTargetMode(BindingTargetMode.Direct);
@ -124,11 +120,15 @@ namespace Tapeti
/// <param name="builder"></param>
public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder)
{
return RegisterAllControllers(builder, Assembly.GetEntryAssembly());
var assembly = Assembly.GetEntryAssembly();
if (assembly == null)
throw new InvalidOperationException("No EntryAssembly");
return RegisterAllControllers(builder, assembly);
}
private static ControllerMethodBinding.QueueInfo GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo fallbackQueueInfo)
private static ControllerMethodBinding.QueueInfo? GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo? fallbackQueueInfo)
{
var dynamicQueueAttribute = member.GetCustomAttribute<DynamicQueueAttribute>();
var durableQueueAttribute = member.GetCustomAttribute<DurableQueueAttribute>();
@ -157,26 +157,33 @@ namespace Tapeti
}
else
{
queueType = fallbackQueueInfo.QueueType;
queueType = fallbackQueueInfo!.QueueType;
name = fallbackQueueInfo.Name;
}
return new ControllerMethodBinding.QueueInfo
return new ControllerMethodBinding.QueueInfo(queueType, name)
{
QueueType = queueType,
Name = name,
QueueArguments = GetQueueArguments(queueArgumentsAttribute) ?? fallbackQueueInfo?.QueueArguments
};
}
private static IRabbitMQArguments GetQueueArguments(QueueArgumentsAttribute queueArgumentsAttribute)
private static IRabbitMQArguments? GetQueueArguments(QueueArgumentsAttribute? queueArgumentsAttribute)
{
if (queueArgumentsAttribute == null)
return null;
var arguments = new RabbitMQArguments(queueArgumentsAttribute.CustomArguments);
var arguments = new RabbitMQArguments(queueArgumentsAttribute.CustomArguments.ToDictionary(
p => p.Key,
p => p.Value switch
{
string stringValue => Encoding.UTF8.GetBytes(stringValue),
_ => p.Value
}
))
{
};
if (queueArgumentsAttribute.MaxLength > 0)
arguments.Add(@"x-max-length", queueArgumentsAttribute.MaxLength);

View File

@ -24,10 +24,10 @@ namespace Tapeti
/// This property must be set before first subscribing or publishing, otherwise it
/// will use the default connection parameters.
/// </remarks>
public TapetiConnectionParams Params { get; set; }
public TapetiConnectionParams? Params { get; set; }
private readonly Lazy<ITapetiClient> client;
private TapetiSubscriber subscriber;
private TapetiSubscriber? subscriber;
private bool disposed;
@ -48,13 +48,13 @@ namespace Tapeti
}
/// <inheritdoc />
public event ConnectedEventHandler Connected;
public event ConnectedEventHandler? Connected;
/// <inheritdoc />
public event DisconnectedEventHandler Disconnected;
public event DisconnectedEventHandler? Disconnected;
/// <inheritdoc />
public event ConnectedEventHandler Reconnected;
public event ConnectedEventHandler? Reconnected;
/// <inheritdoc />

View File

@ -10,7 +10,7 @@ namespace Tapeti
/// </summary>
public class TapetiConnectionParams
{
private IDictionary<string, string> clientProperties;
private IDictionary<string, string>? clientProperties;
/// <summary>
@ -59,7 +59,7 @@ namespace Tapeti
/// If any of the default keys used by the RabbitMQ Client library (product, version) are specified their value
/// will be overwritten. See DefaultClientProperties in Connection.cs in the RabbitMQ .NET client source for the default values.
/// </remarks>
public IDictionary<string, string> ClientProperties {
public IDictionary<string, string>? ClientProperties {
get => clientProperties ??= new Dictionary<string, string>();
set => clientProperties = value;
}

View File

@ -120,7 +120,7 @@ namespace Tapeti.Tasks
{
while (true)
{
Task task;
Task? task;
lock (scheduledTasks)
{
task = WaitAndDequeueTask();
@ -133,7 +133,7 @@ namespace Tapeti.Tasks
}
}
private Task WaitAndDequeueTask()
private Task? WaitAndDequeueTask()
{
while (!scheduledTasks.Any() && !disposed)
Monitor.Wait(scheduledTasks);