1
0
mirror of synced 2025-01-23 00:23:06 +01:00

Implemented #10: YieldWithParallelRequest

This commit is contained in:
Mark van Renswoude 2017-02-12 19:04:26 +01:00
parent 458af0640d
commit 70f1896fe9
13 changed files with 177 additions and 41 deletions

View File

@ -3,15 +3,7 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
internal interface IExecutableYieldPoint : IYieldPoint internal class DelegateYieldPoint : IStateYieldPoint, IExecutableYieldPoint
{
bool StoreState { get; }
Task Execute(FlowContext context);
}
internal class DelegateYieldPoint : IYieldPoint
{ {
public bool StoreState { get; } public bool StoreState { get; }

View File

@ -12,8 +12,8 @@ namespace Tapeti.Flow.Default
{ {
public void Handle(IBindingContext context, Action next) public void Handle(IBindingContext context, Action next)
{ {
RegisterContinuationFilter(context);
RegisterYieldPointResult(context); RegisterYieldPointResult(context);
RegisterContinuationFilter(context);
next(); next();
@ -29,6 +29,26 @@ namespace Tapeti.Flow.Default
context.Use(new FlowMessageFilterMiddleware()); context.Use(new FlowMessageFilterMiddleware());
context.Use(new FlowMessageMiddleware()); context.Use(new FlowMessageMiddleware());
if (context.Result.HasHandler)
return;
// Continuation without IYieldPoint indicates a ParallelRequestBuilder response handler,
// make sure to store it's state as well
if (context.Result.Info.ParameterType == typeof(Task))
{
context.Result.SetHandler(async (messageContext, value) =>
{
await (Task)value;
await HandleParallelResponse(messageContext);
});
}
else if (context.Result.Info.ParameterType == typeof(void))
{
context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext));
}
else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}");
} }
@ -59,6 +79,13 @@ namespace Tapeti.Flow.Default
} }
private static Task HandleParallelResponse(IMessageContext context)
{
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new StateYieldPoint(true));
}
private static void ValidateRequestResponse(IBindingContext context) private static void ValidateRequestResponse(IBindingContext context)
{ {
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>(); var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
@ -13,12 +14,41 @@ namespace Tapeti.Flow.Default
{ {
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller); Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null;
await next(); await next();
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); if (converge)
await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
} }
else else
await next(); await next();
} }
private static async Task CallConvergeMethod(IMessageContext context, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(context.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);
}
} }
} }

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
@ -36,8 +37,7 @@ namespace Tapeti.Flow.Default
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
throw new NotImplementedException(); return new ParallelRequestBuilder(config, SendRequest);
//return new ParallelRequestBuilder();
} }
public IYieldPoint EndWithResponse<TResponse>(TResponse message) public IYieldPoint EndWithResponse<TResponse>(TResponse message)
@ -51,7 +51,8 @@ namespace Tapeti.Flow.Default
} }
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo) private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
var continuationID = Guid.NewGuid(); var continuationID = Guid.NewGuid();
@ -59,7 +60,8 @@ namespace Tapeti.Flow.Default
new ContinuationMetadata new ContinuationMetadata
{ {
MethodName = responseHandlerInfo.MethodName, MethodName = responseHandlerInfo.MethodName,
ConvergeMethodName = null ConvergeMethodName = convergeMethodName,
ConvergeMethodSync = convergeMethodTaskSync
}); });
var properties = new BasicProperties var properties = new BasicProperties
@ -144,8 +146,10 @@ namespace Tapeti.Flow.Default
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{ {
var delegateYieldPoint = (DelegateYieldPoint)yieldPoint; var stateYieldPoint = yieldPoint as IStateYieldPoint;
var storeState = delegateYieldPoint.StoreState; var executableYieldPoint = yieldPoint as IExecutableYieldPoint;
var storeState = stateYieldPoint?.StoreState ?? false;
FlowContext flowContext; FlowContext flowContext;
object flowContextItem; object flowContextItem;
@ -181,7 +185,8 @@ namespace Tapeti.Flow.Default
try try
{ {
await delegateYieldPoint.Execute(flowContext); if (executableYieldPoint != null)
await executableYieldPoint.Execute(flowContext);
} }
catch (YieldPointException e) catch (YieldPointException e)
{ {
@ -203,10 +208,16 @@ namespace Tapeti.Flow.Default
} }
/*
private class ParallelRequestBuilder : IFlowParallelRequestBuilder private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{ {
internal class RequestInfo public delegate Task SendRequestFunc(FlowContext context,
object message,
ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName,
bool convergeMethodSync);
private class RequestInfo
{ {
public object Message { get; set; } public object Message { get; set; }
public ResponseHandlerInfo ResponseHandlerInfo { get; set; } public ResponseHandlerInfo ResponseHandlerInfo { get; set; }
@ -214,15 +225,13 @@ namespace Tapeti.Flow.Default
private readonly IConfig config; private readonly IConfig config;
private readonly IFlowStore flowStore; private readonly SendRequestFunc sendRequest;
private readonly Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest;
private readonly List<RequestInfo> requests = new List<RequestInfo>(); private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(IConfig config, IFlowStore flowStore, Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest) public ParallelRequestBuilder(IConfig config, SendRequestFunc sendRequest)
{ {
this.config = config; this.config = config;
this.flowStore = flowStore;
this.sendRequest = sendRequest; this.sendRequest = sendRequest;
} }
@ -253,15 +262,34 @@ namespace Tapeti.Flow.Default
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation) public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation)
{ {
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); return BuildYieldPoint(continuation, false);
} }
public IYieldPoint Yield(Func<IYieldPoint> continuation) public IYieldPoint YieldSync(Func<IYieldPoint> continuation)
{ {
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo)))); return BuildYieldPoint(continuation, true);
} }
}*/
private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync)
{
if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(true, context =>
{
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo =>
sendRequest(context, requestInfo.Message,
requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name,
convergeMethodSync)));
});
}
}
internal class ResponseHandlerInfo internal class ResponseHandlerInfo

View File

@ -81,6 +81,7 @@ namespace Tapeti.Flow.Default
{ {
public string MethodName { get; set; } public string MethodName { get; set; }
public string ConvergeMethodName { get; set; } public string ConvergeMethodName { get; set; }
public bool ConvergeMethodSync { get; set; }
public ContinuationMetadata Clone() public ContinuationMetadata Clone()
@ -88,7 +89,8 @@ namespace Tapeti.Flow.Default
return new ContinuationMetadata return new ContinuationMetadata
{ {
MethodName = MethodName, MethodName = MethodName,
ConvergeMethodName = ConvergeMethodName ConvergeMethodName = ConvergeMethodName,
ConvergeMethodSync = ConvergeMethodSync
}; };
} }
} }

View File

@ -0,0 +1,15 @@
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal interface IStateYieldPoint : IYieldPoint
{
bool StoreState { get; }
}
internal interface IExecutableYieldPoint : IYieldPoint
{
Task Execute(FlowContext context);
}
}

View File

@ -0,0 +1,13 @@
namespace Tapeti.Flow.Default
{
internal class StateYieldPoint : IStateYieldPoint
{
public bool StoreState { get; }
public StateYieldPoint(bool storeState)
{
StoreState = storeState;
}
}
}

View File

@ -30,7 +30,7 @@ namespace Tapeti.Flow
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler); IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation); IYieldPoint Yield(Func<Task<IYieldPoint>> continuation);
IYieldPoint Yield(Func<IYieldPoint> continuation); IYieldPoint YieldSync(Func<IYieldPoint> continuation);
} }
public interface IYieldPoint public interface IYieldPoint

View File

@ -56,9 +56,11 @@
<Compile Include="Default\FlowContext.cs" /> <Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" /> <Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowState.cs" /> <Compile Include="Default\FlowState.cs" />
<Compile Include="Default\IInternalYieldPoint.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" /> <Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" /> <Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" /> <Compile Include="ConfigExtensions.cs" />
<Compile Include="Default\StateYieldPoint.cs" />
<Compile Include="FlowHelpers\MethodSerializer.cs" /> <Compile Include="FlowHelpers\MethodSerializer.cs" />
<Compile Include="FlowMiddleware.cs" /> <Compile Include="FlowMiddleware.cs" />
<Compile Include="Default\FlowStore.cs" /> <Compile Include="Default\FlowStore.cs" />

View File

@ -36,23 +36,42 @@ namespace Test
Console.WriteLine(">> Marco (yielding with request)"); Console.WriteLine(">> Marco (yielding with request)");
await myVisualizer.VisualizeMarco(); await myVisualizer.VisualizeMarco();
StateTestGuid = Guid.NewGuid();
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>( return flowProvider.YieldWithParallelRequest()
new PoloConfirmationRequestMessage() .AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{ {
StoredInState = StateTestGuid StoredInState = StateTestGuid
}, }, HandlePoloConfirmationResponse1)
HandlePoloConfirmationResponse);
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{
StoredInState = StateTestGuid
}, HandlePoloConfirmationResponse2)
.YieldSync(ContinuePoloConfirmation);
} }
[Continuation] [Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message) public void HandlePoloConfirmationResponse1(PoloConfirmationResponseMessage message)
{ {
Console.WriteLine(">> HandlePoloConfirmationResponse (ending flow)"); Console.WriteLine(">> HandlePoloConfirmationResponse1");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
[Continuation]
public void HandlePoloConfirmationResponse2(PoloConfirmationResponseMessage message)
{
Console.WriteLine(">> HandlePoloConfirmationResponse2");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!"); Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
private IYieldPoint ContinuePoloConfirmation()
{
Console.WriteLine("> ConvergePoloConfirmation (ending flow)");
return flowProvider.EndWithResponse(new PoloMessage()); return flowProvider.EndWithResponse(new PoloMessage());
} }
@ -77,7 +96,6 @@ namespace Test
public void Polo(PoloMessage message) public void Polo(PoloMessage message)
{ {
Console.WriteLine(">> Polo"); Console.WriteLine(">> Polo");
StateTestGuid = Guid.NewGuid();
} }
} }

View File

@ -17,8 +17,9 @@ namespace Test
public async Task Run() public async Task Run()
{ {
// await publisher.Publish(new MarcoMessage()); await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20); var concurrent = new SemaphoreSlim(20);
while (true) while (true)
@ -38,12 +39,12 @@ namespace Test
await Task.Delay(200); await Task.Delay(200);
} }
*/
/*
while (true) while (true)
{ {
await Task.Delay(1000); await Task.Delay(1000);
}*/ }
} }
} }
} }

View File

@ -33,7 +33,11 @@ namespace Test
}) })
{ {
Console.WriteLine("Subscribing..."); Console.WriteLine("Subscribing...");
connection.Subscribe().Wait(); var subscriber = connection.Subscribe(false).Result;
Console.WriteLine("Consuming...");
subscriber.Resume().Wait();
Console.WriteLine("Done!"); Console.WriteLine("Done!");
var emitter = container.GetInstance<MarcoEmitter>(); var emitter = container.GetInstance<MarcoEmitter>();

View File

@ -59,6 +59,10 @@
<None Include="packages.config" /> <None Include="packages.config" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Tapeti.Annotations\Tapeti.Annotations.csproj">
<Project>{c4897d64-d04e-4ae9-bd98-d64295d1d13a}</Project>
<Name>Tapeti.Annotations</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj"> <ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj">
<Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project> <Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project>
<Name>Tapeti.Flow.SQL</Name> <Name>Tapeti.Flow.SQL</Name>