diff --git a/Publish.ps1 b/Publish.ps1 deleted file mode 100644 index 7646e36..0000000 --- a/Publish.ps1 +++ /dev/null @@ -1,55 +0,0 @@ -param([switch]$nopush) - - -function pack -{ - param([string]$project) - - Write-Host "Packing $($project).csproj" -Foreground Blue - NuGet.exe pack "$($project)\$($project).csproj" -Build -OutputDir publish -Version "$($version.NuGetVersion)" -Properties depversion="$($version.NuGetVersion)" -} - - -function push -{ - param([string]$project) - - Write-Host "Pushing $($project).csproj" -Foreground Blue - NuGet.exe push "publish\X2Software.$($project).$($version.NuGetVersion).nupkg" -apikey "$($nugetkey)" -Source https://www.nuget.org/api/v2/package -} - - -$projects = @( - "Tapeti.Annotations", - "Tapeti", - "Tapeti.DataAnnotations", - "Tapeti.Flow", - "Tapeti.SimpleInjector" -) - - -New-Item -Path publish -Type directory -Force | Out-Null - -$version = GitVersion.exe | Out-String | ConvertFrom-Json -$nugetkey = Get-Content .nuget.apikey - - -Write-Host "Publishing version $($version.NuGetVersion) using API key $($nugetkey)"-Foreground Cyan - -foreach ($project in $projects) -{ - pack($project) -} - - -if ($nopush -eq $false) -{ - foreach ($project in $projects) - { - push($project) - } -} -else -{ - Write-Host "Skipping push" -Foreground Blue -} \ No newline at end of file diff --git a/README.md b/README.md index 4d31212..c35e158 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,14 @@ The documentation for Tapeti is available on Read the Docs: [Master branch](http://tapeti.readthedocs.io/en/stable/)
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/?badge=stable) + + +## Builds +Builds are automatically run using AppVeyor, with the resulting packages being pushed to NuGet. + + +Latest build +[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti) + +Master build +[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master) diff --git a/Tapeti.Annotations/Tapeti.Annotations.nuspec b/Tapeti.Annotations/Tapeti.Annotations.nuspec index acceaf2..d2684ac 100644 --- a/Tapeti.Annotations/Tapeti.Annotations.nuspec +++ b/Tapeti.Annotations/Tapeti.Annotations.nuspec @@ -1,17 +1,17 @@ - - - - X2Software.Tapeti.Annotations - $version$ - $title$ - Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Annotations.png - false - Annotations for Tapeti - - rabbitmq tapeti - + + + + X2Software.Tapeti.Annotations + $version$ + Tapeti Annotations + Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Annotations.png + false + Annotations for Tapeti + + rabbitmq tapeti + \ No newline at end of file diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec index a1b9042..571c3a5 100644 --- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec +++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.nuspec @@ -1,20 +1,20 @@ - - - - X2Software.Tapeti.DataAnnotations - $version$ - $title$ - Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.DataAnnotations.png - false - DataAnnotations validation extension for Tapeti - - rabbitmq tapeti dataannotations - - - - + + + + X2Software.Tapeti.DataAnnotations + $version$ + Tapeti DataAnnotations + Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png + false + DataAnnotations validation extension for Tapeti + + rabbitmq tapeti dataannotations + + + + \ No newline at end of file diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index 70b1aff..c5e660d 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -30,7 +30,7 @@ namespace Tapeti.Flow.SQL public void RegisterDefaults(IDependencyContainer container) { - container.RegisterDefault>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); + container.RegisterDefault(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); } diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index 4af9d05..78e47a7 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -24,7 +24,7 @@ namespace Tapeti.Flow.SQL ); go; */ - public class SqlConnectionFlowRepository : IFlowRepository + public class SqlConnectionFlowRepository : IFlowRepository { private readonly string connectionString; private readonly int serviceId; @@ -39,7 +39,7 @@ namespace Tapeti.Flow.SQL } - public async Task>> GetStates() + public async Task>> GetStates() { using (var connection = await GetConnection()) { @@ -69,14 +69,14 @@ namespace Tapeti.Flow.SQL } - public Task CreateState(Guid flowID, T state, DateTime timestamp) + public Task CreateState(Guid flowID, T state, DateTime timestamp) { var stateJason = JsonConvert.SerializeObject(state); throw new NotImplementedException(); } - public Task UpdateState(Guid flowID, T state) + public Task UpdateState(Guid flowID, T state) { throw new NotImplementedException(); } diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec index 686bbf9..0a667c6 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.nuspec @@ -1,21 +1,21 @@ - - - - X2Software.Tapeti.Flow.SQL - $version$ - $title$ - Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Flow.SQL.png - false - SQL backing repository for the Tapeti Flow package - - rabbitmq tapeti sql - - - - - + + + + X2Software.Tapeti.Flow.SQL + $version$ + Tapeti Flow SQL + Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.SQL.png + false + SQL backing repository for the Tapeti Flow package + + rabbitmq tapeti sql + + + + + \ No newline at end of file diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs index 843333e..127a0c2 100644 --- a/Tapeti.Flow/ConfigExtensions.cs +++ b/Tapeti.Flow/ConfigExtensions.cs @@ -2,7 +2,7 @@ { public static class ConfigExtensions { - public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository flowRepository = null) + public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository flowRepository = null) { config.Use(new FlowMiddleware(flowRepository)); return config; diff --git a/Tapeti.Flow/Default/DelegateYieldPoint.cs b/Tapeti.Flow/Default/DelegateYieldPoint.cs index 984f4cd..2ed0926 100644 --- a/Tapeti.Flow/Default/DelegateYieldPoint.cs +++ b/Tapeti.Flow/Default/DelegateYieldPoint.cs @@ -3,16 +3,13 @@ using System.Threading.Tasks; namespace Tapeti.Flow.Default { - internal class DelegateYieldPoint : IExecutableYieldPoint + internal class DelegateYieldPoint : IYieldPoint { - public bool StoreState { get; } - private readonly Func onExecute; - public DelegateYieldPoint(bool storeState, Func onExecute) + public DelegateYieldPoint(Func onExecute) { - StoreState = storeState; this.onExecute = onExecute; } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index d7e279a..9053135 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -87,7 +87,7 @@ namespace Tapeti.Flow.Default private static Task HandleParallelResponse(IMessageContext context) { var flowHandler = context.DependencyResolver.Resolve(); - return flowHandler.Execute(context, new StateYieldPoint(true)); + return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask)); } diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs new file mode 100644 index 0000000..16cf61b --- /dev/null +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Flow.Default +{ + public class FlowCleanupMiddleware : ICleanupMiddleware + { + public async Task Handle(IMessageContext context, HandlingResult handlingResult) + { + object flowContextObj; + if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) + return; + var flowContext = (FlowContext)flowContextObj; + + if (flowContext?.FlowStateLock != null) + { + if (handlingResult.ConsumeResponse == ConsumeResponse.Nack + || handlingResult.MessageAction == MessageAction.ErrorLog) + { + await flowContext.FlowStateLock.DeleteFlowState(); + } + flowContext.FlowStateLock.Dispose(); + } + } + } +} diff --git a/Tapeti.Flow/Default/FlowContext.cs b/Tapeti.Flow/Default/FlowContext.cs index 460d5b2..dbadf08 100644 --- a/Tapeti.Flow/Default/FlowContext.cs +++ b/Tapeti.Flow/Default/FlowContext.cs @@ -13,13 +13,13 @@ namespace Tapeti.Flow.Default public Guid ContinuationID { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; } - private bool stored; + private bool storeCalled; + private bool deleteCalled; - public async Task EnsureStored() + public async Task Store() { - if (stored) - return; + storeCalled = true; if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); @@ -27,8 +27,20 @@ namespace Tapeti.Flow.Default FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller); await FlowStateLock.StoreFlowState(FlowState); + } - stored = true; + public async Task Delete() + { + deleteCalled = true; + + if (FlowStateLock != null) + await FlowStateLock.DeleteFlowState(); + } + + public void EnsureStoreOrDeleteIsCalled() + { + if (!storeCalled && !deleteCalled) + throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); } public void Dispose() diff --git a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs index 1d327ae..8c66e9d 100644 --- a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs +++ b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs @@ -39,8 +39,6 @@ namespace Tapeti.Flow.Default return null; var flowStateLock = await flowStore.LockFlowState(flowID.Value); - if (flowStateLock == null) - return null; var flowState = await flowStateLock.GetFlowState(); if (flowState == null) diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 88bc9f6..308cb3a 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -26,13 +26,13 @@ namespace Tapeti.Flow.Default public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler) { var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); - return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); + return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); } public IFlowParallelRequestBuilder YieldWithParallelRequest() @@ -42,18 +42,23 @@ namespace Tapeti.Flow.Default public IYieldPoint EndWithResponse(TResponse message) { - return new DelegateYieldPoint(false, context => SendResponse(context, message)); + return new DelegateYieldPoint(context => SendResponse(context, message)); } public IYieldPoint End() { - return new DelegateYieldPoint(false, EndFlow); + return new DelegateYieldPoint(EndFlow); } private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, string convergeMethodName = null, bool convergeMethodTaskSync = false) { + if (context.FlowState == null) + { + await CreateNewFlowState(context); + } + var continuationID = Guid.NewGuid(); context.FlowState.Continuations.Add(continuationID, @@ -70,14 +75,18 @@ namespace Tapeti.Flow.Default ReplyTo = responseHandlerInfo.ReplyToQueue }; - await context.EnsureStored(); + await context.Store(); + await publisher.Publish(message, properties); } private async Task SendResponse(FlowContext context, object message) { - var reply = context.FlowState.Metadata.Reply; + var reply = context.FlowState == null + ? GetReply(context.MessageContext) + : context.FlowState.Metadata.Reply; + if (reply == null) throw new YieldPointException("No response is required"); @@ -92,19 +101,21 @@ namespace Tapeti.Flow.Default properties.CorrelationId = reply.CorrelationId; // TODO disallow if replyto is not specified? - if (context.FlowState.Metadata.Reply.ReplyTo != null) + if (reply.ReplyTo != null) await publisher.PublishDirect(message, reply.ReplyTo, properties); else await publisher.Publish(message, properties); + + await context.Delete(); } - private static Task EndFlow(FlowContext context) + private static async Task EndFlow(FlowContext context) { - if (context.FlowState.Metadata.Reply != null) - throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); + await context.Delete(); - return Task.CompletedTask; + if (context.FlowState != null && context.FlowState.Metadata.Reply != null) + throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); } @@ -147,11 +158,31 @@ namespace Tapeti.Flow.Default }; } + private async Task CreateNewFlowState(FlowContext flowContext) + { + var flowStore = flowContext.MessageContext.DependencyResolver.Resolve(); + + var flowID = Guid.NewGuid(); + flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); + + if (flowContext.FlowStateLock == null) + throw new InvalidOperationException("Unable to lock a new flow"); + + flowContext.FlowState = new FlowState + { + Metadata = new FlowMetadata + { + Reply = GetReply(flowContext.MessageContext) + } + }; + } public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) { - var executableYieldPoint = yieldPoint as IExecutableYieldPoint; - var storeState = executableYieldPoint?.StoreState ?? false; + var executableYieldPoint = yieldPoint as DelegateYieldPoint; + + if (executableYieldPoint == null) + throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); FlowContext flowContext; object flowContextItem; @@ -160,27 +191,10 @@ namespace Tapeti.Flow.Default { flowContext = new FlowContext { - MessageContext = context, - FlowState = new FlowState() + MessageContext = context }; - if (storeState) - { - // Initiate the flow - var flowStore = context.DependencyResolver.Resolve(); - - var flowID = Guid.NewGuid(); - flowContext.FlowStateLock = await flowStore.LockFlowState(flowID); - - if (flowContext.FlowStateLock == null) - throw new InvalidOperationException("Unable to lock a new flow"); - - flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState(); - if (flowContext.FlowState == null) - throw new InvalidOperationException("Unable to get state for new flow"); - - flowContext.FlowState.Metadata.Reply = GetReply(context); - } + context.Items.Add(ContextItems.FlowContext, flowContext); } else flowContext = (FlowContext)flowContextItem; @@ -193,19 +207,17 @@ namespace Tapeti.Flow.Default } catch (YieldPointException e) { - var controllerName = flowContext.MessageContext.Controller.GetType().FullName; - var methodName = flowContext.MessageContext.Binding.Method.Name; - - throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e); + // Useful for debugging + e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName; + e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name; + throw; } - if (storeState) - await flowContext.EnsureStored(); - else if (flowContext.FlowStateLock != null) - await flowContext.FlowStateLock.DeleteFlowState(); + flowContext.EnsureStoreOrDeleteIsCalled(); } + private class ParallelRequestBuilder : IFlowParallelRequestBuilder { public delegate Task SendRequestFunc(FlowContext context, @@ -275,7 +287,7 @@ namespace Tapeti.Flow.Default if (convergeMethod?.Method == null) throw new ArgumentNullException(nameof(convergeMethod)); - return new DelegateYieldPoint(true, context => + return new DelegateYieldPoint(context => { if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType()) throw new YieldPointException("Converge method must be in the same controller class"); diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs index a21593f..306f034 100644 --- a/Tapeti.Flow/Default/FlowStarter.cs +++ b/Tapeti.Flow/Default/FlowStarter.cs @@ -10,30 +10,42 @@ namespace Tapeti.Flow.Default public class FlowStarter : IFlowStarter { private readonly IConfig config; + private readonly ILogger logger; - public FlowStarter(IConfig config) + public FlowStarter(IConfig config, ILogger logger) { this.config = config; + this.logger = logger; } public Task Start(Expression>> methodSelector) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value)); + return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { }); } public Task Start(Expression>>> methodSelector) where TController : class { - return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value); + return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {}); + } + + public Task Start(Expression>> methodSelector, TParameter parameter) where TController : class + { + return CallControllerMethod(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter}); + } + + public Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class + { + return CallControllerMethod(GetExpressionMethod(methodSelector), value => (Task)value, new object[] {parameter}); } - private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult) where TController : class + private async Task CallControllerMethod(MethodInfo method, Func> getYieldPointResult, object[] parameters) where TController : class { var controller = config.DependencyResolver.Resolve(); - var yieldPoint = await getYieldPointResult(method.Invoke(controller, new object[] {})); + var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters)); var context = new MessageContext { @@ -42,7 +54,35 @@ namespace Tapeti.Flow.Default }; var flowHandler = config.DependencyResolver.Resolve(); - await flowHandler.Execute(context, yieldPoint); + + HandlingResultBuilder handlingResult = new HandlingResultBuilder + { + ConsumeResponse = ConsumeResponse.Nack, + }; + try + { + await flowHandler.Execute(context, yieldPoint); + handlingResult.ConsumeResponse = ConsumeResponse.Ack; + } + finally + { + await RunCleanup(context, handlingResult.ToHandlingResult()); + } + } + + private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) + { + foreach (var handler in config.CleanupMiddleware) + { + try + { + await handler.Handle(context, handlingResult); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); + } + } } @@ -57,5 +97,17 @@ namespace Tapeti.Flow.Default return method; } + + private static MethodInfo GetExpressionMethod(Expression>> methodSelector) + { + var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression; + var targetMethodExpression = callExpression?.Object as ConstantExpression; + + var method = targetMethodExpression?.Value as MethodInfo; + if (method == null) + throw new ArgumentException("Unable to determine the starting method", nameof(methodSelector)); + + return method; + } } } diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index e1cb0cf..0ee3eec 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -25,20 +25,13 @@ namespace Tapeti.Flow.Default } - public void Assign(FlowState value) - { - Metadata = value.Metadata.Clone(); - Data = value.Data; - Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()); - } - - public FlowState Clone() { - var result = new FlowState(); - result.Assign(this); - - return result; + return new FlowState { + metadata = metadata.Clone(), + Data = Data, + continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone()) + }; } } diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index 8bb1061..d2a6d87 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -5,18 +5,21 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { public class FlowStore : IFlowStore { - private static readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); - private static readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + private readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); + private readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); + private readonly LockCollection Locks = new LockCollection(EqualityComparer.Default); - private readonly IFlowRepository repository; + private readonly IFlowRepository repository; + private volatile bool InUse = false; - public FlowStore(IFlowRepository repository) + public FlowStore(IFlowRepository repository) { this.repository = repository; } @@ -24,10 +27,15 @@ namespace Tapeti.Flow.Default public async Task Load() { + if (InUse) + throw new InvalidOperationException("Can only load the saved state once."); + + InUse = true; + FlowStates.Clear(); ContinuationLookup.Clear(); - foreach (var flowStateRecord in await repository.GetStates()) + foreach (var flowStateRecord in await repository.GetStates()) { FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); @@ -46,97 +54,76 @@ namespace Tapeti.Flow.Default public async Task LockFlowState(Guid flowID) { - var isNew = false; - var flowState = FlowStates.GetOrAdd(flowID, id => - { - isNew = true; - return new FlowState(); - }); + InUse = true; - var result = new FlowStateLock(this, flowState, flowID, isNew); - await result.Lock(); - - return result; + var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID)); + return flowStatelock; } - private class FlowStateLock : IFlowStateLock { - private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); - private readonly FlowStore owner; - private readonly FlowState flowState; private readonly Guid flowID; - private bool isNew; - private bool isDisposed; + private volatile IDisposable flowLock; + private FlowState flowState; - public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew) + public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock) { this.owner = owner; - this.flowState = flowState; this.flowID = flowID; - this.isNew = isNew; + this.flowLock = flowLock; + + owner.FlowStates.TryGetValue(flowID, out flowState); } - - public Task Lock() - { - return semaphore.WaitAsync(); - } - - public void Dispose() { - lock (flowState) - { - if (!isDisposed) - { - semaphore.Release(); - semaphore.Dispose(); - } - - isDisposed = true; - } + var l = flowLock; + flowLock = null; + l?.Dispose(); } public Guid FlowID => flowID; public Task GetFlowState() { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); - return Task.FromResult(flowState.Clone()); - } + return Task.FromResult(flowState?.Clone()); } public async Task StoreFlowState(FlowState newFlowState) { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); + // Ensure no one has a direct reference to the protected state in the dictionary + newFlowState = newFlowState.Clone(); + + // Update the lookup dictionary for the ContinuationIDs + if (flowState != null) + { foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) { Guid removedValue; - ContinuationLookup.TryRemove(removedContinuation, out removedValue); + owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); } - - foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key))) - { - ContinuationLookup.TryAdd(addedContinuation.Key, flowID); - } - - flowState.Assign(newFlowState); } + foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key))) + { + owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID); + } + + var isNew = flowState == null; + flowState = newFlowState; + owner.FlowStates[flowID] = newFlowState; + + // Storing the flowstate in the underlying repository if (isNew) { - isNew = false; var now = DateTime.UtcNow; await owner.repository.CreateState(flowID, flowState, now); } @@ -148,50 +135,27 @@ namespace Tapeti.Flow.Default public async Task DeleteFlowState() { - lock (flowState) - { - if (isDisposed) - throw new ObjectDisposedException("FlowStateLock"); + if (flowLock == null) + throw new ObjectDisposedException("FlowStateLock"); + if (flowState != null) + { foreach (var removedContinuation in flowState.Continuations.Keys) { Guid removedValue; - ContinuationLookup.TryRemove(removedContinuation, out removedValue); + owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); } FlowState removedFlow; - FlowStates.TryRemove(flowID, out removedFlow); + owner.FlowStates.TryRemove(flowID, out removedFlow); + + if (flowState != null) + { + flowState = null; + await owner.repository.DeleteState(flowID); + } } - - if (!isNew) - await owner.repository.DeleteState(flowID); } } - - - private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState) - { - return new FlowStateRecord - { - FlowID = flowID, - Metadata = JsonConvert.SerializeObject(flowState.Metadata), - Data = flowState.Data, - ContinuationMetadata = flowState.Continuations.ToDictionary( - kv => kv.Key, - kv => JsonConvert.SerializeObject(kv.Value)) - }; - } - - private static FlowState ToFlowState(FlowStateRecord flowStateRecord) - { - return new FlowState - { - Metadata = JsonConvert.DeserializeObject(flowStateRecord.Metadata), - Data = flowStateRecord.Data, - Continuations = flowStateRecord.ContinuationMetadata.ToDictionary( - kv => kv.Key, - kv => JsonConvert.DeserializeObject(kv.Value)) - }; - } } } diff --git a/Tapeti.Flow/Default/IExecutableYieldPoint.cs b/Tapeti.Flow/Default/IExecutableYieldPoint.cs deleted file mode 100644 index 80db564..0000000 --- a/Tapeti.Flow/Default/IExecutableYieldPoint.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Flow.Default -{ - internal interface IExecutableYieldPoint : IYieldPoint - { - bool StoreState { get; } - Task Execute(FlowContext context); - } -} diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs index dee4e78..00ec009 100644 --- a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -5,19 +5,19 @@ using System.Threading.Tasks; namespace Tapeti.Flow.Default { - public class NonPersistentFlowRepository : IFlowRepository + public class NonPersistentFlowRepository : IFlowRepository { - Task>> IFlowRepository.GetStates() + Task>> IFlowRepository.GetStates() { return Task.FromResult(new List>()); } - public Task CreateState(Guid flowID, T state, DateTime timestamp) + public Task CreateState(Guid flowID, T state, DateTime timestamp) { return Task.CompletedTask; } - public Task UpdateState(Guid flowID, T state) + public Task UpdateState(Guid flowID, T state) { return Task.CompletedTask; } diff --git a/Tapeti.Flow/Default/StateYieldPoint.cs b/Tapeti.Flow/Default/StateYieldPoint.cs deleted file mode 100644 index 521c2fc..0000000 --- a/Tapeti.Flow/Default/StateYieldPoint.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Flow.Default -{ - internal class StateYieldPoint : IExecutableYieldPoint - { - public bool StoreState { get; } - - - public StateYieldPoint(bool storeState) - { - StoreState = storeState; - } - - - public async Task Execute(FlowContext context) - { - if (StoreState) - await context.EnsureStored(); - } - } -} diff --git a/Tapeti.Flow/FlowHelpers/LockCollection.cs b/Tapeti.Flow/FlowHelpers/LockCollection.cs new file mode 100644 index 0000000..f8a3533 --- /dev/null +++ b/Tapeti.Flow/FlowHelpers/LockCollection.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Flow.FlowHelpers +{ + public class LockCollection + { + private readonly Dictionary locks; + + public LockCollection(IEqualityComparer comparer) + { + locks = new Dictionary(comparer); + } + + public Task GetLock(T key) + { + LockItem nextLi = new LockItem(locks, key); + try + { + bool continueImmediately = false; + lock (locks) + { + LockItem li; + if (!locks.TryGetValue(key, out li)) + { + locks.Add(key, nextLi); + continueImmediately = true; + } + else + { + while (li.Next != null) + li = li.Next; + + li.Next = nextLi; + } + } + if (continueImmediately) + nextLi.Continue(); + } + catch (Exception e) + { + nextLi.Error(e); + } + return nextLi.GetTask(); + } + + private class LockItem : IDisposable + { + internal volatile LockItem Next; + + private readonly Dictionary locks; + private readonly TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly T key; + + public LockItem(Dictionary locks, T key) + { + this.locks = locks; + this.key = key; + } + + internal void Continue() + { + tcs.TrySetResult(this); + } + + internal void Error(Exception e) + { + tcs.SetException(e); + } + + internal Task GetTask() + { + return tcs.Task; + } + + public void Dispose() + { + lock (locks) + { + LockItem li; + if (!locks.TryGetValue(key, out li)) + return; + + if (li != this) + { + // Something is wrong (comparer is not stable?), but we cannot loose the completions sources + while (li.Next != null) + li = li.Next; + li.Next = Next; + return; + } + + if (Next == null) + { + locks.Remove(key); + return; + } + + locks[key] = Next; + } + + Next.Continue(); + } + } + } + +} diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index cba399d..751374c 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -6,9 +6,9 @@ namespace Tapeti.Flow { public class FlowMiddleware : ITapetiExtension { - private IFlowRepository flowRepository; + private IFlowRepository flowRepository; - public FlowMiddleware(IFlowRepository flowRepository) + public FlowMiddleware(IFlowRepository flowRepository) { this.flowRepository = flowRepository; } @@ -18,13 +18,14 @@ namespace Tapeti.Flow container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); - container.RegisterDefault>(() => flowRepository ?? new NonPersistentFlowRepository()); - container.RegisterDefault(); + container.RegisterDefaultSingleton(() => flowRepository ?? new NonPersistentFlowRepository()); + container.RegisterDefaultSingleton(); } public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { - return new[] { new FlowBindingMiddleware() }; + yield return new FlowBindingMiddleware(); + yield return new FlowCleanupMiddleware(); } } } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index f4bc7d1..edb3968 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -27,6 +27,8 @@ namespace Tapeti.Flow { Task Start(Expression>> methodSelector) where TController : class; Task Start(Expression>>> methodSelector) where TController : class; + Task Start(Expression>> methodSelector, TParameter parameter) where TController : class; + Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class; } /// diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs index ce026d9..0f5dbb4 100644 --- a/Tapeti.Flow/IFlowRepository.cs +++ b/Tapeti.Flow/IFlowRepository.cs @@ -5,11 +5,11 @@ using System.Threading.Tasks; namespace Tapeti.Flow { - public interface IFlowRepository + public interface IFlowRepository { - Task>> GetStates(); - Task CreateState(Guid flowID, T state, DateTime timestamp); - Task UpdateState(Guid flowID, T state); + Task>> GetStates(); + Task CreateState(Guid flowID, T state, DateTime timestamp); + Task UpdateState(Guid flowID, T state); Task DeleteState(Guid flowID); } diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index b085913..b4cb237 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -54,17 +54,17 @@ + - - + diff --git a/Tapeti.Flow/Tapeti.Flow.nuspec b/Tapeti.Flow/Tapeti.Flow.nuspec index 1df4677..54a7e1f 100644 --- a/Tapeti.Flow/Tapeti.Flow.nuspec +++ b/Tapeti.Flow/Tapeti.Flow.nuspec @@ -1,21 +1,21 @@ - - - - X2Software.Tapeti.Flow - $version$ - $title$ - Menno van Lavieren, Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Flow.png - false - Flow extension for Tapeti - - rabbitmq tapeti flow - - - - - + + + + X2Software.Tapeti.Flow + $version$ + Tapeti Flow + Menno van Lavieren, Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png + false + Flow extension for Tapeti + + rabbitmq tapeti flow + + + + + \ No newline at end of file diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec index 6af6063..e53c636 100644 --- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec +++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.nuspec @@ -1,20 +1,20 @@ - - - - X2Software.Tapeti.SimpleInjector - $version$ - $title$ - Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.SimpleInjector.png - false - SimpleInjector integration package for Tapeti - - rabbitmq tapeti simpleinjector - - - - + + + + X2Software.Tapeti.SimpleInjector + $version$ + Tapeti SimpleInjector + Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png + false + SimpleInjector integration package for Tapeti + + rabbitmq tapeti simpleinjector + + + + \ No newline at end of file diff --git a/Tapeti/Config/ICleanupMiddleware.cs b/Tapeti/Config/ICleanupMiddleware.cs new file mode 100644 index 0000000..290236e --- /dev/null +++ b/Tapeti/Config/ICleanupMiddleware.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface ICleanupMiddleware + { + Task Handle(IMessageContext context, HandlingResult handlingResult); + } +} diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index a7cd268..46bf575 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -9,6 +9,7 @@ namespace Tapeti.Config { IDependencyResolver DependencyResolver { get; } IReadOnlyList MessageMiddleware { get; } + IReadOnlyList CleanupMiddleware { get; } IReadOnlyList PublishMiddleware { get; } IEnumerable Queues { get; } diff --git a/Tapeti/Config/IExceptionStrategyContext.cs b/Tapeti/Config/IExceptionStrategyContext.cs new file mode 100644 index 0000000..2a99af9 --- /dev/null +++ b/Tapeti/Config/IExceptionStrategyContext.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IExceptionStrategyContext + { + IMessageContext MessageContext { get; } + + Exception Exception { get; } + + HandlingResultBuilder HandlingResult { get; set; } + } +} diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index d47a498..03355f5 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -16,18 +16,23 @@ namespace Tapeti.Connection private readonly string queueName; private readonly IDependencyResolver dependencyResolver; private readonly IReadOnlyList messageMiddleware; + private readonly IReadOnlyList cleanupMiddleware; private readonly List bindings; + + private readonly ILogger logger; private readonly IExceptionStrategy exceptionStrategy; - public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware) + public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable bindings, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware) { this.worker = worker; this.queueName = queueName; this.dependencyResolver = dependencyResolver; this.messageMiddleware = messageMiddleware; + this.cleanupMiddleware = cleanupMiddleware; this.bindings = bindings.ToList(); + logger = dependencyResolver.Resolve(); exceptionStrategy = dependencyResolver.Resolve(); } @@ -35,59 +40,138 @@ namespace Tapeti.Connection public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { - ExceptionDispatchInfo exception = null; - try + Task.Run(async () => { - var message = dependencyResolver.Resolve().Deserialize(body, properties); - if (message == null) - throw new ArgumentException("Empty message"); - - var validMessageType = false; - - using (var context = new MessageContext - { - DependencyResolver = dependencyResolver, - Queue = queueName, - RoutingKey = routingKey, - Message = message, - Properties = properties - }) + ExceptionDispatchInfo exception = null; + MessageContext context = null; + HandlingResult handlingResult = null; + try { try { - foreach (var binding in bindings) + context = new MessageContext { - if (binding.Accept(context, message)) - { - InvokeUsingBinding(context, binding, message); + DependencyResolver = dependencyResolver, + Queue = queueName, + RoutingKey = routingKey, + Properties = properties + }; - validMessageType = true; - } - } + await DispatchMesage(context, body); - if (!validMessageType) - throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); - - worker.Respond(deliveryTag, ConsumeResponse.Ack); + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Ack, + MessageAction = MessageAction.None + }; } - catch (Exception e) + catch (Exception eDispatch) { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException)); + exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); + logger.HandlerException(eDispatch); + try + { + var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException); + + exceptionStrategy.HandleException(exceptionStrategyContext); + + handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult(); + } + catch (Exception eStrategy) + { + logger.HandlerException(eStrategy); + } + } + try + { + if (handlingResult == null) + { + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Nack, + MessageAction = MessageAction.None + }; + } + await RunCleanup(context, handlingResult); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); } } - } - catch (Exception e) - { - exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); - worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); - } - - exception?.Throw(); + finally + { + try + { + if (handlingResult == null) + { + handlingResult = new HandlingResult + { + ConsumeResponse = ConsumeResponse.Nack, + MessageAction = MessageAction.None + }; + } + await worker.Respond(deliveryTag, handlingResult.ConsumeResponse); + } + catch (Exception eRespond) + { + logger.HandlerException(eRespond); + } + try + { + if (context != null) + { + context.Dispose(); + } + } + catch (Exception eDispose) + { + logger.HandlerException(eDispose); + } + } + }); } + private async Task RunCleanup(MessageContext context, HandlingResult handlingResult) + { + foreach(var handler in cleanupMiddleware) + { + try + { + await handler.Handle(context, handlingResult); + } + catch (Exception eCleanup) + { + logger.HandlerException(eCleanup); + } + } + } - private void InvokeUsingBinding(MessageContext context, IBinding binding, object message) + private async Task DispatchMesage(MessageContext context, byte[] body) + { + var message = dependencyResolver.Resolve().Deserialize(body, context.Properties); + if (message == null) + throw new ArgumentException("Empty message"); + + context.Message = message; + + var validMessageType = false; + + foreach (var binding in bindings) + { + if (binding.Accept(context, message)) + { + await InvokeUsingBinding(context, binding, message); + + validMessageType = true; + } + } + + if (!validMessageType) + throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); + } + + private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message) { context.Binding = binding; @@ -136,9 +220,7 @@ namespace Tapeti.Connection await binding.Invoke(c, message); }); - firstCaller.Call(context) - .Wait(); - + return firstCaller.Call(context); } private static Exception UnwrapException(Exception exception) diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index 8b9f7d6..e539bf7 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -57,7 +57,7 @@ namespace Tapeti.Connection return taskQueue.Value.Add(async () => { - (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware)); + (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware)); }).Unwrap(); } diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index 2cb7caf..98484fe 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti.Default +using System; + +namespace Tapeti.Default { public class ConsoleLogger : ILogger { @@ -16,5 +18,10 @@ { throw new System.NotImplementedException(); } + + public void HandlerException(Exception e) + { + Console.WriteLine(e.ToString()); + } } } diff --git a/Tapeti/Default/DevNullLogger.cs b/Tapeti/Default/DevNullLogger.cs index 24919fc..af4ce57 100644 --- a/Tapeti/Default/DevNullLogger.cs +++ b/Tapeti/Default/DevNullLogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti.Default +using System; + +namespace Tapeti.Default { public class DevNullLogger : ILogger { @@ -13,5 +15,9 @@ public void ConnectSuccess(TapetiConnectionParams connectionParams) { } + + public void HandlerException(Exception e) + { + } } } diff --git a/Tapeti/Default/ExceptionStrategyContext.cs b/Tapeti/Default/ExceptionStrategyContext.cs new file mode 100644 index 0000000..fc24ab3 --- /dev/null +++ b/Tapeti/Default/ExceptionStrategyContext.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class ExceptionStrategyContext : IExceptionStrategyContext + { + internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception) + { + MessageContext = messageContext; + Exception = exception; + } + + public IMessageContext MessageContext { get; } + + public Exception Exception { get; } + + private HandlingResultBuilder handlingResult; + public HandlingResultBuilder HandlingResult + { + get + { + if (handlingResult == null) + { + handlingResult = new HandlingResultBuilder(); + } + return handlingResult; + } + + set + { + handlingResult = value; + } + } + } +} diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs new file mode 100644 index 0000000..48babe3 --- /dev/null +++ b/Tapeti/Default/NackExceptionStrategy.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tapeti.Config; + +namespace Tapeti.Default +{ + public class NackExceptionStrategy : IExceptionStrategy + { + public void HandleException(IExceptionStrategyContext context) + { + context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack; + } + } +} diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs index 6a20ca7..afa3143 100644 --- a/Tapeti/Default/RequeueExceptionStrategy.cs +++ b/Tapeti/Default/RequeueExceptionStrategy.cs @@ -5,10 +5,9 @@ namespace Tapeti.Default { public class RequeueExceptionStrategy : IExceptionStrategy { - public ConsumeResponse HandleException(IMessageContext context, Exception exception) + public void HandleException(IExceptionStrategyContext context) { - // TODO log exception - return ConsumeResponse.Requeue; + context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue; } } } diff --git a/Tapeti/HandlingResult.cs b/Tapeti/HandlingResult.cs new file mode 100644 index 0000000..e1bb575 --- /dev/null +++ b/Tapeti/HandlingResult.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti +{ + public class HandlingResult + { + public HandlingResult() + { + ConsumeResponse = ConsumeResponse.Nack; + MessageAction = MessageAction.None; + } + + /// + /// Determines which response will be given to the message bus from where the message originates. + /// + public ConsumeResponse ConsumeResponse { get; internal set; } + + /// + /// Registers which action the Exception strategy has taken or will take to handle the error condition + /// on the message. This is important to know for cleanup handlers registered by middleware. + /// + public MessageAction MessageAction { get; internal set; } + + } + + public class HandlingResultBuilder + { + private static readonly HandlingResult Default = new HandlingResult(); + + private HandlingResult data = Default; + + public ConsumeResponse ConsumeResponse { + get + { + return data.ConsumeResponse; + } + set + { + GetWritableData().ConsumeResponse = value; + } + } + + public MessageAction MessageAction + { + get + { + return data.MessageAction; + } + set + { + GetWritableData().MessageAction = value; + } + } + + public HandlingResult ToHandlingResult() + { + if (data == Default) + { + return new HandlingResult(); + } + var result = GetWritableData(); + data = Default; + return result; + } + + private HandlingResult GetWritableData() + { + if (data == Default) + { + data = new HandlingResult(); + } + return data; + } + } +} diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs index 7b46af6..7525324 100644 --- a/Tapeti/IExceptionStrategy.cs +++ b/Tapeti/IExceptionStrategy.cs @@ -8,9 +8,9 @@ namespace Tapeti /// /// Called when an exception occurs while handling a message. /// - /// The message context if available. May be null! - /// The exception instance - /// The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message. - ConsumeResponse HandleException(IMessageContext context, Exception exception); + /// The exception strategy context containing the necessary data including the message context and the thrown exception. + /// Also the response to the message can be set. + /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. + void HandleException(IExceptionStrategyContext context); } } diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index 014f217..1dc244d 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -1,4 +1,6 @@ -namespace Tapeti +using System; + +namespace Tapeti { // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) // instead of only string-based logging without control over the output. @@ -7,5 +9,6 @@ void Connect(TapetiConnectionParams connectionParams); void ConnectFailed(TapetiConnectionParams connectionParams); void ConnectSuccess(TapetiConnectionParams connectionParams); + void HandlerException(Exception e); } } diff --git a/Tapeti/MessageFutureAction.cs b/Tapeti/MessageFutureAction.cs new file mode 100644 index 0000000..7cbd319 --- /dev/null +++ b/Tapeti/MessageFutureAction.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Tapeti +{ + public enum MessageAction + { + None = 1, + ErrorLog = 2, + Retry = 3, + } +} diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 493548a..5ce8b98 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -53,6 +53,8 @@ + + @@ -63,6 +65,9 @@ + + + @@ -83,6 +88,7 @@ + diff --git a/Tapeti/Tapeti.nuspec b/Tapeti/Tapeti.nuspec index 249c4c2..b1eb7ca 100644 --- a/Tapeti/Tapeti.nuspec +++ b/Tapeti/Tapeti.nuspec @@ -1,20 +1,20 @@ - - - - X2Software.Tapeti - $version$ - $title$ - Mark van Renswoude - Mark van Renswoude - https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE - https://git.x2software.net/pub/tapeti - https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.png - false - Controller-based framework for RabbitMQ microservice architectures - - rabbitmq tapeti - - - - + + + + X2Software.Tapeti + $version$ + Tapeti + Mark van Renswoude + Mark van Renswoude + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.png + false + Controller-based framework for RabbitMQ microservice architectures + + rabbitmq tapeti + + + + \ No newline at end of file diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 75ed930..117c4c9 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -26,6 +26,7 @@ namespace Tapeti private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); + private readonly List cleanupMiddleware = new List(); private readonly List publishMiddleware = new List(); private readonly IDependencyResolver dependencyResolver; @@ -62,7 +63,7 @@ namespace Tapeti queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(dependencyResolver, messageMiddleware, publishMiddleware, queues); + var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); return config; @@ -83,6 +84,13 @@ namespace Tapeti } + public TapetiConfig Use(ICleanupMiddleware handler) + { + cleanupMiddleware.Add(handler); + return this; + } + + public TapetiConfig Use(IPublishMiddleware handler) { publishMiddleware.Add(handler); @@ -108,6 +116,8 @@ namespace Tapeti Use((IBindingMiddleware)middleware); else if (middleware is IMessageMiddleware) Use((IMessageMiddleware)middleware); + else if (middleware is ICleanupMiddleware) + Use((ICleanupMiddleware)middleware); else if (middleware is IPublishMiddleware) Use((IPublishMiddleware)middleware); else @@ -133,7 +143,7 @@ namespace Tapeti container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); - container.RegisterDefault(); + container.RegisterDefault(); } @@ -345,16 +355,18 @@ namespace Tapeti { public IDependencyResolver DependencyResolver { get; } public IReadOnlyList MessageMiddleware { get; } + public IReadOnlyList CleanupMiddleware { get; } public IReadOnlyList PublishMiddleware { get; } public IEnumerable Queues { get; } private readonly Dictionary bindingMethodLookup; - public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) + public Config(IDependencyResolver dependencyResolver, IReadOnlyList messageMiddleware, IReadOnlyList cleanupMiddleware, IReadOnlyList publishMiddleware, IEnumerable queues) { DependencyResolver = dependencyResolver; MessageMiddleware = messageMiddleware; + CleanupMiddleware = cleanupMiddleware; PublishMiddleware = publishMiddleware; Queues = queues.ToList(); diff --git a/Test/FlowEndController.cs b/Test/FlowEndController.cs index 522b98f..679e542 100644 --- a/Test/FlowEndController.cs +++ b/Test/FlowEndController.cs @@ -1,6 +1,7 @@ using System; using Tapeti.Annotations; using Tapeti.Flow; +using Tapeti.Flow.Annotations; namespace Test { @@ -17,7 +18,25 @@ namespace Test public IYieldPoint StartFlow(PingMessage message) { - Console.WriteLine("PingMessage received, call flowProvider.End()"); + Console.WriteLine("PingMessage received, calling flowProvider.End() directly"); + + if (DateTime.Now < new DateTime(2000, 1, 1)) + { + //never true + return flowProvider + .YieldWithRequestSync + (new PingConfirmationRequestMessage() { StoredInState = "Ping:" }, + HandlePingConfirmationResponse); + } + + return Finish(); + } + + + [Continuation] + public IYieldPoint HandlePingConfirmationResponse(PingConfirmationResponseMessage msg) + { + Console.WriteLine("Ending ping flow: " + msg.Answer); return Finish(); } @@ -33,5 +52,26 @@ namespace Test } + [Request(Response = typeof(PingConfirmationResponseMessage))] + public class PingConfirmationRequestMessage + { + public string StoredInState { get; set; } + } + + + public class PingConfirmationResponseMessage + { + public string Answer { get; set; } + } + + public PingConfirmationResponseMessage PingConfirmation(PingConfirmationRequestMessage message) + { + Console.WriteLine(">> receive Ping (returning pong)"); + + return new PingConfirmationResponseMessage + { + Answer = message.StoredInState + " Pong!" + }; + } } } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index 1bf8ae3..fe78c4b 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -19,6 +19,7 @@ namespace Test // Public properties are automatically stored and retrieved while in a flow public Guid StateTestGuid { get; set; } + public int Phase; public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer) { @@ -29,21 +30,40 @@ namespace Test [Start] - public async Task StartFlow() + public async Task StartFlow(bool go) { - Console.WriteLine("Starting stand-alone flow"); - await Task.Delay(1000); + Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow"); + await Task.Delay(10); - return flowProvider.YieldWithRequestSync - (new PoloConfirmationRequestMessage(), - HandlePoloConfirmationResponse); + Phase = 1; + + if (go) + return flowProvider.YieldWithRequestSync + (new PoloConfirmationRequestMessage(), + HandlePoloConfirmationResponse); + + Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely"); + return flowProvider.End(); } [Continuation] public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg) { - Console.WriteLine("Ending stand-alone flow"); + Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second..."); + + Phase = 2; + + return flowProvider.YieldWithRequestSync + (new PoloConfirmationRequestMessage(), + HandlePoloConfirmationResponseEnd); + } + + + [Continuation] + public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg) + { + Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow"); return flowProvider.End(); } diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs index 7298937..66a4a75 100644 --- a/Test/MarcoEmitter.cs +++ b/Test/MarcoEmitter.cs @@ -17,7 +17,7 @@ namespace Test public async Task Run() { - await publisher.Publish(new MarcoMessage()); + //await publisher.Publish(new MarcoMessage()); /* var concurrent = new SemaphoreSlim(20); diff --git a/Test/Program.cs b/Test/Program.cs index 6eecc55..a6c110f 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using SimpleInjector; using Tapeti; using Tapeti.DataAnnotations; @@ -6,6 +7,7 @@ using Tapeti.Flow; using Tapeti.Flow.SQL; using Tapeti.Helpers; using Tapeti.SimpleInjector; +using System.Threading; namespace Test { @@ -20,8 +22,7 @@ namespace Test var container = new Container(); container.Register(); container.Register(); - - //container.Register(() => new EF(serviceID)); + container.Register(); var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) .WithFlow() @@ -34,6 +35,11 @@ namespace Test Params = new TapetiAppSettingsConnectionParams() }) { + var flowStore = container.GetInstance(); + var flowStore2 = container.GetInstance(); + + Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2)); + connection.Connected += (sender, e) => { Console.WriteLine("Event Connected"); }; @@ -54,7 +60,9 @@ namespace Test connection.GetPublisher().Publish(new FlowEndController.PingMessage()); - container.GetInstance().Start(c => c.StartFlow); + container.GetInstance().Start(c => c.StartFlow, true); + + Thread.Sleep(1000); var emitter = container.GetInstance(); emitter.Run().Wait(); diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 0000000..374feac --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,40 @@ +image: Visual Studio 2015 + +install: + - choco install gitversion.portable -pre -y + +before_build: + - nuget restore + - ps: gitversion /l console /output buildserver /updateAssemblyInfo + +after_build: + - cmd: ECHO nuget pack Tapeti\Tapeti.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: nuget pack Tapeti\Tapeti.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.Annotations\Tapeti.Annotations.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.Annotations.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.DataAnnotations\Tapeti.DataAnnotations.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.DataAnnotations.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.Flow\Tapeti.Flow.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg" + +assembly_info: + patch: false + +build: + project: Tapeti.sln + +platform: + - Any CPU + +configuration: + - Release + +deploy: + provider: NuGet + api_key: + secure: pkaN6R8ocu0Q93uCK3DOCifgr1Q4tuH4ZJ4eiV9U5NmwE5qRM2xjUy4B9SkZCsWx + skip_symbols: false + artifact: /.*\.nupkg/ \ No newline at end of file