1
0
mirror of synced 2024-11-14 14:33:49 +00:00

Added ValueTask support to flows

This commit is contained in:
Mark van Renswoude 2022-02-09 12:19:05 +01:00
parent 165680fd38
commit 7a25a1fd1f
5 changed files with 84 additions and 2 deletions

View File

@ -56,7 +56,7 @@ namespace _03_FlowRequestResponse
[Continuation] [Continuation]
public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest) public async ValueTask HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest)
{ {
Console.WriteLine("[ParallelFlowController] Second quote response received"); Console.WriteLine("[ParallelFlowController] Second quote response received");
SecondQuote = message.Quote; SecondQuote = message.Quote;

View File

@ -50,6 +50,14 @@ namespace Tapeti.Flow.Default
await HandleParallelResponse(messageContext); await HandleParallelResponse(messageContext);
}); });
} }
if (context.Result.Info.ParameterType == typeof(ValueTask))
{
context.Result.SetHandler(async (messageContext, value) =>
{
await (ValueTask)value;
await HandleParallelResponse(messageContext);
});
}
else if (context.Result.Info.ParameterType == typeof(void)) else if (context.Result.Info.ParameterType == typeof(void))
{ {
context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext)); context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext));

View File

@ -38,6 +38,13 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc /> /// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{ {
@ -312,18 +319,31 @@ namespace Tapeti.Flow.Default
return InternalAddRequest(message, responseHandler); return InternalAddRequest(message, responseHandler);
} }
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
{ {
return InternalAddRequest(message, responseHandler); return InternalAddRequest(message, responseHandler);
} }
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{ {
return InternalAddRequest(message, responseHandler); return InternalAddRequest(message, responseHandler);
} }
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{ {
@ -342,6 +362,11 @@ namespace Tapeti.Flow.Default
return BuildYieldPoint(continuation, false, noRequestsBehaviour); return BuildYieldPoint(continuation, false, noRequestsBehaviour);
} }
public IYieldPoint Yield(Func<ValueTask<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{
throw new NotImplementedException();
}
public IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception) public IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {

View File

@ -23,6 +23,18 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler); IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler);
/// <summary> /// <summary>
/// Publish a request message and continue the flow when the response arrives. /// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the /// The request message must be marked with the [Request] attribute, and the
@ -164,6 +176,16 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler); IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler);
/// <remarks> /// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which /// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called. /// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
@ -171,6 +193,13 @@ namespace Tapeti.Flow
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/> /// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler); IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,ValueTask})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler);
/// <summary> /// <summary>
/// Publish a request message and continue the flow when the response arrives. /// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint. /// Note that the response handler can not influence the flow as it does not return a YieldPoint.
@ -181,6 +210,13 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler); IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequestSync{TRequest,TResponse}(TRequest,Action{TResponse})"/>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler);
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them. /// async, so you should always await them.
/// <summary> /// <summary>
@ -194,6 +230,19 @@ namespace Tapeti.Flow
/// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param> /// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception); IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them.
/// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived.
/// Response handlers and the continuation method are guaranteed thread-safe access to the
/// controller and can store state.
/// Used for asynchronous continuation methods.
/// </summary>
/// <param name="continuation">The converge continuation method to be called when all responses have been handled.</param>
/// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint Yield(Func<ValueTask<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived. /// The continuation method is called when all responses have arrived.

View File

@ -10,7 +10,7 @@ before_build:
- ps: build\UpdateVersion.ps1 - ps: build\UpdateVersion.ps1
environment: environment:
pack_params: -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true -p:EmbedUntrackedSources=true --output output -p:Configuration=Release -p:p:ContinuousIntegrationBuild=true pack_params: -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg -p:PublishRepositoryUrl=true -p:EmbedUntrackedSources=true --output output -p:Configuration=Release -p:p:ContinuousIntegrationBuild=true --verbosity detailed
after_build: after_build:
# Create NuGet packages # Create NuGet packages