1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Added ValueTask support

- This is a breaking change for custom middleware implementations
Added validation for return type handling
- This may be breaking for incorrect implementations, but highly unlikely
This commit is contained in:
Mark van Renswoude 2022-02-09 11:26:56 +01:00
parent b816e56018
commit 165680fd38
31 changed files with 215 additions and 142 deletions

View File

@ -9,8 +9,7 @@ namespace _03_FlowRequestResponse
public class ReceivingMessageController public class ReceivingMessageController
{ {
// No publisher required, responses can simply be returned // No publisher required, responses can simply be returned
#pragma warning disable CA1822 // Mark members as static - not supported yet by Tapeti public static async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
public async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
{ {
var quote = message.Amount switch var quote = message.Amount switch
{ {
@ -29,6 +28,5 @@ namespace _03_FlowRequestResponse
Quote = quote Quote = quote
}; };
} }
#pragma warning restore CA1822
} }
} }

View File

@ -15,9 +15,11 @@ namespace _05_SpeedTest
} }
#pragma warning disable IDE0060 // Remove unused parameter
public void HandleSpeedTestMessage(SpeedTestMessage message) public void HandleSpeedTestMessage(SpeedTestMessage message)
{ {
messageCounter.Add(); messageCounter.Add();
} }
#pragma warning restore IDE0060
} }
} }

View File

@ -8,8 +8,7 @@ namespace _06_StatelessRequestResponse
public class ReceivingMessageController public class ReceivingMessageController
{ {
// No publisher required, responses can simply be returned // No publisher required, responses can simply be returned
#pragma warning disable CA1822 // Mark members as static - not supported yet by Tapeti public static QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
{ {
var quote = message.Amount switch var quote = message.Amount switch
{ {
@ -25,6 +24,5 @@ namespace _06_StatelessRequestResponse
Quote = quote Quote = quote
}; };
} }
#pragma warning restore CA1822
} }
} }

View File

@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations
internal class DataAnnotationsMessageMiddleware : IMessageMiddleware internal class DataAnnotationsMessageMiddleware : IMessageMiddleware
{ {
/// <inheritdoc /> /// <inheritdoc />
public async Task Handle(IMessageContext context, Func<Task> next) public ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{ {
var validationContext = new ValidationContext(context.Message); var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true); Validator.ValidateObject(context.Message, validationContext, true);
await next(); return next();
} }
} }
} }

View File

@ -12,12 +12,12 @@ namespace Tapeti.DataAnnotations
internal class DataAnnotationsPublishMiddleware : IPublishMiddleware internal class DataAnnotationsPublishMiddleware : IPublishMiddleware
{ {
/// <inheritdoc /> /// <inheritdoc />
public async Task Handle(IPublishContext context, Func<Task> next) public ValueTask Handle(IPublishContext context, Func<ValueTask> next)
{ {
var validationContext = new ValidationContext(context.Message); var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true); Validator.ValidateObject(context.Message, validationContext, true);
await next(); return next();
} }
} }
} }

View File

@ -5,6 +5,10 @@ using System.Data.SqlClient;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
// Neither of these are available in language version 7 required for .NET Standard 2.0
// ReSharper disable ConvertToUsingDeclaration
// ReSharper disable UseAwaitUsing
namespace Tapeti.Flow.SQL namespace Tapeti.Flow.SQL
{ {
/// <inheritdoc /> /// <inheritdoc />
@ -37,7 +41,7 @@ namespace Tapeti.Flow.SQL
/// <inheritdoc /> /// <inheritdoc />
public async Task<IEnumerable<FlowRecord<T>>> GetStates<T>() public async ValueTask<IEnumerable<FlowRecord<T>>> GetStates<T>()
{ {
return await SqlRetryHelper.Execute(async () => return await SqlRetryHelper.Execute(async () =>
{ {
@ -64,7 +68,7 @@ namespace Tapeti.Flow.SQL
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp) public async ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp)
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {
@ -88,7 +92,7 @@ namespace Tapeti.Flow.SQL
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task UpdateState<T>(Guid flowID, T state) public async ValueTask UpdateState<T>(Guid flowID, T state)
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {
@ -108,7 +112,7 @@ namespace Tapeti.Flow.SQL
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task DeleteState(Guid flowID) public async ValueTask DeleteState(Guid flowID)
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {

View File

@ -17,6 +17,11 @@
<NoWarn>1701;1702</NoWarn> <NoWarn>1701;1702</NoWarn>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'">
<!-- Suppress 'using statement can be simplified' which requires language version 8 not available in .NET Standard 2.0 -->
<NoWarn>IDE0063</NoWarn>
</PropertyGroup>
<ItemGroup> <ItemGroup>
<None Remove="scripts\Flow table.sql" /> <None Remove="scripts\Flow table.sql" />
</ItemGroup> </ItemGroup>

View File

@ -31,6 +31,9 @@ namespace Tapeti.Flow.Default
if (continuationAttribute == null) if (continuationAttribute == null)
return; return;
if (context.Method.IsStatic)
throw new ArgumentException($"Continuation attribute is not valid on static methods in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
context.SetBindingTargetMode(BindingTargetMode.Direct); context.SetBindingTargetMode(BindingTargetMode.Direct);
context.Use(new FlowContinuationMiddleware()); context.Use(new FlowContinuationMiddleware());
@ -52,7 +55,7 @@ namespace Tapeti.Flow.Default
context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext)); context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext));
} }
else else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest))) foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest)))
@ -62,34 +65,53 @@ namespace Tapeti.Flow.Default
private static void RegisterYieldPointResult(IControllerBindingContext context) private static void RegisterYieldPointResult(IControllerBindingContext context)
{ {
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf)) if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var taskType))
return; return;
if (isTaskOf) if (context.Method.IsStatic)
throw new ArgumentException($"Yield points are not valid on static methods in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
switch (taskType)
{ {
case TaskType.None:
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
break;
case TaskType.Task:
context.Result.SetHandler(async (messageContext, value) => context.Result.SetHandler(async (messageContext, value) =>
{ {
var yieldPoint = await (Task<IYieldPoint>)value; var yieldPoint = await (Task<IYieldPoint>)value;
if (yieldPoint != null) if (yieldPoint != null)
await HandleYieldPoint(messageContext, yieldPoint); await HandleYieldPoint(messageContext, yieldPoint);
}); });
break;
case TaskType.ValueTask:
context.Result.SetHandler(async (messageContext, value) =>
{
var yieldPoint = await (ValueTask<IYieldPoint>)value;
if (yieldPoint != null)
await HandleYieldPoint(messageContext, yieldPoint);
});
break;
default:
throw new ArgumentOutOfRangeException();
} }
else
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
} }
private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint) private static ValueTask HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
{ {
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint); return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
} }
private static Task HandleParallelResponse(IMessageContext context) private static ValueTask HandleParallelResponse(IMessageContext context)
{ {
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload) && flowPayload.FlowIsConverging) if (context.TryGet<FlowMessageContextPayload>(out var flowPayload) && flowPayload.FlowIsConverging)
return Task.CompletedTask; return default;
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>

View File

@ -17,7 +17,7 @@ namespace Tapeti.Flow.Default
private int deleteCalled; private int deleteCalled;
public async Task Store(bool persistent) public ValueTask Store(bool persistent)
{ {
storeCalled++; storeCalled++;
@ -26,15 +26,13 @@ namespace Tapeti.Flow.Default
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock)); if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller); FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
await FlowStateLock.StoreFlowState(FlowState, persistent); return FlowStateLock.StoreFlowState(FlowState, persistent);
} }
public async Task Delete() public ValueTask Delete()
{ {
deleteCalled++; deleteCalled++;
return FlowStateLock?.DeleteFlowState() ?? default;
if (FlowStateLock != null)
await FlowStateLock.DeleteFlowState();
} }
public bool IsStoredOrDeleted() public bool IsStoredOrDeleted()

View File

@ -11,7 +11,7 @@ namespace Tapeti.Flow.Default
/// </summary> /// </summary>
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{ {
public async Task Filter(IMessageContext context, Func<Task> next) public async ValueTask Filter(IMessageContext context, Func<ValueTask> next)
{ {
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload)) if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return; return;
@ -27,7 +27,7 @@ namespace Tapeti.Flow.Default
} }
public async Task Handle(IMessageContext context, Func<Task> next) public async ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{ {
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload)) if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return; return;
@ -53,7 +53,7 @@ namespace Tapeti.Flow.Default
} }
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next) public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next)
{ {
await next(); await next();
@ -82,7 +82,7 @@ namespace Tapeti.Flow.Default
private static async Task<FlowContext> EnrichWithFlowContext(IMessageContext context) private static async ValueTask<FlowContext> EnrichWithFlowContext(IMessageContext context)
{ {
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload)) if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return flowPayload.FlowContext; return flowPayload.FlowContext;

View File

@ -197,7 +197,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint) public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{ {
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}"); throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
@ -254,7 +254,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public Task Converge(IFlowHandlerContext context) public ValueTask Converge(IFlowHandlerContext context)
{ {
return Execute(context, new DelegateYieldPoint(flowContext => return Execute(context, new DelegateYieldPoint(flowContext =>
Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync))); Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));

View File

@ -51,7 +51,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public async Task Load() public async ValueTask Load()
{ {
if (inUse) if (inUse)
throw new InvalidOperationException("Can only load the saved state once."); throw new InvalidOperationException("Can only load the saved state once.");
@ -114,17 +114,17 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public Task<Guid?> FindFlowID(Guid continuationID) public ValueTask<Guid?> FindFlowID(Guid continuationID)
{ {
if (!loaded) if (!loaded)
throw new InvalidOperationException("Flow store is not yet loaded."); throw new InvalidOperationException("Flow store is not yet loaded.");
return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null); return new ValueTask<Guid?>(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task<IFlowStateLock> LockFlowState(Guid flowID) public async ValueTask<IFlowStateLock> LockFlowState(Guid flowID)
{ {
if (!loaded) if (!loaded)
throw new InvalidOperationException("Flow store should be loaded before storing flows."); throw new InvalidOperationException("Flow store should be loaded before storing flows.");
@ -137,14 +137,14 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge) public ValueTask<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
{ {
var maximumDateTime = DateTime.UtcNow - minimumAge; var maximumDateTime = DateTime.UtcNow - minimumAge;
return Task.FromResult(flowStates return new ValueTask<IEnumerable<ActiveFlow>>(flowStates
.Where(p => p.Value.CreationTime <= maximumDateTime) .Where(p => p.Value.CreationTime <= maximumDateTime)
.Select(p => new ActiveFlow(p.Key, p.Value.CreationTime)) .Select(p => new ActiveFlow(p.Key, p.Value.CreationTime))
.ToArray() as IEnumerable<ActiveFlow>); .ToArray());
} }
@ -173,15 +173,15 @@ namespace Tapeti.Flow.Default
l?.Dispose(); l?.Dispose();
} }
public Task<FlowState> GetFlowState() public ValueTask<FlowState> GetFlowState()
{ {
if (flowLock == null) if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock"); throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(cachedFlowState?.FlowState?.Clone()); return new ValueTask<FlowState>(cachedFlowState?.FlowState?.Clone());
} }
public async Task StoreFlowState(FlowState newFlowState, bool persistent) public async ValueTask StoreFlowState(FlowState newFlowState, bool persistent)
{ {
if (flowLock == null) if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock"); throw new ObjectDisposedException("FlowStateLock");
@ -227,7 +227,7 @@ namespace Tapeti.Flow.Default
} }
} }
public async Task DeleteFlowState() public async ValueTask DeleteFlowState()
{ {
if (flowLock == null) if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock"); throw new ObjectDisposedException("FlowStateLock");

View File

@ -11,27 +11,27 @@ namespace Tapeti.Flow.Default
/// </summary> /// </summary>
public class NonPersistentFlowRepository : IFlowRepository public class NonPersistentFlowRepository : IFlowRepository
{ {
Task<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>() ValueTask<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>()
{ {
return Task.FromResult(Enumerable.Empty<FlowRecord<T>>()); return new ValueTask<IEnumerable<FlowRecord<T>>>(Enumerable.Empty<FlowRecord<T>>());
} }
/// <inheritdoc /> /// <inheritdoc />
public Task CreateState<T>(Guid flowID, T state, DateTime timestamp) public ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp)
{ {
return Task.CompletedTask; return default;
} }
/// <inheritdoc /> /// <inheritdoc />
public Task UpdateState<T>(Guid flowID, T state) public ValueTask UpdateState<T>(Guid flowID, T state)
{ {
return Task.CompletedTask; return default;
} }
/// <inheritdoc /> /// <inheritdoc />
public Task DeleteState(Guid flowID) public ValueTask DeleteState(Guid flowID)
{ {
return Task.CompletedTask; return default;
} }
} }
} }

View File

@ -108,7 +108,7 @@ namespace Tapeti.Flow
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="yieldPoint"></param> /// <param name="yieldPoint"></param>
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
/// <summary> /// <summary>
@ -120,7 +120,7 @@ namespace Tapeti.Flow
/// <summary> /// <summary>
/// Calls the converge method for a parallel flow. /// Calls the converge method for a parallel flow.
/// </summary> /// </summary>
Task Converge(IFlowHandlerContext context); ValueTask Converge(IFlowHandlerContext context);
} }

View File

@ -13,7 +13,7 @@ namespace Tapeti.Flow
/// Load the previously persisted flow states. /// Load the previously persisted flow states.
/// </summary> /// </summary>
/// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns> /// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns>
Task<IEnumerable<FlowRecord<T>>> GetStates<T>(); ValueTask<IEnumerable<FlowRecord<T>>> GetStates<T>();
/// <summary> /// <summary>
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID. /// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
@ -22,20 +22,20 @@ namespace Tapeti.Flow
/// <param name="state">The flow state to be stored.</param> /// <param name="state">The flow state to be stored.</param>
/// <param name="timestamp">The time when the flow was initially created.</param> /// <param name="timestamp">The time when the flow was initially created.</param>
/// <returns></returns> /// <returns></returns>
Task CreateState<T>(Guid flowID, T state, DateTime timestamp); ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp);
/// <summary> /// <summary>
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID. /// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary> /// </summary>
/// <param name="flowID">The unique ID of the flow.</param> /// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state">The flow state to be stored.</param> /// <param name="state">The flow state to be stored.</param>
Task UpdateState<T>(Guid flowID, T state); ValueTask UpdateState<T>(Guid flowID, T state);
/// <summary> /// <summary>
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID. /// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary> /// </summary>
/// <param name="flowID">The unique ID of the flow.</param> /// <param name="flowID">The unique ID of the flow.</param>
Task DeleteState(Guid flowID); ValueTask DeleteState(Guid flowID);
} }

View File

@ -17,19 +17,19 @@ namespace Tapeti.Flow
/// If using an IFlowRepository that requires an update (such as creating tables) make /// If using an IFlowRepository that requires an update (such as creating tables) make
/// sure it is called before calling Load. /// sure it is called before calling Load.
/// </summary> /// </summary>
Task Load(); ValueTask Load();
/// <summary> /// <summary>
/// Looks up the FlowID corresponding to a ContinuationID. For internal use. /// Looks up the FlowID corresponding to a ContinuationID. For internal use.
/// </summary> /// </summary>
/// <param name="continuationID"></param> /// <param name="continuationID"></param>
Task<Guid?> FindFlowID(Guid continuationID); ValueTask<Guid?> FindFlowID(Guid continuationID);
/// <summary> /// <summary>
/// Acquires a lock on the flow with the specified FlowID. /// Acquires a lock on the flow with the specified FlowID.
/// </summary> /// </summary>
/// <param name="flowID"></param> /// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID); ValueTask<IFlowStateLock> LockFlowState(Guid flowID);
/// <summary> /// <summary>
/// Returns information about the currently active flows. /// Returns information about the currently active flows.
@ -38,7 +38,7 @@ namespace Tapeti.Flow
/// This is intended for monitoring purposes and should be treated as a snapshot. /// This is intended for monitoring purposes and should be treated as a snapshot.
/// </remarks> /// </remarks>
/// <param name="minimumAge">The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.</param> /// <param name="minimumAge">The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.</param>
Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge); ValueTask<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge);
} }
@ -56,19 +56,19 @@ namespace Tapeti.Flow
/// <summary> /// <summary>
/// Acquires a copy of the flow state. /// Acquires a copy of the flow state.
/// </summary> /// </summary>
Task<FlowState> GetFlowState(); ValueTask<FlowState> GetFlowState();
/// <summary> /// <summary>
/// Stores the new flow state. /// Stores the new flow state.
/// </summary> /// </summary>
/// <param name="flowState"></param> /// <param name="flowState"></param>
/// <param name="persistent"></param> /// <param name="persistent"></param>
Task StoreFlowState(FlowState flowState, bool persistent); ValueTask StoreFlowState(FlowState flowState, bool persistent);
/// <summary> /// <summary>
/// Disposes of the flow state corresponding to this Flow ID. /// Disposes of the flow state corresponding to this Flow ID.
/// </summary> /// </summary>
Task DeleteFlowState(); ValueTask DeleteFlowState();
} }

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks> <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
@ -18,7 +18,7 @@
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'"> <PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'">
<!-- Supress 'Use switch expression' which requires language version 8 not available in .NET Standard 2.0 --> <!-- Suppress 'Use switch expression' which requires language version 8 not available in .NET Standard 2.0 -->
<NoWarn>IDE0066</NoWarn> <NoWarn>IDE0066</NoWarn>
</PropertyGroup> </PropertyGroup>

View File

@ -29,7 +29,7 @@ namespace Tapeti.Serilog.Middleware
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task Handle(IMessageContext context, Func<Task> next) public async ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{ {
var logger = context.Config.DependencyResolver.Resolve<global::Serilog.ILogger>(); var logger = context.Config.DependencyResolver.Resolve<global::Serilog.ILogger>();
@ -41,6 +41,7 @@ namespace Tapeti.Serilog.Middleware
await next(); await next();
stopwatch.Stop(); stopwatch.Stop();

View File

@ -19,7 +19,7 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="value"></param> /// <param name="value"></param>
public delegate Task ResultHandler(IMessageContext context, object value); public delegate ValueTask ResultHandler(IMessageContext context, object value);
/// <summary> /// <summary>

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="consumeResult"></param> /// <param name="consumeResult"></param>
/// <param name="next">Always call to allow the next in the chain to clean up</param> /// <param name="next">Always call to allow the next in the chain to clean up</param>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next); ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next);
} }
} }

View File

@ -15,6 +15,6 @@ namespace Tapeti.Config
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="next"></param> /// <param name="next"></param>
/// <returns></returns> /// <returns></returns>
Task Filter(IMessageContext context, Func<Task> next); ValueTask Filter(IMessageContext context, Func<ValueTask> next);
} }
} }

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="next">Call to pass the message to the next handler in the chain or call the controller method</param> /// <param name="next">Call to pass the message to the next handler in the chain or call the controller method</param>
Task Handle(IMessageContext context, Func<Task> next); ValueTask Handle(IMessageContext context, Func<ValueTask> next);
} }
} }

View File

@ -13,6 +13,6 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="next">Call to pass the message to the next handler in the chain</param> /// <param name="next">Call to pass the message to the next handler in the chain</param>
Task Handle(IMessageContext context, Func<Task> next); ValueTask Handle(IMessageContext context, Func<ValueTask> next);
} }
} }

View File

@ -13,6 +13,6 @@ namespace Tapeti.Config
/// </summary> /// </summary>
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="next">Call to pass the message to the next handler in the chain</param> /// <param name="next">Call to pass the message to the next handler in the chain</param>
Task Handle(IPublishContext context, Func<Task> next); ValueTask Handle(IPublishContext context, Func<ValueTask> next);
} }
} }

View File

@ -80,14 +80,13 @@ namespace Tapeti.Connection
cancellationToken = initializeCancellationTokenSource.Token; cancellationToken = initializeCancellationTokenSource.Token;
// ReSharper disable once MethodSupportsCancellation
Task.Run(async () => Task.Run(async () =>
{ {
await ApplyBindings(cancellationToken); await ApplyBindings(cancellationToken);
if (consuming && !cancellationToken.IsCancellationRequested) if (consuming && !cancellationToken.IsCancellationRequested)
await ConsumeQueues(cancellationToken); await ConsumeQueues(cancellationToken);
}); }, CancellationToken.None);
} }

View File

@ -159,7 +159,7 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public async Task Invoke(IMessageContext context) public async Task Invoke(IMessageContext context)
{ {
var controller = dependencyResolver.Resolve(bindingInfo.ControllerType); var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType);
context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding)); context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
if (!await FilterAllowed(context)) if (!await FilterAllowed(context))
@ -179,7 +179,7 @@ namespace Tapeti.Default
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware, bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(context, consumeResult, next), async (handler, next) => await handler.Cleanup(context, consumeResult, next),
() => Task.CompletedTask); () => default);
} }
@ -192,14 +192,14 @@ namespace Tapeti.Default
() => () =>
{ {
allowed = true; allowed = true;
return Task.CompletedTask; return default;
}); });
return allowed; return allowed;
} }
private delegate Task MessageHandlerFunc(IMessageContext context); private delegate ValueTask MessageHandlerFunc(IMessageContext context);
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler) private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
@ -213,10 +213,11 @@ namespace Tapeti.Default
if (method.ReturnType == typeof(Task)) if (method.ReturnType == typeof(Task))
return WrapTaskMethod(method, parameterFactories); return WrapTaskMethod(method, parameterFactories);
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>)) if (method.ReturnType == typeof(ValueTask))
return WrapGenericTaskMethod(method, parameterFactories); return WrapValueTaskMethod(method, parameterFactories);
return WrapObjectMethod(method, parameterFactories); // Breaking change in Tapeti 2.9: PublishResultBinding or other middleware should have taken care of the return value. If not, don't silently discard it.
throw new ArgumentException($"Method {method.Name} on controller {method.DeclaringType?.FullName} returns type {method.ReturnType.FullName}, which can not be handled by Tapeti or any registered middleware");
} }
@ -246,7 +247,7 @@ namespace Tapeti.Default
try try
{ {
method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return Task.CompletedTask; return default;
} }
catch (Exception e) catch (Exception e)
{ {
@ -264,7 +265,7 @@ namespace Tapeti.Default
var controllerPayload = context.Get<ControllerMessageContextPayload>(); var controllerPayload = context.Get<ControllerMessageContextPayload>();
try try
{ {
return (Task) method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); return new ValueTask((Task) method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()));
} }
catch (Exception e) catch (Exception e)
{ {
@ -275,32 +276,14 @@ namespace Tapeti.Default
} }
private MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories) private MessageHandlerFunc WrapValueTaskMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{ {
return context => return context =>
{ {
var controllerPayload = context.Get<ControllerMessageContextPayload>(); var controllerPayload = context.Get<ControllerMessageContextPayload>();
try try
{ {
return (Task<object>)method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()); return (ValueTask)method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
catch (Exception e)
{
AddExceptionData(e);
throw;
}
};
}
private MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable<ValueFactory> parameterFactories)
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
return Task.FromResult(method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()));
} }
catch (Exception e) catch (Exception e)
{ {

View File

@ -1,4 +1,4 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -23,7 +23,7 @@ namespace Tapeti.Default
return; return;
var hasClassResult = context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType); var hasClassResult = context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var taskType, out var actualType);
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>(); var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();
var expectedClassResult = request?.Response; var expectedClassResult = request?.Response;
@ -32,35 +32,55 @@ namespace Tapeti.Default
// Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish. // Tapeti 1.2: if you just want to publish another message as a result of the incoming message, explicitly call IPublisher.Publish.
// ReSharper disable once ConvertIfStatementToSwitchStatement // ReSharper disable once ConvertIfStatementToSwitchStatement
if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType) if (!hasClassResult && expectedClassResult != null || hasClassResult && expectedClassResult != actualType)
throw new ArgumentException($"Message handler must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}"); throw new ArgumentException($"Message handler for non-request message type {context.MessageClass?.FullName} must return type {expectedClassResult?.FullName ?? "void"} in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}, found: {actualType?.FullName ?? "void"}");
if (!hasClassResult) if (!hasClassResult)
return; return;
if (isTaskOf) switch (taskType)
{ {
var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType); case TaskType.None:
context.Result.SetHandler((messageContext, value) => Reply(value, messageContext));
break;
case TaskType.Task:
var handler = GetType().GetMethod(nameof(PublishGenericTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(handler != null, nameof(handler) + " != null"); Debug.Assert(handler != null, nameof(handler) + " != null");
context.Result.SetHandler(async (messageContext, value) => { await (Task) handler.Invoke(null, new[] {messageContext, value }); }); context.Result.SetHandler((messageContext, value) => (ValueTask)handler.Invoke(null, new[] { messageContext, value }));
break;
case TaskType.ValueTask:
var valueTaskHandler = GetType().GetMethod(nameof(PublishGenericValueTaskResult), BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(valueTaskHandler != null, nameof(handler) + " != null");
context.Result.SetHandler((messageContext, value) => (ValueTask)valueTaskHandler.Invoke(null, new[] { messageContext, value }));
break;
default:
throw new ArgumentOutOfRangeException();
} }
else
context.Result.SetHandler((messageContext, value) => Reply(value, messageContext));
} }
// ReSharper disable once UnusedMember.Local - used implicitly above private static async ValueTask PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
private static async Task PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
{ {
var message = await (Task<T>)value; var message = await (Task<T>)value;
await Reply(message, messageContext); await Reply(message, messageContext);
} }
private static Task Reply(object message, IMessageContext messageContext) private static async ValueTask PublishGenericValueTaskResult<T>(IMessageContext messageContext, object value) where T : class
{
var message = await (ValueTask<T>)value;
await Reply(message, messageContext);
}
private static async ValueTask Reply(object message, IMessageContext messageContext)
{ {
if (message == null) if (message == null)
throw new ArgumentException("Return value of a request message handler must not be null"); throw new ArgumentException("Return value of a request message handler must not be null");
@ -71,9 +91,10 @@ namespace Tapeti.Default
CorrelationId = messageContext.Properties.CorrelationId CorrelationId = messageContext.Properties.CorrelationId
}; };
return !string.IsNullOrEmpty(messageContext.Properties.ReplyTo) if (!string.IsNullOrEmpty(messageContext.Properties.ReplyTo))
? publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)) await publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true));
: publisher.Publish(message, properties, false); else
await publisher.Publish(message, properties, false);
} }
} }
} }

View File

@ -45,7 +45,7 @@ namespace Tapeti.Helpers
/// <param name="handle">Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.</param> /// <param name="handle">Receives the middleware which should be called and a reference to the action which will call the next. Pass this on to the middleware.</param>
/// <param name="lastHandler">The action to execute when the innermost middleware calls next.</param> /// <param name="lastHandler">The action to execute when the innermost middleware calls next.</param>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler) public static async ValueTask GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<ValueTask>, ValueTask> handle, Func<ValueTask> lastHandler)
{ {
var handlerIndex = middleware?.Count - 1 ?? -1; var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1) if (middleware == null || handlerIndex == -1)
@ -54,7 +54,7 @@ namespace Tapeti.Helpers
return; return;
} }
async Task HandleNext() async ValueTask HandleNext()
{ {
handlerIndex--; handlerIndex--;
if (handlerIndex >= 0) if (handlerIndex >= 0)

View File

@ -3,30 +3,59 @@ using System.Threading.Tasks;
namespace Tapeti.Helpers namespace Tapeti.Helpers
{ {
/// <summary>
/// Determines if a type is a Task, ValueTask or other type.
/// </summary>
public enum TaskType
{
/// <summary>
/// Type is not a Task or ValueTask.
/// </summary>
None,
/// <summary>
/// Type is a Task or Task&lt;T&gt;
/// </summary>
Task,
/// <summary>
/// Type is a ValueTask or ValueTask&lt;T&gt;
/// </summary>
ValueTask
}
/// <summary> /// <summary>
/// Helper methods for working with synchronous and asynchronous versions of methods. /// Helper methods for working with synchronous and asynchronous versions of methods.
/// </summary> /// </summary>
public static class TaskTypeHelper public static class TaskTypeHelper
{ {
/// <summary> /// <summary>
/// Determines if the given type matches the predicate, taking Task types into account. /// Determines if the given type matches the predicate, taking Task and ValueTask types into account.
/// </summary> /// </summary>
/// <param name="type"></param> /// <param name="type"></param>
/// <param name="predicate"></param> /// <param name="predicate"></param>
/// <param name="isTaskOf"></param> /// <param name="taskType"></param>
/// <param name="actualType"></param> /// <param name="actualType"></param>
public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTaskOf, out Type actualType) public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out TaskType taskType, out Type actualType)
{ {
if (type == typeof(Task)) if (type == typeof(Task))
{ {
isTaskOf = false; taskType = TaskType.Task;
actualType = type;
return false;
}
if (type == typeof(ValueTask))
{
taskType = TaskType.ValueTask;
actualType = type; actualType = type;
return false; return false;
} }
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>)) if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>))
{ {
isTaskOf = true; taskType = TaskType.Task;
var genericArguments = type.GetGenericArguments(); var genericArguments = type.GetGenericArguments();
if (genericArguments.Length == 1 && predicate(genericArguments[0])) if (genericArguments.Length == 1 && predicate(genericArguments[0]))
@ -36,7 +65,19 @@ namespace Tapeti.Helpers
} }
} }
isTaskOf = false; if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(ValueTask<>))
{
taskType = TaskType.ValueTask;
var genericArguments = type.GetGenericArguments();
if (genericArguments.Length == 1 && predicate(genericArguments[0]))
{
actualType = genericArguments[0];
return true;
}
}
taskType = TaskType.None;
actualType = type; actualType = type;
return predicate(type); return predicate(type);
} }
@ -47,10 +88,10 @@ namespace Tapeti.Helpers
/// </summary> /// </summary>
/// <param name="type"></param> /// <param name="type"></param>
/// <param name="predicate"></param> /// <param name="predicate"></param>
/// <param name="isTaskOf"></param> /// <param name="taskType"></param>
public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTaskOf) public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out TaskType taskType)
{ {
return IsTypeOrTaskOf(type, predicate, out isTaskOf, out _); return IsTypeOrTaskOf(type, predicate, out taskType, out _);
} }
@ -59,10 +100,10 @@ namespace Tapeti.Helpers
/// </summary> /// </summary>
/// <param name="type"></param> /// <param name="type"></param>
/// <param name="compareTo"></param> /// <param name="compareTo"></param>
/// <param name="isTaskOf"></param> /// <param name="taskType"></param>
public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf) public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out TaskType taskType)
{ {
return IsTypeOrTaskOf(type, t => t == compareTo, out isTaskOf); return IsTypeOrTaskOf(type, t => t == compareTo, out taskType);
} }
} }
} }

View File

@ -30,6 +30,7 @@
<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'"> <ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" /> <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -43,7 +43,7 @@ namespace Tapeti
var controllerIsObsolete = controller.GetCustomAttribute<ObsoleteAttribute>() != null; var controllerIsObsolete = controller.GetCustomAttribute<ObsoleteAttribute>() != null;
foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static)
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m)) .Select(m => (MethodInfo)m))
{ {