1
0
mirror of synced 2024-12-22 17:23:07 +01:00

Implemented #10: YieldWithParallelRequest

This commit is contained in:
Mark van Renswoude 2017-02-12 19:04:26 +01:00
parent 6b2a562d62
commit 120108bd41
13 changed files with 177 additions and 41 deletions

View File

@ -3,15 +3,7 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal interface IExecutableYieldPoint : IYieldPoint
{
bool StoreState { get; }
Task Execute(FlowContext context);
}
internal class DelegateYieldPoint : IYieldPoint
internal class DelegateYieldPoint : IStateYieldPoint, IExecutableYieldPoint
{
public bool StoreState { get; }

View File

@ -12,8 +12,8 @@ namespace Tapeti.Flow.Default
{
public void Handle(IBindingContext context, Action next)
{
RegisterContinuationFilter(context);
RegisterYieldPointResult(context);
RegisterContinuationFilter(context);
next();
@ -29,6 +29,26 @@ namespace Tapeti.Flow.Default
context.Use(new FlowMessageFilterMiddleware());
context.Use(new FlowMessageMiddleware());
if (context.Result.HasHandler)
return;
// Continuation without IYieldPoint indicates a ParallelRequestBuilder response handler,
// make sure to store it's state as well
if (context.Result.Info.ParameterType == typeof(Task))
{
context.Result.SetHandler(async (messageContext, value) =>
{
await (Task)value;
await HandleParallelResponse(messageContext);
});
}
else if (context.Result.Info.ParameterType == typeof(void))
{
context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext));
}
else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}");
}
@ -59,6 +79,13 @@ namespace Tapeti.Flow.Default
}
private static Task HandleParallelResponse(IMessageContext context)
{
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new StateYieldPoint(true));
}
private static void ValidateRequestResponse(IBindingContext context)
{
var request = context.MessageClass?.GetCustomAttribute<RequestAttribute>();

View File

@ -1,4 +1,5 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
@ -13,12 +14,41 @@ namespace Tapeti.Flow.Default
{
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null;
await next();
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
if (converge)
await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
}
else
await next();
}
private static async Task CallConvergeMethod(IMessageContext context, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(context.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);
}
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
@ -36,8 +37,7 @@ namespace Tapeti.Flow.Default
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
throw new NotImplementedException();
//return new ParallelRequestBuilder();
return new ParallelRequestBuilder(config, SendRequest);
}
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
@ -51,7 +51,8 @@ namespace Tapeti.Flow.Default
}
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo)
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var continuationID = Guid.NewGuid();
@ -59,7 +60,8 @@ namespace Tapeti.Flow.Default
new ContinuationMetadata
{
MethodName = responseHandlerInfo.MethodName,
ConvergeMethodName = null
ConvergeMethodName = convergeMethodName,
ConvergeMethodSync = convergeMethodTaskSync
});
var properties = new BasicProperties
@ -144,8 +146,10 @@ namespace Tapeti.Flow.Default
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{
var delegateYieldPoint = (DelegateYieldPoint)yieldPoint;
var storeState = delegateYieldPoint.StoreState;
var stateYieldPoint = yieldPoint as IStateYieldPoint;
var executableYieldPoint = yieldPoint as IExecutableYieldPoint;
var storeState = stateYieldPoint?.StoreState ?? false;
FlowContext flowContext;
object flowContextItem;
@ -181,7 +185,8 @@ namespace Tapeti.Flow.Default
try
{
await delegateYieldPoint.Execute(flowContext);
if (executableYieldPoint != null)
await executableYieldPoint.Execute(flowContext);
}
catch (YieldPointException e)
{
@ -203,10 +208,16 @@ namespace Tapeti.Flow.Default
}
/*
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{
internal class RequestInfo
public delegate Task SendRequestFunc(FlowContext context,
object message,
ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName,
bool convergeMethodSync);
private class RequestInfo
{
public object Message { get; set; }
public ResponseHandlerInfo ResponseHandlerInfo { get; set; }
@ -214,15 +225,13 @@ namespace Tapeti.Flow.Default
private readonly IConfig config;
private readonly IFlowStore flowStore;
private readonly Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest;
private readonly SendRequestFunc sendRequest;
private readonly List<RequestInfo> requests = new List<RequestInfo>();
public ParallelRequestBuilder(IConfig config, IFlowStore flowStore, Func<FlowContext, object, ResponseHandlerInfo, Task> sendRequest)
public ParallelRequestBuilder(IConfig config, SendRequestFunc sendRequest)
{
this.config = config;
this.flowStore = flowStore;
this.sendRequest = sendRequest;
}
@ -253,15 +262,34 @@ namespace Tapeti.Flow.Default
public IYieldPoint Yield(Func<Task<IYieldPoint>> continuation)
{
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo))));
return BuildYieldPoint(continuation, false);
}
public IYieldPoint Yield(Func<IYieldPoint> continuation)
public IYieldPoint YieldSync(Func<IYieldPoint> continuation)
{
return new YieldPoint(flowStore, true, context => Task.WhenAll(requests.Select(requestInfo => sendRequest(context, requestInfo.Message, requestInfo.ResponseHandlerInfo))));
return BuildYieldPoint(continuation, true);
}
}*/
private IYieldPoint BuildYieldPoint(Delegate convergeMethod, bool convergeMethodSync)
{
if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(true, context =>
{
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
return Task.WhenAll(requests.Select(requestInfo =>
sendRequest(context, requestInfo.Message,
requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name,
convergeMethodSync)));
});
}
}
internal class ResponseHandlerInfo

View File

@ -81,6 +81,7 @@ namespace Tapeti.Flow.Default
{
public string MethodName { get; set; }
public string ConvergeMethodName { get; set; }
public bool ConvergeMethodSync { get; set; }
public ContinuationMetadata Clone()
@ -88,7 +89,8 @@ namespace Tapeti.Flow.Default
return new ContinuationMetadata
{
MethodName = MethodName,
ConvergeMethodName = ConvergeMethodName
ConvergeMethodName = ConvergeMethodName,
ConvergeMethodSync = ConvergeMethodSync
};
}
}

View File

@ -0,0 +1,15 @@
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal interface IStateYieldPoint : IYieldPoint
{
bool StoreState { get; }
}
internal interface IExecutableYieldPoint : IYieldPoint
{
Task Execute(FlowContext context);
}
}

View File

@ -0,0 +1,13 @@
namespace Tapeti.Flow.Default
{
internal class StateYieldPoint : IStateYieldPoint
{
public bool StoreState { get; }
public StateYieldPoint(bool storeState)
{
StoreState = storeState;
}
}
}

View File

@ -30,7 +30,7 @@ namespace Tapeti.Flow
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
IYieldPoint Yield(Func<Task<IYieldPoint>> continuation);
IYieldPoint Yield(Func<IYieldPoint> continuation);
IYieldPoint YieldSync(Func<IYieldPoint> continuation);
}
public interface IYieldPoint

View File

@ -56,9 +56,11 @@
<Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowState.cs" />
<Compile Include="Default\IInternalYieldPoint.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" />
<Compile Include="Default\StateYieldPoint.cs" />
<Compile Include="FlowHelpers\MethodSerializer.cs" />
<Compile Include="FlowMiddleware.cs" />
<Compile Include="Default\FlowStore.cs" />

View File

@ -36,23 +36,42 @@ namespace Test
Console.WriteLine(">> Marco (yielding with request)");
await myVisualizer.VisualizeMarco();
StateTestGuid = Guid.NewGuid();
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage()
return flowProvider.YieldWithParallelRequest()
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{
StoredInState = StateTestGuid
},
HandlePoloConfirmationResponse);
}, HandlePoloConfirmationResponse1)
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
{
StoredInState = StateTestGuid
}, HandlePoloConfirmationResponse2)
.YieldSync(ContinuePoloConfirmation);
}
[Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message)
public void HandlePoloConfirmationResponse1(PoloConfirmationResponseMessage message)
{
Console.WriteLine(">> HandlePoloConfirmationResponse (ending flow)");
Console.WriteLine(">> HandlePoloConfirmationResponse1");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
[Continuation]
public void HandlePoloConfirmationResponse2(PoloConfirmationResponseMessage message)
{
Console.WriteLine(">> HandlePoloConfirmationResponse2");
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
}
private IYieldPoint ContinuePoloConfirmation()
{
Console.WriteLine("> ConvergePoloConfirmation (ending flow)");
return flowProvider.EndWithResponse(new PoloMessage());
}
@ -77,7 +96,6 @@ namespace Test
public void Polo(PoloMessage message)
{
Console.WriteLine(">> Polo");
StateTestGuid = Guid.NewGuid();
}
}

View File

@ -17,8 +17,9 @@ namespace Test
public async Task Run()
{
// await publisher.Publish(new MarcoMessage());
await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20);
while (true)
@ -38,12 +39,12 @@ namespace Test
await Task.Delay(200);
}
*/
/*
while (true)
{
await Task.Delay(1000);
}*/
}
}
}
}

View File

@ -33,7 +33,11 @@ namespace Test
})
{
Console.WriteLine("Subscribing...");
connection.Subscribe().Wait();
var subscriber = connection.Subscribe(false).Result;
Console.WriteLine("Consuming...");
subscriber.Resume().Wait();
Console.WriteLine("Done!");
var emitter = container.GetInstance<MarcoEmitter>();

View File

@ -59,6 +59,10 @@
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.Annotations\Tapeti.Annotations.csproj">
<Project>{c4897d64-d04e-4ae9-bd98-d64295d1d13a}</Project>
<Name>Tapeti.Annotations</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj">
<Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project>
<Name>Tapeti.Flow.SQL</Name>