diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs index 984f4cd..2ed0926 100644 --- a/Tapeti.Flow/Default/DelegateYieldPoint.cs +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -3,16 +3,13 @@ using System.Threading.Tasks; namespace Tapeti.Flow.Default { - internal class DelegateYieldPoint : IExecutableYieldPoint + internal class DelegateYieldPoint : IYieldPoint { - public bool StoreState { get; } - private readonly Func onExecute; - public DelegateYieldPoint(bool storeState, Func onExecute) + public DelegateYieldPoint(Func onExecute) { - StoreState = storeState; this.onExecute = onExecute; } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index d7e279a..9053135 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -87,7 +87,7 @@ namespace Tapeti.Flow.Default private static Task HandleParallelResponse(IMessageContext context) { var flowHandler = context.DependencyResolver.Resolve(); - return flowHandler.Execute(context, new StateYieldPoint(true)); + return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask)); } diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs index e0420c8..811be59 100644 --- a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default return; var flowContext = (FlowContext)flowContextObj; - if (flowContext.FlowStateLock != null) + if (flowContext?.FlowStateLock != null) { if (response == ConsumeResponse.Nack) { diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 460d5b2..dbadf08 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -13,13 +13,13 @@ namespace Tapeti.Flow.Default public Guid ContinuationID { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; } - private bool stored; + private bool storeCalled; + private bool deleteCalled; - public async Task EnsureStored() + public async Task Store() { - if (stored) - return; + storeCalled = true; if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); @@ -27,8 +27,20 @@ namespace Tapeti.Flow.Default FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller); await FlowStateLock.StoreFlowState(FlowState); + } - stored = true; + public async Task Delete() + { + deleteCalled = true; + + if (FlowStateLock != null) + await FlowStateLock.DeleteFlowState(); + } + + public void EnsureStoreOrDeleteIsCalled() + { + if (!storeCalled && !deleteCalled) + throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); } public void Dispose() diff --git a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs index 1d327ae..8c66e9d 100644 --- a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs +++ b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs @@ -39,8 +39,6 @@ namespace Tapeti.Flow.Default return null; var flowStateLock = await flowStore.LockFlowState(flowID.Value); - if (flowStateLock == null) - return null; var flowState = await flowStateLock.GetFlowState(); if (flowState == null) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 88bc9f6..308cb3a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -26,13 +26,13 @@ namespace Tapeti.Flow.Default public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } public IFlowParallelRequestBuilder YieldWithParallelRequest() @@ -42,18 +42,23 @@ namespace Tapeti.Flow.Default public IYieldPoint EndWithResponse(TResponse message) { - return new DelegateYieldPoint(false, context => SendResponse(context, message)); + return new DelegateYieldPoint(context => SendResponse(context, message)); } public IYieldPoint End() { - return new DelegateYieldPoint(false, EndFlow); + return new DelegateYieldPoint(EndFlow); } private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, string convergeMethodName = null, bool convergeMethodTaskSync = false) { + if (context.FlowState == null) + { + await CreateNewFlowState(context); + } + var continuationID = Guid.NewGuid(); context.FlowState.Continuations.Add(continuationID, @@ -70,14 +75,18 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.EnsureStored(); + await context.Store(); + await publisher.Publish(message, properties); } private async Task SendResponse(FlowContext context, object message) { - var reply = context.FlowState.Metadata.Reply; + var reply = context.FlowState == null + ? GetReply(context.MessageContext) + : context.FlowState.Metadata.Reply; + if (reply == null) throw new YieldPointException("No response is required"); @@ -92,19 +101,21 @@ namespace Tapeti.Flow.Default properties.CorrelationId = reply.CorrelationId; // TODO disallow if replyto is not specified? - if (context.FlowState.Metadata.Reply.ReplyTo != null) + if (reply.ReplyTo != null) await publisher.PublishDirect(message, reply.ReplyTo, properties); else await publisher.Publish(message, properties); + + await context.Delete(); } - private static Task EndFlow(FlowContext context) + private static async Task EndFlow(FlowContext context) { - if (context.FlowState.Metadata.Reply != null) - throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); + await context.Delete(); - return Task.CompletedTask; + if (context.FlowState != null && context.FlowState.Metadata.Reply != null) + throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); } @@ -147,11 +158,31 @@ namespace Tapeti.Flow.Default }; } + private async Task CreateNewFlowState(FlowContext flowContext) + { + var flowStore = flowContext.MessageContext.DependencyResolver.Resolve(); + + var flowID = Guid.NewGuid(); + flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); + + if (flowContext.FlowStateLock == null) + throw new InvalidOperationException("Unable to lock a new flow"); + + flowContext.FlowState = new FlowState + { + Metadata = new FlowMetadata + { + Reply = GetReply(flowContext.MessageContext) + } + }; + } public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) { - var executableYieldPoint = yieldPoint as IExecutableYieldPoint; - var storeState = executableYieldPoint?.StoreState ?? false; + var executableYieldPoint = yieldPoint as DelegateYieldPoint; + + if (executableYieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); FlowContext flowContext; object flowContextItem; @@ -160,27 +191,10 @@ namespace Tapeti.Flow.Default { flowContext = new FlowContext { - MessageContext = context, - FlowState = new FlowState() + MessageContext = context }; - if (storeState) - { - // Initiate the flow - var flowStore = context.DependencyResolver.Resolve(); - - var flowID = Guid.NewGuid(); - flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); - - if (flowContext.FlowStateLock == null) - throw new InvalidOperationException("Unable to lock a new flow"); - - flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState(); - if (flowContext.FlowState == null) - throw new InvalidOperationException("Unable to get state for new flow"); - - flowContext.FlowState.Metadata.Reply = GetReply(context); - } + context.Items.Add(ContextItems.FlowContext, flowContext); } else flowContext = (FlowContext)flowContextItem; @@ -193,19 +207,17 @@ namespace Tapeti.Flow.Default } catch (YieldPointException e) { - var controllerName = flowContext.MessageContext.Controller.GetType().FullName; - var methodName = flowContext.MessageContext.Binding.Method.Name; - - throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e); + // Useful for debugging + e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName; + e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name; + throw; } - if (storeState) - await flowContext.EnsureStored(); - else if (flowContext.FlowStateLock != null) - await flowContext.FlowStateLock.DeleteFlowState(); + flowContext.EnsureStoreOrDeleteIsCalled(); } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { public delegate Task SendRequestFunc(FlowContext context, @@ -275,7 +287,7 @@ namespace Tapeti.Flow.Default if (convergeMethod?.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); - return new DelegateYieldPoint(true, context => + return new DelegateYieldPoint(context => { if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index 215decb..d376f65 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -10,11 +10,13 @@ namespace Tapeti.Flow.Default public class FlowStarter : IFlowStarter { private readonly IConfig config; + private readonly ILogger logger; - public FlowStarter(IConfig config) + public FlowStarter(IConfig config, ILogger logger) { this.config = config; + this.logger = logger; } @@ -52,7 +54,32 @@ namespace Tapeti.Flow.Default }; var flowHandler = config.DependencyResolver.Resolve(); - await flowHandler.Execute(context, yieldPoint); + + ConsumeResponse response = ConsumeResponse.Nack; + try + { + await flowHandler.Execute(context, yieldPoint); + response = ConsumeResponse.Ack; + } + finally + { + await RunCleanup(context, response); + } + } + + private async Task RunCleanup(MessageContext context, ConsumeResponse response) + { + foreach (var handler in config.CleanupMiddleware) + { + try + { + await handler.Handle(context, response); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); + } + } } diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index e1cb0cf..0ee3eec 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -25,20 +25,13 @@ namespace Tapeti.Flow.Default } - public void Assign(FlowState value) - { - Metadata = value.Metadata.Clone(); - Data = value.Data; - Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()); - } - - public FlowState Clone() { - var result = new FlowState(); - result.Assign(this); - - return result; + return new FlowState { + metadata = metadata.Clone(), + Data = Data, + continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()) + }; } } diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index b4e63ba..d2a6d87 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -5,16 +5,19 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { public class FlowStore : IFlowStore { - private static readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); - private static readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + private readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); + private readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + private readonly LockCollection Locks = new LockCollection(EqualityComparer.Default); private readonly IFlowRepository repository; + private volatile bool InUse = false; public FlowStore(IFlowRepository repository) { @@ -24,6 +27,11 @@ namespace Tapeti.Flow.Default public async Task Load() { + if (InUse) + throw new InvalidOperationException("Can only load the saved state once."); + + InUse = true; + FlowStates.Clear(); ContinuationLookup.Clear(); @@ -46,97 +54,76 @@ namespace Tapeti.Flow.Default public async Task LockFlowState(Guid flowID) { - var isNew = false; - var flowState = FlowStates.GetOrAdd(flowID, id => - { - isNew = true; - return new FlowState(); - }); + InUse = true; - var result = new FlowStateLock(this, flowState, flowID, isNew); - await result.Lock(); - - return result; + var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID)); + return flowStatelock; } - private class FlowStateLock : IFlowStateLock { - private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); - private readonly FlowStore owner; - private readonly FlowState flowState; private readonly Guid flowID; - private bool isNew; - private bool isDisposed; + private volatile IDisposable flowLock; + private FlowState flowState; - public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew) + public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock) { this.owner = owner; - this.flowState = flowState; this.flowID = flowID; - this.isNew = isNew; + this.flowLock = flowLock; + + owner.FlowStates.TryGetValue(flowID, out flowState); } - - public Task Lock() - { - return semaphore.WaitAsync(); - } - - public void Dispose() { - lock (flowState) - { - if (!isDisposed) - { - semaphore.Release(); - semaphore.Dispose(); - } - - isDisposed = true; - } + var l = flowLock; + flowLock = null; + l?.Dispose(); } public Guid FlowID => flowID; public Task GetFlowState() { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); - return Task.FromResult(flowState.Clone()); - } + return Task.FromResult(flowState?.Clone()); } public async Task StoreFlowState(FlowState newFlowState) { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); + // Ensure no one has a direct reference to the protected state in the dictionary + newFlowState = newFlowState.Clone(); + + // Update the lookup dictionary for the ContinuationIDs + if (flowState != null) + { foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) { Guid removedValue; - ContinuationLookup.TryRemove(removedContinuation, out removedValue); + owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); } - - foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key))) - { - ContinuationLookup.TryAdd(addedContinuation.Key, flowID); - } - - flowState.Assign(newFlowState); } + foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key))) + { + owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID); + } + + var isNew = flowState == null; + flowState = newFlowState; + owner.FlowStates[flowID] = newFlowState; + + // Storing the flowstate in the underlying repository if (isNew) { - isNew = false; var now = DateTime.UtcNow; await owner.repository.CreateState(flowID, flowState, now); } @@ -148,50 +135,27 @@ namespace Tapeti.Flow.Default public async Task DeleteFlowState() { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); + if (flowState != null) + { foreach (var removedContinuation in flowState.Continuations.Keys) { Guid removedValue; - ContinuationLookup.TryRemove(removedContinuation, out removedValue); + owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); } FlowState removedFlow; - FlowStates.TryRemove(flowID, out removedFlow); + owner.FlowStates.TryRemove(flowID, out removedFlow); + + if (flowState != null) + { + flowState = null; + await owner.repository.DeleteState(flowID); + } } - - if (!isNew) - await owner.repository.DeleteState(flowID); } } - - - private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState) - { - return new FlowStateRecord - { - FlowID = flowID, - Metadata = JsonConvert.SerializeObject(flowState.Metadata), - Data = flowState.Data, - ContinuationMetadata = flowState.Continuations.ToDictionary( - kv => kv.Key, - kv => JsonConvert.SerializeObject(kv.Value)) - }; - } - - private static FlowState ToFlowState(FlowStateRecord flowStateRecord) - { - return new FlowState - { - Metadata = JsonConvert.DeserializeObject(flowStateRecord.Metadata), - Data = flowStateRecord.Data, - Continuations = flowStateRecord.ContinuationMetadata.ToDictionary( - kv => kv.Key, - kv => JsonConvert.DeserializeObject(kv.Value)) - }; - } } } diff --git a/Tapeti.Flow/Default/IExecutableYieldPoint.cs b/Tapeti.Flow/Default/IExecutableYieldPoint.cs deleted file mode 100644 index 80db564..0000000 --- a/Tapeti.Flow/Default/IExecutableYieldPoint.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Flow.Default -{ - internal interface IExecutableYieldPoint : IYieldPoint - { - bool StoreState { get; } - Task Execute(FlowContext context); - } -} diff --git a/Tapeti.Flow/Default/StateYieldPoint.cs b/Tapeti.Flow/Default/StateYieldPoint.cs deleted file mode 100644 index 521c2fc..0000000 --- a/Tapeti.Flow/Default/StateYieldPoint.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Flow.Default -{ - internal class StateYieldPoint : IExecutableYieldPoint - { - public bool StoreState { get; } - - - public StateYieldPoint(bool storeState) - { - StoreState = storeState; - } - - - public async Task Execute(FlowContext context) - { - if (StoreState) - await context.EnsureStored(); - } - } -} diff --git a/Tapeti.Flow/FlowHelpers/LockCollection.cs b/Tapeti.Flow/FlowHelpers/LockCollection.cs new file mode 100644 index 0000000..f8a3533 --- /dev/null +++ b/Tapeti.Flow/FlowHelpers/LockCollection.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Flow.FlowHelpers +{ + public class LockCollection + { + private readonly Dictionary locks; + + public LockCollection(IEqualityComparer comparer) + { + locks = new Dictionary(comparer); + } + + public Task GetLock(T key) + { + LockItem nextLi = new LockItem(locks, key); + try + { + bool continueImmediately = false; + lock (locks) + { + LockItem li; + if (!locks.TryGetValue(key, out li)) + { + locks.Add(key, nextLi); + continueImmediately = true; + } + else + { + while (li.Next != null) + li = li.Next; + + li.Next = nextLi; + } + } + if (continueImmediately) + nextLi.Continue(); + } + catch (Exception e) + { + nextLi.Error(e); + } + return nextLi.GetTask(); + } + + private class LockItem : IDisposable + { + internal volatile LockItem Next; + + private readonly Dictionary locks; + private readonly TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly T key; + + public LockItem(Dictionary locks, T key) + { + this.locks = locks; + this.key = key; + } + + internal void Continue() + { + tcs.TrySetResult(this); + } + + internal void Error(Exception e) + { + tcs.SetException(e); + } + + internal Task GetTask() + { + return tcs.Task; + } + + public void Dispose() + { + lock (locks) + { + LockItem li; + if (!locks.TryGetValue(key, out li)) + return; + + if (li != this) + { + // Something is wrong (comparer is not stable?), but we cannot loose the completions sources + while (li.Next != null) + li = li.Next; + li.Next = Next; + return; + } + + if (Next == null) + { + locks.Remove(key); + return; + } + + locks[key] = Next; + } + + Next.Continue(); + } + } + } + +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index fb13d90..751374c 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -18,8 +18,8 @@ namespace Tapeti.Flow container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); - container.RegisterDefault(() => flowRepository ?? new NonPersistentFlowRepository()); - container.RegisterDefault(); + container.RegisterDefaultSingleton(() => flowRepository ?? new NonPersistentFlowRepository()); + container.RegisterDefaultSingleton(); } public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index 5a4b9fd..b4cb237 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -61,11 +61,10 @@ - - + diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index a3a65f4..4e631b2 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -19,6 +19,7 @@ namespace Test // Public properties are automatically stored and retrieved while in a flow public Guid StateTestGuid { get; set; } + public int Phase; public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer) { @@ -31,15 +32,17 @@ namespace Test [Start] public async Task StartFlow(bool go) { - Console.WriteLine("Starting stand-alone flow"); - await Task.Delay(1000); + Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow"); + await Task.Delay(10); + + Phase = 1; if (go) return flowProvider.YieldWithRequestSync (new PoloConfirmationRequestMessage(), HandlePoloConfirmationResponse); - Console.WriteLine("Ending stand-alone flow prematurely"); + Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely"); return flowProvider.End(); } @@ -47,7 +50,21 @@ namespace Test [Continuation] public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg) { - Console.WriteLine("Ending stand-alone flow"); + Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second..."); + + Phase = 2; + + return flowProvider.YieldWithRequestSync + (new PoloConfirmationRequestMessage(), + HandlePoloConfirmationResponseEnd); + + } + + + [Continuation] + public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg) + { + Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow"); return flowProvider.End(); } diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs index 7298937..66a4a75 100644 --- a/Test/MarcoEmitter.cs +++ b/Test/MarcoEmitter.cs @@ -17,7 +17,7 @@ namespace Test public async Task Run() { - await publisher.Publish(new MarcoMessage()); + //await publisher.Publish(new MarcoMessage()); /* var concurrent = new SemaphoreSlim(20); diff --git a/Test/Program.cs b/Test/Program.cs index dc9fef8..9a71579 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -7,6 +7,7 @@ using Tapeti.Flow; using Tapeti.Flow.SQL; using Tapeti.Helpers; using Tapeti.SimpleInjector; +using System.Threading; namespace Test { @@ -36,6 +37,11 @@ namespace Test Params = new TapetiAppSettingsConnectionParams() }) { + var flowStore = container.GetInstance(); + var flowStore2 = container.GetInstance(); + + Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); + connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); }; @@ -58,6 +64,8 @@ namespace Test container.GetInstance().Start(c => c.StartFlow, true); + Thread.Sleep(1000); + var emitter = container.GetInstance(); emitter.Run().Wait();