RDB-46 Volgorde bug in Flow Library
Refactoring van de FlowStore en de FlowPovider met de oplossing voor de bug
This commit is contained in:
parent
2b56d0130e
commit
ef0268d9ac
@ -3,16 +3,13 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Flow.Default
|
||||
{
|
||||
internal class DelegateYieldPoint : IExecutableYieldPoint
|
||||
internal class DelegateYieldPoint : IYieldPoint
|
||||
{
|
||||
public bool StoreState { get; }
|
||||
|
||||
private readonly Func<FlowContext, Task> onExecute;
|
||||
|
||||
|
||||
public DelegateYieldPoint(bool storeState, Func<FlowContext, Task> onExecute)
|
||||
public DelegateYieldPoint(Func<FlowContext, Task> onExecute)
|
||||
{
|
||||
StoreState = storeState;
|
||||
this.onExecute = onExecute;
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ namespace Tapeti.Flow.Default
|
||||
private static Task HandleParallelResponse(IMessageContext context)
|
||||
{
|
||||
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
|
||||
return flowHandler.Execute(context, new StateYieldPoint(true));
|
||||
return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask));
|
||||
}
|
||||
|
||||
|
||||
|
@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default
|
||||
return;
|
||||
var flowContext = (FlowContext)flowContextObj;
|
||||
|
||||
if (flowContext.FlowStateLock != null)
|
||||
if (flowContext?.FlowStateLock != null)
|
||||
{
|
||||
if (response == ConsumeResponse.Nack)
|
||||
{
|
||||
|
@ -13,13 +13,13 @@ namespace Tapeti.Flow.Default
|
||||
public Guid ContinuationID { get; set; }
|
||||
public ContinuationMetadata ContinuationMetadata { get; set; }
|
||||
|
||||
private bool stored;
|
||||
private bool storeCalled;
|
||||
private bool deleteCalled;
|
||||
|
||||
|
||||
public async Task EnsureStored()
|
||||
public async Task Store()
|
||||
{
|
||||
if (stored)
|
||||
return;
|
||||
storeCalled = true;
|
||||
|
||||
if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
|
||||
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
|
||||
@ -27,8 +27,20 @@ namespace Tapeti.Flow.Default
|
||||
|
||||
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
|
||||
await FlowStateLock.StoreFlowState(FlowState);
|
||||
}
|
||||
|
||||
stored = true;
|
||||
public async Task Delete()
|
||||
{
|
||||
deleteCalled = true;
|
||||
|
||||
if (FlowStateLock != null)
|
||||
await FlowStateLock.DeleteFlowState();
|
||||
}
|
||||
|
||||
public void EnsureStoreOrDeleteIsCalled()
|
||||
{
|
||||
if (!storeCalled && !deleteCalled)
|
||||
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
|
@ -39,8 +39,6 @@ namespace Tapeti.Flow.Default
|
||||
return null;
|
||||
|
||||
var flowStateLock = await flowStore.LockFlowState(flowID.Value);
|
||||
if (flowStateLock == null)
|
||||
return null;
|
||||
|
||||
var flowState = await flowStateLock.GetFlowState();
|
||||
if (flowState == null)
|
||||
|
@ -26,13 +26,13 @@ namespace Tapeti.Flow.Default
|
||||
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
|
||||
{
|
||||
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
|
||||
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo));
|
||||
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
|
||||
}
|
||||
|
||||
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
|
||||
{
|
||||
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
|
||||
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo));
|
||||
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
|
||||
}
|
||||
|
||||
public IFlowParallelRequestBuilder YieldWithParallelRequest()
|
||||
@ -42,18 +42,23 @@ namespace Tapeti.Flow.Default
|
||||
|
||||
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
|
||||
{
|
||||
return new DelegateYieldPoint(false, context => SendResponse(context, message));
|
||||
return new DelegateYieldPoint(context => SendResponse(context, message));
|
||||
}
|
||||
|
||||
public IYieldPoint End()
|
||||
{
|
||||
return new DelegateYieldPoint(false, EndFlow);
|
||||
return new DelegateYieldPoint(EndFlow);
|
||||
}
|
||||
|
||||
|
||||
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
|
||||
string convergeMethodName = null, bool convergeMethodTaskSync = false)
|
||||
{
|
||||
if (context.FlowState == null)
|
||||
{
|
||||
await CreateNewFlowState(context);
|
||||
}
|
||||
|
||||
var continuationID = Guid.NewGuid();
|
||||
|
||||
context.FlowState.Continuations.Add(continuationID,
|
||||
@ -70,14 +75,18 @@ namespace Tapeti.Flow.Default
|
||||
ReplyTo = responseHandlerInfo.ReplyToQueue
|
||||
};
|
||||
|
||||
await context.EnsureStored();
|
||||
await context.Store();
|
||||
|
||||
await publisher.Publish(message, properties);
|
||||
}
|
||||
|
||||
|
||||
private async Task SendResponse(FlowContext context, object message)
|
||||
{
|
||||
var reply = context.FlowState.Metadata.Reply;
|
||||
var reply = context.FlowState == null
|
||||
? GetReply(context.MessageContext)
|
||||
: context.FlowState.Metadata.Reply;
|
||||
|
||||
if (reply == null)
|
||||
throw new YieldPointException("No response is required");
|
||||
|
||||
@ -92,19 +101,21 @@ namespace Tapeti.Flow.Default
|
||||
properties.CorrelationId = reply.CorrelationId;
|
||||
|
||||
// TODO disallow if replyto is not specified?
|
||||
if (context.FlowState.Metadata.Reply.ReplyTo != null)
|
||||
if (reply.ReplyTo != null)
|
||||
await publisher.PublishDirect(message, reply.ReplyTo, properties);
|
||||
else
|
||||
await publisher.Publish(message, properties);
|
||||
|
||||
await context.Delete();
|
||||
}
|
||||
|
||||
|
||||
private static Task EndFlow(FlowContext context)
|
||||
private static async Task EndFlow(FlowContext context)
|
||||
{
|
||||
if (context.FlowState.Metadata.Reply != null)
|
||||
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
|
||||
await context.Delete();
|
||||
|
||||
return Task.CompletedTask;
|
||||
if (context.FlowState != null && context.FlowState.Metadata.Reply != null)
|
||||
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
|
||||
}
|
||||
|
||||
|
||||
@ -147,11 +158,31 @@ namespace Tapeti.Flow.Default
|
||||
};
|
||||
}
|
||||
|
||||
private async Task CreateNewFlowState(FlowContext flowContext)
|
||||
{
|
||||
var flowStore = flowContext.MessageContext.DependencyResolver.Resolve<IFlowStore>();
|
||||
|
||||
var flowID = Guid.NewGuid();
|
||||
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
|
||||
|
||||
if (flowContext.FlowStateLock == null)
|
||||
throw new InvalidOperationException("Unable to lock a new flow");
|
||||
|
||||
flowContext.FlowState = new FlowState
|
||||
{
|
||||
Metadata = new FlowMetadata
|
||||
{
|
||||
Reply = GetReply(flowContext.MessageContext)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
|
||||
{
|
||||
var executableYieldPoint = yieldPoint as IExecutableYieldPoint;
|
||||
var storeState = executableYieldPoint?.StoreState ?? false;
|
||||
var executableYieldPoint = yieldPoint as DelegateYieldPoint;
|
||||
|
||||
if (executableYieldPoint == null)
|
||||
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}");
|
||||
|
||||
FlowContext flowContext;
|
||||
object flowContextItem;
|
||||
@ -160,27 +191,10 @@ namespace Tapeti.Flow.Default
|
||||
{
|
||||
flowContext = new FlowContext
|
||||
{
|
||||
MessageContext = context,
|
||||
FlowState = new FlowState()
|
||||
MessageContext = context
|
||||
};
|
||||
|
||||
if (storeState)
|
||||
{
|
||||
// Initiate the flow
|
||||
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
|
||||
|
||||
var flowID = Guid.NewGuid();
|
||||
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
|
||||
|
||||
if (flowContext.FlowStateLock == null)
|
||||
throw new InvalidOperationException("Unable to lock a new flow");
|
||||
|
||||
flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState();
|
||||
if (flowContext.FlowState == null)
|
||||
throw new InvalidOperationException("Unable to get state for new flow");
|
||||
|
||||
flowContext.FlowState.Metadata.Reply = GetReply(context);
|
||||
}
|
||||
context.Items.Add(ContextItems.FlowContext, flowContext);
|
||||
}
|
||||
else
|
||||
flowContext = (FlowContext)flowContextItem;
|
||||
@ -193,19 +207,17 @@ namespace Tapeti.Flow.Default
|
||||
}
|
||||
catch (YieldPointException e)
|
||||
{
|
||||
var controllerName = flowContext.MessageContext.Controller.GetType().FullName;
|
||||
var methodName = flowContext.MessageContext.Binding.Method.Name;
|
||||
|
||||
throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e);
|
||||
// Useful for debugging
|
||||
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
|
||||
e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name;
|
||||
throw;
|
||||
}
|
||||
|
||||
if (storeState)
|
||||
await flowContext.EnsureStored();
|
||||
else if (flowContext.FlowStateLock != null)
|
||||
await flowContext.FlowStateLock.DeleteFlowState();
|
||||
flowContext.EnsureStoreOrDeleteIsCalled();
|
||||
}
|
||||
|
||||
|
||||
|
||||
private class ParallelRequestBuilder : IFlowParallelRequestBuilder
|
||||
{
|
||||
public delegate Task SendRequestFunc(FlowContext context,
|
||||
@ -275,7 +287,7 @@ namespace Tapeti.Flow.Default
|
||||
if (convergeMethod?.Method == null)
|
||||
throw new ArgumentNullException(nameof(convergeMethod));
|
||||
|
||||
return new DelegateYieldPoint(true, context =>
|
||||
return new DelegateYieldPoint(context =>
|
||||
{
|
||||
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
|
||||
throw new YieldPointException("Converge method must be in the same controller class");
|
||||
|
@ -10,11 +10,13 @@ namespace Tapeti.Flow.Default
|
||||
public class FlowStarter : IFlowStarter
|
||||
{
|
||||
private readonly IConfig config;
|
||||
private readonly ILogger logger;
|
||||
|
||||
|
||||
public FlowStarter(IConfig config)
|
||||
public FlowStarter(IConfig config, ILogger logger)
|
||||
{
|
||||
this.config = config;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +54,32 @@ namespace Tapeti.Flow.Default
|
||||
};
|
||||
|
||||
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
|
||||
await flowHandler.Execute(context, yieldPoint);
|
||||
|
||||
ConsumeResponse response = ConsumeResponse.Nack;
|
||||
try
|
||||
{
|
||||
await flowHandler.Execute(context, yieldPoint);
|
||||
response = ConsumeResponse.Ack;
|
||||
}
|
||||
finally
|
||||
{
|
||||
await RunCleanup(context, response);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RunCleanup(MessageContext context, ConsumeResponse response)
|
||||
{
|
||||
foreach (var handler in config.CleanupMiddleware)
|
||||
{
|
||||
try
|
||||
{
|
||||
await handler.Handle(context, response);
|
||||
}
|
||||
catch (Exception eCleanup)
|
||||
{
|
||||
logger.HandlerException(eCleanup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -25,20 +25,13 @@ namespace Tapeti.Flow.Default
|
||||
}
|
||||
|
||||
|
||||
public void Assign(FlowState value)
|
||||
{
|
||||
Metadata = value.Metadata.Clone();
|
||||
Data = value.Data;
|
||||
Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone());
|
||||
}
|
||||
|
||||
|
||||
public FlowState Clone()
|
||||
{
|
||||
var result = new FlowState();
|
||||
result.Assign(this);
|
||||
|
||||
return result;
|
||||
return new FlowState {
|
||||
metadata = metadata.Clone(),
|
||||
Data = Data,
|
||||
continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone())
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,16 +5,19 @@ using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Tapeti.Flow.FlowHelpers;
|
||||
|
||||
namespace Tapeti.Flow.Default
|
||||
{
|
||||
public class FlowStore : IFlowStore
|
||||
{
|
||||
private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
|
||||
private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
|
||||
private readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
|
||||
private readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
|
||||
private readonly LockCollection<Guid> Locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
|
||||
|
||||
private readonly IFlowRepository repository;
|
||||
|
||||
private volatile bool InUse = false;
|
||||
|
||||
public FlowStore(IFlowRepository repository)
|
||||
{
|
||||
@ -24,6 +27,11 @@ namespace Tapeti.Flow.Default
|
||||
|
||||
public async Task Load()
|
||||
{
|
||||
if (InUse)
|
||||
throw new InvalidOperationException("Can only load the saved state once.");
|
||||
|
||||
InUse = true;
|
||||
|
||||
FlowStates.Clear();
|
||||
ContinuationLookup.Clear();
|
||||
|
||||
@ -46,97 +54,76 @@ namespace Tapeti.Flow.Default
|
||||
|
||||
public async Task<IFlowStateLock> LockFlowState(Guid flowID)
|
||||
{
|
||||
var isNew = false;
|
||||
var flowState = FlowStates.GetOrAdd(flowID, id =>
|
||||
{
|
||||
isNew = true;
|
||||
return new FlowState();
|
||||
});
|
||||
InUse = true;
|
||||
|
||||
var result = new FlowStateLock(this, flowState, flowID, isNew);
|
||||
await result.Lock();
|
||||
|
||||
return result;
|
||||
var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID));
|
||||
return flowStatelock;
|
||||
}
|
||||
|
||||
|
||||
private class FlowStateLock : IFlowStateLock
|
||||
{
|
||||
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
|
||||
|
||||
private readonly FlowStore owner;
|
||||
private readonly FlowState flowState;
|
||||
private readonly Guid flowID;
|
||||
private bool isNew;
|
||||
private bool isDisposed;
|
||||
private volatile IDisposable flowLock;
|
||||
private FlowState flowState;
|
||||
|
||||
|
||||
public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew)
|
||||
public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock)
|
||||
{
|
||||
this.owner = owner;
|
||||
this.flowState = flowState;
|
||||
this.flowID = flowID;
|
||||
this.isNew = isNew;
|
||||
this.flowLock = flowLock;
|
||||
|
||||
owner.FlowStates.TryGetValue(flowID, out flowState);
|
||||
}
|
||||
|
||||
|
||||
public Task Lock()
|
||||
{
|
||||
return semaphore.WaitAsync();
|
||||
}
|
||||
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
lock (flowState)
|
||||
{
|
||||
if (!isDisposed)
|
||||
{
|
||||
semaphore.Release();
|
||||
semaphore.Dispose();
|
||||
}
|
||||
|
||||
isDisposed = true;
|
||||
}
|
||||
var l = flowLock;
|
||||
flowLock = null;
|
||||
l?.Dispose();
|
||||
}
|
||||
|
||||
public Guid FlowID => flowID;
|
||||
|
||||
public Task<FlowState> GetFlowState()
|
||||
{
|
||||
lock (flowState)
|
||||
{
|
||||
if (isDisposed)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
if (flowLock == null)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
|
||||
return Task.FromResult(flowState.Clone());
|
||||
}
|
||||
return Task.FromResult(flowState?.Clone());
|
||||
}
|
||||
|
||||
public async Task StoreFlowState(FlowState newFlowState)
|
||||
{
|
||||
lock (flowState)
|
||||
{
|
||||
if (isDisposed)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
if (flowLock == null)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
|
||||
// Ensure no one has a direct reference to the protected state in the dictionary
|
||||
newFlowState = newFlowState.Clone();
|
||||
|
||||
// Update the lookup dictionary for the ContinuationIDs
|
||||
if (flowState != null)
|
||||
{
|
||||
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
|
||||
{
|
||||
Guid removedValue;
|
||||
ContinuationLookup.TryRemove(removedContinuation, out removedValue);
|
||||
owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
|
||||
}
|
||||
|
||||
foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key)))
|
||||
{
|
||||
ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
|
||||
}
|
||||
|
||||
flowState.Assign(newFlowState);
|
||||
}
|
||||
|
||||
foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key)))
|
||||
{
|
||||
owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
|
||||
}
|
||||
|
||||
var isNew = flowState == null;
|
||||
flowState = newFlowState;
|
||||
owner.FlowStates[flowID] = newFlowState;
|
||||
|
||||
// Storing the flowstate in the underlying repository
|
||||
if (isNew)
|
||||
{
|
||||
isNew = false;
|
||||
var now = DateTime.UtcNow;
|
||||
await owner.repository.CreateState(flowID, flowState, now);
|
||||
}
|
||||
@ -148,50 +135,27 @@ namespace Tapeti.Flow.Default
|
||||
|
||||
public async Task DeleteFlowState()
|
||||
{
|
||||
lock (flowState)
|
||||
{
|
||||
if (isDisposed)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
if (flowLock == null)
|
||||
throw new ObjectDisposedException("FlowStateLock");
|
||||
|
||||
if (flowState != null)
|
||||
{
|
||||
foreach (var removedContinuation in flowState.Continuations.Keys)
|
||||
{
|
||||
Guid removedValue;
|
||||
ContinuationLookup.TryRemove(removedContinuation, out removedValue);
|
||||
owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
|
||||
}
|
||||
|
||||
FlowState removedFlow;
|
||||
FlowStates.TryRemove(flowID, out removedFlow);
|
||||
owner.FlowStates.TryRemove(flowID, out removedFlow);
|
||||
|
||||
if (flowState != null)
|
||||
{
|
||||
flowState = null;
|
||||
await owner.repository.DeleteState(flowID);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNew)
|
||||
await owner.repository.DeleteState(flowID);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState)
|
||||
{
|
||||
return new FlowStateRecord
|
||||
{
|
||||
FlowID = flowID,
|
||||
Metadata = JsonConvert.SerializeObject(flowState.Metadata),
|
||||
Data = flowState.Data,
|
||||
ContinuationMetadata = flowState.Continuations.ToDictionary(
|
||||
kv => kv.Key,
|
||||
kv => JsonConvert.SerializeObject(kv.Value))
|
||||
};
|
||||
}
|
||||
|
||||
private static FlowState ToFlowState(FlowStateRecord flowStateRecord)
|
||||
{
|
||||
return new FlowState
|
||||
{
|
||||
Metadata = JsonConvert.DeserializeObject<FlowMetadata>(flowStateRecord.Metadata),
|
||||
Data = flowStateRecord.Data,
|
||||
Continuations = flowStateRecord.ContinuationMetadata.ToDictionary(
|
||||
kv => kv.Key,
|
||||
kv => JsonConvert.DeserializeObject<ContinuationMetadata>(kv.Value))
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Flow.Default
|
||||
{
|
||||
internal interface IExecutableYieldPoint : IYieldPoint
|
||||
{
|
||||
bool StoreState { get; }
|
||||
Task Execute(FlowContext context);
|
||||
}
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Flow.Default
|
||||
{
|
||||
internal class StateYieldPoint : IExecutableYieldPoint
|
||||
{
|
||||
public bool StoreState { get; }
|
||||
|
||||
|
||||
public StateYieldPoint(bool storeState)
|
||||
{
|
||||
StoreState = storeState;
|
||||
}
|
||||
|
||||
|
||||
public async Task Execute(FlowContext context)
|
||||
{
|
||||
if (StoreState)
|
||||
await context.EnsureStored();
|
||||
}
|
||||
}
|
||||
}
|
110
Tapeti.Flow/FlowHelpers/LockCollection.cs
Normal file
110
Tapeti.Flow/FlowHelpers/LockCollection.cs
Normal file
@ -0,0 +1,110 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Flow.FlowHelpers
|
||||
{
|
||||
public class LockCollection<T>
|
||||
{
|
||||
private readonly Dictionary<T, LockItem> locks;
|
||||
|
||||
public LockCollection(IEqualityComparer<T> comparer)
|
||||
{
|
||||
locks = new Dictionary<T, LockItem>(comparer);
|
||||
}
|
||||
|
||||
public Task<IDisposable> GetLock(T key)
|
||||
{
|
||||
LockItem nextLi = new LockItem(locks, key);
|
||||
try
|
||||
{
|
||||
bool continueImmediately = false;
|
||||
lock (locks)
|
||||
{
|
||||
LockItem li;
|
||||
if (!locks.TryGetValue(key, out li))
|
||||
{
|
||||
locks.Add(key, nextLi);
|
||||
continueImmediately = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
while (li.Next != null)
|
||||
li = li.Next;
|
||||
|
||||
li.Next = nextLi;
|
||||
}
|
||||
}
|
||||
if (continueImmediately)
|
||||
nextLi.Continue();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
nextLi.Error(e);
|
||||
}
|
||||
return nextLi.GetTask();
|
||||
}
|
||||
|
||||
private class LockItem : IDisposable
|
||||
{
|
||||
internal volatile LockItem Next;
|
||||
|
||||
private readonly Dictionary<T, LockItem> locks;
|
||||
private readonly TaskCompletionSource<IDisposable> tcs = new TaskCompletionSource<IDisposable>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
private readonly T key;
|
||||
|
||||
public LockItem(Dictionary<T, LockItem> locks, T key)
|
||||
{
|
||||
this.locks = locks;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
internal void Continue()
|
||||
{
|
||||
tcs.TrySetResult(this);
|
||||
}
|
||||
|
||||
internal void Error(Exception e)
|
||||
{
|
||||
tcs.SetException(e);
|
||||
}
|
||||
|
||||
internal Task<IDisposable> GetTask()
|
||||
{
|
||||
return tcs.Task;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
lock (locks)
|
||||
{
|
||||
LockItem li;
|
||||
if (!locks.TryGetValue(key, out li))
|
||||
return;
|
||||
|
||||
if (li != this)
|
||||
{
|
||||
// Something is wrong (comparer is not stable?), but we cannot loose the completions sources
|
||||
while (li.Next != null)
|
||||
li = li.Next;
|
||||
li.Next = Next;
|
||||
return;
|
||||
}
|
||||
|
||||
if (Next == null)
|
||||
{
|
||||
locks.Remove(key);
|
||||
return;
|
||||
}
|
||||
|
||||
locks[key] = Next;
|
||||
}
|
||||
|
||||
Next.Continue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -18,8 +18,8 @@ namespace Tapeti.Flow
|
||||
container.RegisterDefault<IFlowProvider, FlowProvider>();
|
||||
container.RegisterDefault<IFlowStarter, FlowStarter>();
|
||||
container.RegisterDefault<IFlowHandler, FlowProvider>();
|
||||
container.RegisterDefault<IFlowRepository>(() => flowRepository ?? new NonPersistentFlowRepository());
|
||||
container.RegisterDefault<IFlowStore, FlowStore>();
|
||||
container.RegisterDefaultSingleton<IFlowRepository>(() => flowRepository ?? new NonPersistentFlowRepository());
|
||||
container.RegisterDefaultSingleton<IFlowStore, FlowStore>();
|
||||
}
|
||||
|
||||
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
|
||||
|
@ -61,11 +61,10 @@
|
||||
<Compile Include="Default\FlowMessageMiddleware.cs" />
|
||||
<Compile Include="Default\FlowStarter.cs" />
|
||||
<Compile Include="Default\FlowState.cs" />
|
||||
<Compile Include="Default\IExecutableYieldPoint.cs" />
|
||||
<Compile Include="Default\NonPersistentFlowRepository.cs" />
|
||||
<Compile Include="Default\DelegateYieldPoint.cs" />
|
||||
<Compile Include="ConfigExtensions.cs" />
|
||||
<Compile Include="Default\StateYieldPoint.cs" />
|
||||
<Compile Include="FlowHelpers\LockCollection.cs" />
|
||||
<Compile Include="FlowHelpers\MethodSerializer.cs" />
|
||||
<Compile Include="FlowMiddleware.cs" />
|
||||
<Compile Include="Default\FlowStore.cs" />
|
||||
|
@ -19,6 +19,7 @@ namespace Test
|
||||
// Public properties are automatically stored and retrieved while in a flow
|
||||
public Guid StateTestGuid { get; set; }
|
||||
|
||||
public int Phase;
|
||||
|
||||
public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer)
|
||||
{
|
||||
@ -31,15 +32,17 @@ namespace Test
|
||||
[Start]
|
||||
public async Task<IYieldPoint> StartFlow(bool go)
|
||||
{
|
||||
Console.WriteLine("Starting stand-alone flow");
|
||||
await Task.Delay(1000);
|
||||
Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow");
|
||||
await Task.Delay(10);
|
||||
|
||||
Phase = 1;
|
||||
|
||||
if (go)
|
||||
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
|
||||
(new PoloConfirmationRequestMessage(),
|
||||
HandlePoloConfirmationResponse);
|
||||
|
||||
Console.WriteLine("Ending stand-alone flow prematurely");
|
||||
Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely");
|
||||
return flowProvider.End();
|
||||
}
|
||||
|
||||
@ -47,7 +50,21 @@ namespace Test
|
||||
[Continuation]
|
||||
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg)
|
||||
{
|
||||
Console.WriteLine("Ending stand-alone flow");
|
||||
Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second...");
|
||||
|
||||
Phase = 2;
|
||||
|
||||
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
|
||||
(new PoloConfirmationRequestMessage(),
|
||||
HandlePoloConfirmationResponseEnd);
|
||||
|
||||
}
|
||||
|
||||
|
||||
[Continuation]
|
||||
public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg)
|
||||
{
|
||||
Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow");
|
||||
return flowProvider.End();
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@ namespace Test
|
||||
|
||||
public async Task Run()
|
||||
{
|
||||
await publisher.Publish(new MarcoMessage());
|
||||
//await publisher.Publish(new MarcoMessage());
|
||||
|
||||
/*
|
||||
var concurrent = new SemaphoreSlim(20);
|
||||
|
@ -7,6 +7,7 @@ using Tapeti.Flow;
|
||||
using Tapeti.Flow.SQL;
|
||||
using Tapeti.Helpers;
|
||||
using Tapeti.SimpleInjector;
|
||||
using System.Threading;
|
||||
|
||||
namespace Test
|
||||
{
|
||||
@ -36,6 +37,11 @@ namespace Test
|
||||
Params = new TapetiAppSettingsConnectionParams()
|
||||
})
|
||||
{
|
||||
var flowStore = container.GetInstance<IFlowStore>();
|
||||
var flowStore2 = container.GetInstance<IFlowStore>();
|
||||
|
||||
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
|
||||
|
||||
connection.Connected += (sender, e) => {
|
||||
Console.WriteLine("Event Connected");
|
||||
};
|
||||
@ -58,6 +64,8 @@ namespace Test
|
||||
|
||||
container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
|
||||
|
||||
Thread.Sleep(1000);
|
||||
|
||||
var emitter = container.GetInstance<MarcoEmitter>();
|
||||
emitter.Run().Wait();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user