Merge branch 'release/2.9'

This commit is contained in:
Mark van Renswoude 2022-12-22 11:50:14 +01:00
commit 081bf7a012
12 changed files with 323 additions and 84 deletions

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Threading.Tasks;
using ExampleLib; using ExampleLib;
using Messaging.TapetiExample; using Messaging.TapetiExample;
using Tapeti.Annotations; using Tapeti.Annotations;
@ -16,6 +17,7 @@ namespace _03_FlowRequestResponse
public string FirstQuote; public string FirstQuote;
public string SecondQuote; public string SecondQuote;
public string ThirdQuote;
public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState) public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState)
@ -35,7 +37,7 @@ namespace _03_FlowRequestResponse
Amount = 1 Amount = 1
}, },
HandleFirstQuoteResponse) HandleFirstQuoteResponse)
.AddRequestSync<QuoteRequestMessage, QuoteResponseMessage>( .AddRequest<QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage new QuoteRequestMessage
{ {
Amount = 2 Amount = 2
@ -54,10 +56,26 @@ namespace _03_FlowRequestResponse
[Continuation] [Continuation]
public void HandleSecondQuoteResponse(QuoteResponseMessage message) public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest)
{ {
Console.WriteLine("[ParallelFlowController] Second quote response received"); Console.WriteLine("[ParallelFlowController] Second quote response received");
SecondQuote = message.Quote; SecondQuote = message.Quote;
// Example of adding a request to an ongoing parallel request
await parallelRequest.AddRequestSync<QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 3
},
HandleThirdQuoteResponse);
}
[Continuation]
public void HandleThirdQuoteResponse(QuoteResponseMessage message)
{
Console.WriteLine("[ParallelFlowController] First quote response received");
ThirdQuote = message.Quote;
} }
@ -65,6 +83,16 @@ namespace _03_FlowRequestResponse
{ {
Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote); Console.WriteLine("[ParallelFlowController] First quote: " + FirstQuote);
Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote); Console.WriteLine("[ParallelFlowController] Second quote: " + SecondQuote);
Console.WriteLine("[ParallelFlowController] Third quote: " + ThirdQuote);
return flowProvider.YieldWithParallelRequest()
.YieldSync(ImmediateConvergeTest, FlowNoRequestsBehaviour.Converge);
}
private IYieldPoint ImmediateConvergeTest()
{
Console.WriteLine("[ParallelFlowController] Second parallel flow immediately converged");
exampleState.Done(); exampleState.Done();
return flowProvider.End(); return flowProvider.End();

View File

@ -25,8 +25,7 @@ namespace _03_FlowRequestResponse
break; break;
default: default:
// We have to return a response. quote = new string('\'', message.Amount);
quote = null;
break; break;
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Annotations; using Tapeti.Annotations;
@ -52,6 +53,10 @@ namespace Tapeti.Flow.Default
} }
else else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}");
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest)))
parameter.SetBinding(ParallelRequestParameterFactory);
} }
@ -103,5 +108,12 @@ namespace Tapeti.Flow.Default
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _))
throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
} }
private static object ParallelRequestParameterFactory(IMessageContext context)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.GetParallelRequest(new FlowHandlerContext(context));
}
} }
} }

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -12,13 +13,13 @@ namespace Tapeti.Flow.Default
public Guid ContinuationID { get; set; } public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; }
private bool storeCalled; private int storeCalled;
private bool deleteCalled; private int deleteCalled;
public async Task Store(bool persistent) public async Task Store(bool persistent)
{ {
storeCalled = true; storeCalled++;
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext)); if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
@ -30,7 +31,7 @@ namespace Tapeti.Flow.Default
public async Task Delete() public async Task Delete()
{ {
deleteCalled = true; deleteCalled++;
if (FlowStateLock != null) if (FlowStateLock != null)
await FlowStateLock.DeleteFlowState(); await FlowStateLock.DeleteFlowState();
@ -38,13 +39,16 @@ namespace Tapeti.Flow.Default
public bool IsStoredOrDeleted() public bool IsStoredOrDeleted()
{ {
return storeCalled || deleteCalled; return storeCalled > 0 || deleteCalled > 0;
} }
public void EnsureStoreOrDeleteIsCalled() public void EnsureStoreOrDeleteIsCalled()
{ {
if (!IsStoredOrDeleted()) if (!IsStoredOrDeleted())
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
Debug.Assert(storeCalled <= 1, "Store called more than once!");
Debug.Assert(deleteCalled <= 1, "Delete called more than once!");
} }
public void Dispose() public void Dispose()

View File

@ -40,19 +40,14 @@ namespace Tapeti.Flow.Default
// Remove Continuation now because the IYieldPoint result handler will store the new state // Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null;
if (converge)
// Indicate to the FlowBindingMiddleware that the state must not to be stored
flowPayload.FlowIsConverging = true;
await next(); await next();
if (converge) if (flowPayload.FlowIsConverging)
await CallConvergeMethod(context, controllerPayload, {
flowContext.ContinuationMetadata.ConvergeMethodName, var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowHandler>();
flowContext.ContinuationMetadata.ConvergeMethodSync); await flowHandler.Converge(new FlowHandlerContext(context));
}
} }
else else
await next(); await next();
@ -127,28 +122,5 @@ namespace Tapeti.Flow.Default
context.Store(new FlowMessageContextPayload(flowContext)); context.Store(new FlowMessageContextPayload(flowContext));
return flowContext; return flowContext;
} }
private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
} }
} }

View File

@ -48,7 +48,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
return new ParallelRequestBuilder(config, SendRequest); return new ParallelRequestBuilder(config, this);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -64,8 +64,8 @@ namespace Tapeti.Flow.Default
} }
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
{ {
if (context.FlowState == null) if (context.FlowState == null)
{ {
@ -89,7 +89,8 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue ReplyTo = responseHandlerInfo.ReplyToQueue
}; };
await context.Store(responseHandlerInfo.IsDurableQueue); if (store)
await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true); await publisher.Publish(message, properties, true);
} }
@ -122,7 +123,7 @@ namespace Tapeti.Flow.Default
} }
private static async Task EndFlow(FlowContext context) internal static async Task EndFlow(FlowContext context)
{ {
await context.Delete(); await context.Delete();
@ -243,16 +244,50 @@ namespace Tapeti.Flow.Default
} }
/// <inheritdoc />
public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context)
{
return context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
? new ParallelRequest(config, this, flowPayload.FlowContext)
: null;
}
/// <inheritdoc />
public Task Converge(IFlowHandlerContext context)
{
return Execute(context, new DelegateYieldPoint(flowContext =>
Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));
}
internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync)
{
IYieldPoint yieldPoint;
if (!flowContext.HandlerContext.MessageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext));
var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}");
if (convergeMethodSync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { });
else
yieldPoint = await(Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
await Execute(flowContext.HandlerContext, yieldPoint);
}
private class ParallelRequestBuilder : IFlowParallelRequestBuilder private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{ {
public delegate Task SendRequestFunc(FlowContext context,
object message,
ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName,
bool convergeMethodSync);
private class RequestInfo private class RequestInfo
{ {
public object Message { get; set; } public object Message { get; set; }
@ -261,30 +296,36 @@ namespace Tapeti.Flow.Default
private readonly ITapetiConfig config; private readonly ITapetiConfig config;
private readonly SendRequestFunc sendRequest; private readonly FlowProvider flowProvider;
private readonly List<RequestInfo> requests = new List<RequestInfo>(); private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(ITapetiConfig config, SendRequestFunc sendRequest) public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider)
{ {
this.config = config; this.config = config;
this.sendRequest = sendRequest; this.flowProvider = flowProvider;
} }
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
{ {
requests.Add(new RequestInfo return InternalAddRequest(message, responseHandler);
{ }
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
return this;
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
{
return InternalAddRequest(message, responseHandler);
} }
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{ {
requests.Add(new RequestInfo requests.Add(new RequestInfo
{ {
@ -296,41 +337,110 @@ namespace Tapeti.Flow.Default
} }
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation) public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
return BuildYieldPoint(continuation, false); return BuildYieldPoint(continuation, false, noRequestsBehaviour);
} }
public IYieldPoint YieldSync(Func<IYieldPoint> continuation) public IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
return BuildYieldPoint(continuation, true); return BuildYieldPoint(continuation, true, noRequestsBehaviour);
} }
private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync) private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{ {
if (requests.Count == 0) if (requests.Count == 0)
throw new YieldPointException("At least one request must be added before yielding a parallel request"); {
switch (noRequestsBehaviour)
{
case FlowNoRequestsBehaviour.Exception:
throw new YieldPointException("At least one request must be added before yielding a parallel request");
case FlowNoRequestsBehaviour.Converge:
return new DelegateYieldPoint(context =>
flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync));
case FlowNoRequestsBehaviour.EndFlow:
return new DelegateYieldPoint(EndFlow);
default:
throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null);
}
}
if (convergeMethod?.Method == null) if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod)); throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(context => return new DelegateYieldPoint(async context =>
{ {
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType()) if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class"); throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo => await Task.WhenAll(requests.Select(requestInfo =>
sendRequest(context, requestInfo.Message, flowProvider.SendRequest(
context,
requestInfo.Message,
requestInfo.ResponseHandlerInfo, requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name, convergeMethod.Method.Name,
convergeMethodSync))); convergeMethodSync,
false)));
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
}); });
} }
} }
private class ParallelRequest : IFlowParallelRequest
{
private readonly ITapetiConfig config;
private readonly FlowProvider flowProvider;
private readonly FlowContext flowContext;
public ParallelRequest(ITapetiConfig config, FlowProvider flowProvider, FlowContext flowContext)
{
this.config = config;
this.flowProvider = flowProvider;
this.flowContext = flowContext;
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
private Task InternalAddRequest(object message, Delegate responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return flowProvider.SendRequest(
flowContext,
message,
responseHandlerInfo,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync,
false);
}
}
internal class ResponseHandlerInfo internal class ResponseHandlerInfo
{ {
public string MethodName { get; set; } public string MethodName { get; set; }

View File

@ -10,13 +10,15 @@ namespace Tapeti.Flow
internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable
{ {
public FlowContext FlowContext { get; } public FlowContext FlowContext { get; }
/// <summary> /// <summary>
/// Indicates if the current message handler is the last one to be called before a /// Indicates if the current message handler is the last one to be called before a
/// parallel flow is done and the convergeMethod will be called. /// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state. /// Temporarily disables storing the flow state.
/// </summary> /// </summary>
public bool FlowIsConverging { get; set; } public bool FlowIsConverging => FlowContext != null &&
FlowContext.FlowState.Continuations.Count == 0 &&
FlowContext.ContinuationMetadata.ConvergeMethodName != null;
public FlowMessageContextPayload(FlowContext flowContext) public FlowMessageContextPayload(FlowContext flowContext)

View File

@ -109,9 +109,46 @@ namespace Tapeti.Flow
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="yieldPoint"></param> /// <param name="yieldPoint"></param>
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint); Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
/// <summary>
/// Returns the parallel request for the given message context.
/// </summary>
IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context);
/// <summary>
/// Calls the converge method for a parallel flow.
/// </summary>
Task Converge(IFlowHandlerContext context);
} }
/// <summary>
/// Determines how the Yield method of a parallel request behaves when no requests have been added.
/// Useful in cases where requests are sent conditionally.
/// </summary>
public enum FlowNoRequestsBehaviour
{
/// <summary>
/// Throw an exception. This is the default behaviour to prevent subtle bugs when not specifying the behaviour explicitly,
/// as well as for backwards compatibility.
/// </summary>
Exception,
/// <summary>
/// Immediately call the continuation method.
/// </summary>
Converge,
/// <summary>
/// End the flow without calling the converge method.
/// </summary>
EndFlow
}
/// <summary> /// <summary>
/// Builder to publish one or more request messages and continuing the flow when the responses arrive. /// Builder to publish one or more request messages and continuing the flow when the responses arrive.
/// </summary> /// </summary>
@ -127,6 +164,13 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler); IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
/// <summary> /// <summary>
/// Publish a request message and continue the flow when the response arrives. /// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint. /// Note that the response handler can not influence the flow as it does not return a YieldPoint.
@ -137,6 +181,8 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param> /// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler); IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them.
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
/// The continuation method is called when all responses have arrived. /// The continuation method is called when all responses have arrived.
@ -144,8 +190,9 @@ namespace Tapeti.Flow
/// controller and can store state. /// controller and can store state.
/// Used for asynchronous continuation methods. /// Used for asynchronous continuation methods.
/// </summary> /// </summary>
/// <param name="continuation"></param> /// <param name="continuation">The converge continuation method to be called when all responses have been handled.</param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation); /// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
/// <summary> /// <summary>
/// Constructs an IYieldPoint to continue the flow when responses arrive. /// Constructs an IYieldPoint to continue the flow when responses arrive.
@ -154,8 +201,47 @@ namespace Tapeti.Flow
/// controller and can store state. /// controller and can store state.
/// Used for synchronous continuation methods. /// Used for synchronous continuation methods.
/// </summary> /// </summary>
/// <param name="continuation"></param> /// <param name="continuation">The converge continuation method to be called when all responses have been handled.</param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation); /// <param name="noRequestsBehaviour">How the Yield method should behave when no requests have been added to the parallel request builder.</param>
IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception);
}
/// <summary>
/// Provides means of adding one or more requests to a parallel request.
/// </summary>
/// <remarks>
/// Add a parameter of this type to a parallel request's response handler to gain access to it's functionality.
/// Not available in other contexts.
/// </remarks>
public interface IFlowParallelRequest
{
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for synchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
} }

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework> <TargetFramework>net5.0</TargetFramework>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">

View File

@ -945,8 +945,9 @@ namespace Tapeti.Connection
if (returnInfo.RefCount == 0) if (returnInfo.RefCount == 0)
returnRoutingKeys.Remove(messageInfo.ReturnKey); returnRoutingKeys.Remove(messageInfo.ReturnKey);
} }
else
messageInfo.CompletionSource.SetResult(0);
messageInfo.CompletionSource.SetResult(0);
confirmMessages.Remove(deliveryTag); confirmMessages.Remove(deliveryTag);
} }
} }

View File

@ -76,8 +76,8 @@ namespace Tapeti.Default
public void QueueDeclare(string queueName, bool durable, bool passive) public void QueueDeclare(string queueName, bool durable, bool passive)
{ {
Console.WriteLine(passive Console.WriteLine(passive
? $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}" ? $"[Tapeti] Verifying durable queue {queueName}"
: $"[Tapeti] Verifying durable queue {queueName}"); : $"[Tapeti] Declaring {(durable ? "durable" : "dynamic")} queue {queueName}");
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@ -251,11 +251,36 @@ A few things to note:
#) The response handlers do not return an IYieldPoint themselves, but void (for AddRequestSync) or Task (for AddRequest). Therefore they can not influence the flow. Instead the converge method as passed to Yield or YieldSync determines how the flow continues. It is called immediately after the last response handler. #) The response handlers do not return an IYieldPoint themselves, but void (for AddRequestSync) or Task (for AddRequest). Therefore they can not influence the flow. Instead the converge method as passed to Yield or YieldSync determines how the flow continues. It is called immediately after the last response handler.
#) The converge method must be private, as it is not a valid message handler in itself. #) The converge method must be private, as it is not a valid message handler in itself.
#) You must add at least one request. #) You must add at least one request, or specify the NoRequestsBehaviour parameter for Yield/YieldSync explicitly.
Note that you do not have to perform all the operations in one go. You can store the result of ``YieldWithParallelRequest`` and conditionally call ``AddRequest`` or ``AddRequestSync`` as many times as required. Note that you do not have to perform all the operations in one go. You can store the result of ``YieldWithParallelRequest`` and conditionally call ``AddRequest`` or ``AddRequestSync`` as many times as required.
Adding requests to a parallel flow
----------------------------------
As mentioned above, you can not start a new parallel request in the same flow while the current one has not converged yet. This is enforced by the response handlers not returning an IYieldPoint.
You can however add requests to the current parallel request while handling one of the responses. This is equivalent to adding the request to the parallel flow builder initially, and will delay calling the converge method until a response has been received to this new request as well.
To add an additional request, include a second parameter in the continuation method of type IFlowParallelRequest. The continuation method also needs to be async to be able to await the IFlowParallelRequest.AddRequest[Sync] methods. For example:
::
[Continuation]
public async Task HandleDoctorAppointmentResponse(DoctorAppointmentResponseMessage appointment,
IFlowParallelRequest parallelRequest)
{
// Now that we have the appointment details, we can query the patient data
await parallelRequest.AddRequestSync<PatientRequestMessage, PatientResponseMessage>(
new PatientRequestMessage
{
PatientID = appointment.PatientID
},
HandlePatientResponse);
}
Persistent state Persistent state
---------------- ----------------
By default flow state is only preserved while the service is running. To persist the flow state across restarts and reboots, provide an implementation of IFlowRepository to ``WithFlow()``. By default flow state is only preserved while the service is running. To persist the flow state across restarts and reboots, provide an implementation of IFlowRepository to ``WithFlow()``.