Added NoRequestsBehaviour to ParallelFlow.Yield

This commit is contained in:
Mark van Renswoude 2021-12-10 11:45:09 +01:00
parent bc00d476bd
commit 58d1908047
6 changed files with 146 additions and 68 deletions

View File

@ -85,6 +85,15 @@ namespace _03_FlowRequestResponse
Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote);
Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote); Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote);
return flowProvider.YieldWithParallelRequest()
.YieldSync(ImmediateConvergeTest, FlowNoRequestsBehaviour.Converge);
}
private IYieldPoint ImmediateConvergeTest()
{
Console.WriteLine("[ParallelFlowController] Second parallel flow immediately converged");
exampleState.Done(); exampleState.Done();
return flowProvider.End(); return flowProvider.End();
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -12,13 +13,13 @@ namespace Tapeti.Flow.Default
public Guid ContinuationID { get; set; } public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; }
private bool storeCalled; private int storeCalled;
private bool deleteCalled; private int deleteCalled;
public async Task Store(bool persistent) public async Task Store(bool persistent)
{ {
storeCalled = true; storeCalled++;
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
@ -30,7 +31,7 @@ namespace Tapeti.Flow.Default
public async Task Delete() public async Task Delete()
{ {
deleteCalled = true; deleteCalled++;
if (FlowStateLock != null) if (FlowStateLock != null)
await FlowStateLock.DeleteFlowState(); await FlowStateLock.DeleteFlowState();
@ -38,13 +39,16 @@ namespace Tapeti.Flow.Default
public bool IsStoredOrDeleted() public bool IsStoredOrDeleted()
{ {
return storeCalled || deleteCalled; return storeCalled > 0 || deleteCalled > 0;
} }
public void EnsureStoreOrDeleteIsCalled() public void EnsureStoreOrDeleteIsCalled()
{ {
if (!IsStoredOrDeleted()) if (!IsStoredOrDeleted())
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
Debug.Assert(storeCalled <= 1, "Store called more than once!");
Debug.Assert(deleteCalled <= 1, "Delete called more than once!");
} }
public void Dispose() public void Dispose()

View File

@ -44,9 +44,10 @@ namespace Tapeti.Flow.Default
await next(); await next();
if (flowPayload.FlowIsConverging) if (flowPayload.FlowIsConverging)
await CallConvergeMethod(context, controllerPayload, {
flowContext.ContinuationMetadata.ConvergeMethodName, var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowHandler>();
flowContext.ContinuationMetadata.ConvergeMethodSync); await flowHandler.Converge(new FlowHandlerContext(context));
}
} }
else else
await next(); await next();
@ -121,28 +122,5 @@ namespace Tapeti.Flow.Default
context.Store(new FlowMessageContextPayload(flowContext)); context.Store(new FlowMessageContextPayload(flowContext));
return flowContext; return flowContext;
} }
private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
} }
} }

View File

@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
return new ParallelRequestBuilder(config, SendRequest); return new ParallelRequestBuilder(config, this);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default
} }
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
{ {
if (context.FlowState == null) if (context.FlowState == null)
{ {
@ -89,7 +89,8 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue ReplyTo = responseHandlerInfo.ReplyToQueue
}; };
await context.Store(responseHandlerInfo.IsDurableQueue); if (store)
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true); await publisher.Publish(message, properties, true);
} }
@ -122,7 +123,7 @@ namespace Tapeti.Flow.Default
} }
private static async Task EndFlow(FlowContext context) internal static async Task EndFlow(FlowContext context)
{ {
await context.Delete(); await context.Delete();
@ -246,18 +247,43 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context) public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context)
{ {
if (!context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)) return context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
return null; ? new ParallelRequest(config, this, flowPayload.FlowContext)
: null;
return new ParallelRequest(config, SendRequest, flowPayload.FlowContext);
} }
private delegate Task SendRequestFunc(FlowContext context, /// <inheritdoc />
object message, public Task Converge(IFlowHandlerContext context)
ResponseHandlerInfo responseHandlerInfo, {
string convergeMethodName, return Execute(context, new DelegateYieldPoint(flowContext =>
bool convergeMethodSync); Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));
}
internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync)
{
IYieldPoint yieldPoint;
if (!flowContext.HandlerContext.MessageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext));
var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}");
if (convergeMethodSync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { });
else
yieldPoint = await(Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
await Execute(flowContext.HandlerContext, yieldPoint);
}
private class ParallelRequestBuilder : IFlowParallelRequestBuilder private class ParallelRequestBuilder : IFlowParallelRequestBuilder
@ -270,14 +296,14 @@ namespace Tapeti.Flow.Default
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private readonly SendRequestFunc sendRequest; private readonly FlowProvider flowProvider;
private readonly List<RequestInfo> requests = new List<RequestInfo>(); private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider)
{ {
this.config = config; this.config = config;
this.sendRequest = sendRequest; this.flowProvider = flowProvider;
} }
@ -311,36 +337,57 @@ namespace Tapeti.Flow.Default
} }
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation) public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
return BuildYieldPoint(continuation, false); return BuildYieldPoint(continuation, false, noRequestsBehaviour);
} }
public IYieldPoint YieldSync(Func<IYieldPoint> continuation) public IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
return BuildYieldPoint(continuation, true); return BuildYieldPoint(continuation, true, noRequestsBehaviour);
} }
private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync) private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
if (requests.Count == 0) if (requests.Count == 0)
throw new YieldPointException("At least one request must be added before yielding a parallel request"); {
switch (noRequestsBehaviour)
{
case FlowNoRequestsBehaviour.Exception:
throw new YieldPointException("At least one request must be added before yielding a parallel request");
case FlowNoRequestsBehaviour.Converge:
return new DelegateYieldPoint(context =>
flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync));
case FlowNoRequestsBehaviour.EndFlow:
return new DelegateYieldPoint(EndFlow);
default:
throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null);
}
}
if (convergeMethod?.Method == null) if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod)); throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(context => return new DelegateYieldPoint(async context =>
{ {
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class"); throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo => await Task.WhenAll(requests.Select(requestInfo =>
sendRequest(context, requestInfo.Message, flowProvider.SendRequest(
context,
requestInfo.Message,
requestInfo.ResponseHandlerInfo, requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name, convergeMethod.Method.Name,
convergeMethodSync))); convergeMethodSync,
false)));
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
}); });
} }
} }
@ -349,14 +396,14 @@ namespace Tapeti.Flow.Default
private class ParallelRequest : IFlowParallelRequest private class ParallelRequest : IFlowParallelRequest
{ {
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private readonly SendRequestFunc sendRequest; private readonly FlowProvider flowProvider;
private readonly FlowContext flowContext; private readonly FlowContext flowContext;
public ParallelRequest(ITapetiConfig config, SendRequestFunc sendRequest, FlowContext flowContext) public ParallelRequest(ITapetiConfig config, FlowProvider flowProvider, FlowContext flowContext)
{ {
this.config = config; this.config = config;
this.sendRequest = sendRequest; this.flowProvider = flowProvider;
this.flowContext = flowContext; this.flowContext = flowContext;
} }
@ -382,7 +429,14 @@ namespace Tapeti.Flow.Default
private Task InternalAddRequest(object message, Delegate responseHandler) private Task InternalAddRequest(object message, Delegate responseHandler)
{ {
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return sendRequest(flowContext, message, responseHandlerInfo, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync);
return flowProvider.SendRequest(
flowContext,
message,
responseHandlerInfo,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync,
false);
} }
} }

View File

@ -115,9 +115,40 @@ namespace Tapeti.Flow
/// Returns the parallel request for the given message context. /// Returns the parallel request for the given message context.
/// </summary> /// </summary>
IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context); IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context);
/// <summary>
/// Calls the converge method for a parallel flow.
/// </summary>
Task Converge(IFlowHandlerContext context);
} }
/// <summary>
/// Determines how the Yield method of a parallel request behaves when no requests have been added.
/// Useful in cases where requests are sent conditionally.
/// </summary>
public enum FlowNoRequestsBehaviour
{
/// <summary>
/// Throw an exception. This is the default behaviour to prevent subtle bugs when not specifying the behaviour explicitly,
/// as well as for backwards compatibility.
/// </summary>
Exception,
/// <summary>
/// Immediately call the continuation method.
/// </summary>
Converge,
/// <summary>
/// End the flow without calling the converge method.
/// </summary>
EndFlow
}
/// <summary> /// <summary>
/// Builder to publish one or more request messages and continuing the flow when the responses arrive. /// Builder to publish one or more request messages and continuing the flow when the responses arrive.
/// </summary> /// </summary>
@ -152,7 +183,6 @@ namespace Tapeti.Flow
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are /// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them. /// async, so you should always await them.
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived. /// The continuation method is called when all responses have arrived.
@ -160,8 +190,9 @@ namespace Tapeti.Flow
/// controller and can store state. /// controller and can store state.
/// Used for asynchronous continuation methods. /// Used for asynchronous continuation methods.
/// </summary> /// </summary>
/// <param name="continuation"></param> /// <param name="continuation">The converge continuation method to be called when all responses have been handled.</param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation); /// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
@ -170,8 +201,9 @@ namespace Tapeti.Flow
/// controller and can store state. /// controller and can store state.
/// Used for synchronous continuation methods. /// Used for synchronous continuation methods.
/// </summary> /// </summary>
/// <param name="continuation"></param> /// <param name="continuation">The converge continuation method to be called when all responses have been handled.</param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation); /// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
} }

View File

@ -945,8 +945,9 @@ namespace Tapeti.Connection
if (returnInfo.RefCount == 0) if (returnInfo.RefCount == 0)
returnRoutingKeys.Remove(messageInfo.ReturnKey); returnRoutingKeys.Remove(messageInfo.ReturnKey);
} }
else
messageInfo.CompletionSource.SetResult(0);
messageInfo.CompletionSource.SetResult(0);
confirmMessages.Remove(deliveryTag); confirmMessages.Remove(deliveryTag);
} }
} }