1
0
mirror of synced 2024-11-24 19:53:10 +01:00

Merge branch 'release/0.4.0'

This commit is contained in:
Menno van Lavieren 2017-10-20 11:28:20 +02:00
commit 5bf0ed03df
49 changed files with 957 additions and 457 deletions

View File

@ -1,55 +0,0 @@
param([switch]$nopush)
function pack
{
param([string]$project)
Write-Host "Packing $($project).csproj" -Foreground Blue
NuGet.exe pack "$($project)\$($project).csproj" -Build -OutputDir publish -Version "$($version.NuGetVersion)" -Properties depversion="$($version.NuGetVersion)"
}
function push
{
param([string]$project)
Write-Host "Pushing $($project).csproj" -Foreground Blue
NuGet.exe push "publish\X2Software.$($project).$($version.NuGetVersion).nupkg" -apikey "$($nugetkey)" -Source https://www.nuget.org/api/v2/package
}
$projects = @(
"Tapeti.Annotations",
"Tapeti",
"Tapeti.DataAnnotations",
"Tapeti.Flow",
"Tapeti.SimpleInjector"
)
New-Item -Path publish -Type directory -Force | Out-Null
$version = GitVersion.exe | Out-String | ConvertFrom-Json
$nugetkey = Get-Content .nuget.apikey
Write-Host "Publishing version $($version.NuGetVersion) using API key $($nugetkey)"-Foreground Cyan
foreach ($project in $projects)
{
pack($project)
}
if ($nopush -eq $false)
{
foreach ($project in $projects)
{
push($project)
}
}
else
{
Write-Host "Skipping push" -Foreground Blue
}

View File

@ -6,3 +6,14 @@ The documentation for Tapeti is available on Read the Docs:
[Master branch](http://tapeti.readthedocs.io/en/stable/)<br /> [Master branch](http://tapeti.readthedocs.io/en/stable/)<br />
[![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/?badge=stable) [![Documentation Status](https://readthedocs.org/projects/tapeti/badge/?version=stable)](http://tapeti.readthedocs.io/en/stable/?badge=stable)
## Builds
Builds are automatically run using AppVeyor, with the resulting packages being pushed to NuGet.
Latest build
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti)
Master build
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x/branch/master?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti/branch/master)

View File

@ -1,17 +1,17 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti.Annotations</id> <id>X2Software.Tapeti.Annotations</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti Annotations</title>
<authors>Mark van Renswoude</authors> <authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Annotations.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Annotations.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Annotations for Tapeti</description> <description>Annotations for Tapeti</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti</tags> <tags>rabbitmq tapeti</tags>
</metadata> </metadata>
</package> </package>

View File

@ -1,20 +1,20 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti.DataAnnotations</id> <id>X2Software.Tapeti.DataAnnotations</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti DataAnnotations</title>
<authors>Mark van Renswoude</authors> <authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.DataAnnotations.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.DataAnnotations.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>DataAnnotations validation extension for Tapeti</description> <description>DataAnnotations validation extension for Tapeti</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti dataannotations</tags> <tags>rabbitmq tapeti dataannotations</tags>
<dependencies> <dependencies>
<dependency id="X2Software.Tapeti" version="[$depversion$]" /> <dependency id="X2Software.Tapeti" version="[$version$]" />
</dependencies> </dependencies>
</metadata> </metadata>
</package> </package>

View File

@ -30,7 +30,7 @@ namespace Tapeti.Flow.SQL
public void RegisterDefaults(IDependencyContainer container) public void RegisterDefaults(IDependencyContainer container)
{ {
container.RegisterDefault<IFlowRepository<Default.FlowState>>(() => new SqlConnectionFlowRepository<Default.FlowState>(connectionString, serviceId, schema)); container.RegisterDefault<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema));
} }

View File

@ -24,7 +24,7 @@ namespace Tapeti.Flow.SQL
); );
go; go;
*/ */
public class SqlConnectionFlowRepository<T> : IFlowRepository<T> public class SqlConnectionFlowRepository : IFlowRepository
{ {
private readonly string connectionString; private readonly string connectionString;
private readonly int serviceId; private readonly int serviceId;
@ -39,7 +39,7 @@ namespace Tapeti.Flow.SQL
} }
public async Task<List<KeyValuePair<Guid, T>>> GetStates() public async Task<List<KeyValuePair<Guid, T>>> GetStates<T>()
{ {
using (var connection = await GetConnection()) using (var connection = await GetConnection())
{ {
@ -69,14 +69,14 @@ namespace Tapeti.Flow.SQL
} }
public Task CreateState(Guid flowID, T state, DateTime timestamp) public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{ {
var stateJason = JsonConvert.SerializeObject(state); var stateJason = JsonConvert.SerializeObject(state);
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task UpdateState(Guid flowID, T state) public Task UpdateState<T>(Guid flowID, T state)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }

View File

@ -1,21 +1,21 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti.Flow.SQL</id> <id>X2Software.Tapeti.Flow.SQL</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti Flow SQL</title>
<authors>Mark van Renswoude</authors> <authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Flow.SQL.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.SQL.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>SQL backing repository for the Tapeti Flow package</description> <description>SQL backing repository for the Tapeti Flow package</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti sql</tags> <tags>rabbitmq tapeti sql</tags>
<dependencies> <dependencies>
<dependency id="X2Software.Tapeti" version="[$depversion$]" /> <dependency id="X2Software.Tapeti" version="[$version$]" />
<dependency id="X2Software.Tapeti.Flow" version="[$depversion$]" /> <dependency id="X2Software.Tapeti.Flow" version="[$version$]" />
</dependencies> </dependencies>
</metadata> </metadata>
</package> </package>

View File

@ -2,7 +2,7 @@
{ {
public static class ConfigExtensions public static class ConfigExtensions
{ {
public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository<Default.FlowState> flowRepository = null) public static TapetiConfig WithFlow(this TapetiConfig config, IFlowRepository flowRepository = null)
{ {
config.Use(new FlowMiddleware(flowRepository)); config.Use(new FlowMiddleware(flowRepository));
return config; return config;

View File

@ -3,16 +3,13 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
internal class DelegateYieldPoint : IExecutableYieldPoint internal class DelegateYieldPoint : IYieldPoint
{ {
public bool StoreState { get; }
private readonly Func<FlowContext, Task> onExecute; private readonly Func<FlowContext, Task> onExecute;
public DelegateYieldPoint(bool storeState, Func<FlowContext, Task> onExecute) public DelegateYieldPoint(Func<FlowContext, Task> onExecute)
{ {
StoreState = storeState;
this.onExecute = onExecute; this.onExecute = onExecute;
} }

View File

@ -87,7 +87,7 @@ namespace Tapeti.Flow.Default
private static Task HandleParallelResponse(IMessageContext context) private static Task HandleParallelResponse(IMessageContext context)
{ {
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new StateYieldPoint(true)); return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask));
} }

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Flow.Default
{
public class FlowCleanupMiddleware : ICleanupMiddleware
{
public async Task Handle(IMessageContext context, HandlingResult handlingResult)
{
object flowContextObj;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj))
return;
var flowContext = (FlowContext)flowContextObj;
if (flowContext?.FlowStateLock != null)
{
if (handlingResult.ConsumeResponse == ConsumeResponse.Nack
|| handlingResult.MessageAction == MessageAction.ErrorLog)
{
await flowContext.FlowStateLock.DeleteFlowState();
}
flowContext.FlowStateLock.Dispose();
}
}
}
}

View File

@ -13,13 +13,13 @@ namespace Tapeti.Flow.Default
public Guid ContinuationID { get; set; } public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; } public ContinuationMetadata ContinuationMetadata { get; set; }
private bool stored; private bool storeCalled;
private bool deleteCalled;
public async Task EnsureStored() public async Task Store()
{ {
if (stored) storeCalled = true;
return;
if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext)); if (MessageContext == null) throw new ArgumentNullException(nameof(MessageContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState)); if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
@ -27,8 +27,20 @@ namespace Tapeti.Flow.Default
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller); FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(MessageContext.Controller);
await FlowStateLock.StoreFlowState(FlowState); await FlowStateLock.StoreFlowState(FlowState);
}
stored = true; public async Task Delete()
{
deleteCalled = true;
if (FlowStateLock != null)
await FlowStateLock.DeleteFlowState();
}
public void EnsureStoreOrDeleteIsCalled()
{
if (!storeCalled && !deleteCalled)
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
} }
public void Dispose() public void Dispose()

View File

@ -39,8 +39,6 @@ namespace Tapeti.Flow.Default
return null; return null;
var flowStateLock = await flowStore.LockFlowState(flowID.Value); var flowStateLock = await flowStore.LockFlowState(flowID.Value);
if (flowStateLock == null)
return null;
var flowState = await flowStateLock.GetFlowState(); var flowState = await flowStateLock.GetFlowState();
if (flowState == null) if (flowState == null)

View File

@ -26,13 +26,13 @@ namespace Tapeti.Flow.Default
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
{ {
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
{ {
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler); var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(true, context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
@ -42,18 +42,23 @@ namespace Tapeti.Flow.Default
public IYieldPoint EndWithResponse<TResponse>(TResponse message) public IYieldPoint EndWithResponse<TResponse>(TResponse message)
{ {
return new DelegateYieldPoint(false, context => SendResponse(context, message)); return new DelegateYieldPoint(context => SendResponse(context, message));
} }
public IYieldPoint End() public IYieldPoint End()
{ {
return new DelegateYieldPoint(false, EndFlow); return new DelegateYieldPoint(EndFlow);
} }
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
if (context.FlowState == null)
{
await CreateNewFlowState(context);
}
var continuationID = Guid.NewGuid(); var continuationID = Guid.NewGuid();
context.FlowState.Continuations.Add(continuationID, context.FlowState.Continuations.Add(continuationID,
@ -70,14 +75,18 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue ReplyTo = responseHandlerInfo.ReplyToQueue
}; };
await context.EnsureStored(); await context.Store();
await publisher.Publish(message, properties); await publisher.Publish(message, properties);
} }
private async Task SendResponse(FlowContext context, object message) private async Task SendResponse(FlowContext context, object message)
{ {
var reply = context.FlowState.Metadata.Reply; var reply = context.FlowState == null
? GetReply(context.MessageContext)
: context.FlowState.Metadata.Reply;
if (reply == null) if (reply == null)
throw new YieldPointException("No response is required"); throw new YieldPointException("No response is required");
@ -92,19 +101,21 @@ namespace Tapeti.Flow.Default
properties.CorrelationId = reply.CorrelationId; properties.CorrelationId = reply.CorrelationId;
// TODO disallow if replyto is not specified? // TODO disallow if replyto is not specified?
if (context.FlowState.Metadata.Reply.ReplyTo != null) if (reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties); await publisher.PublishDirect(message, reply.ReplyTo, properties);
else else
await publisher.Publish(message, properties); await publisher.Publish(message, properties);
await context.Delete();
} }
private static Task EndFlow(FlowContext context) private static async Task EndFlow(FlowContext context)
{ {
if (context.FlowState.Metadata.Reply != null) await context.Delete();
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
return Task.CompletedTask; if (context.FlowState != null && context.FlowState.Metadata.Reply != null)
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
} }
@ -147,11 +158,31 @@ namespace Tapeti.Flow.Default
}; };
} }
private async Task CreateNewFlowState(FlowContext flowContext)
{
var flowStore = flowContext.MessageContext.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = new FlowState
{
Metadata = new FlowMetadata
{
Reply = GetReply(flowContext.MessageContext)
}
};
}
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{ {
var executableYieldPoint = yieldPoint as IExecutableYieldPoint; var executableYieldPoint = yieldPoint as DelegateYieldPoint;
var storeState = executableYieldPoint?.StoreState ?? false;
if (executableYieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}");
FlowContext flowContext; FlowContext flowContext;
object flowContextItem; object flowContextItem;
@ -160,27 +191,10 @@ namespace Tapeti.Flow.Default
{ {
flowContext = new FlowContext flowContext = new FlowContext
{ {
MessageContext = context, MessageContext = context
FlowState = new FlowState()
}; };
if (storeState) context.Items.Add(ContextItems.FlowContext, flowContext);
{
// Initiate the flow
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState();
if (flowContext.FlowState == null)
throw new InvalidOperationException("Unable to get state for new flow");
flowContext.FlowState.Metadata.Reply = GetReply(context);
}
} }
else else
flowContext = (FlowContext)flowContextItem; flowContext = (FlowContext)flowContextItem;
@ -193,19 +207,17 @@ namespace Tapeti.Flow.Default
} }
catch (YieldPointException e) catch (YieldPointException e)
{ {
var controllerName = flowContext.MessageContext.Controller.GetType().FullName; // Useful for debugging
var methodName = flowContext.MessageContext.Binding.Method.Name; e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Binding.Method.Name;
throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e); throw;
} }
if (storeState) flowContext.EnsureStoreOrDeleteIsCalled();
await flowContext.EnsureStored();
else if (flowContext.FlowStateLock != null)
await flowContext.FlowStateLock.DeleteFlowState();
} }
private class ParallelRequestBuilder : IFlowParallelRequestBuilder private class ParallelRequestBuilder : IFlowParallelRequestBuilder
{ {
public delegate Task SendRequestFunc(FlowContext context, public delegate Task SendRequestFunc(FlowContext context,
@ -275,7 +287,7 @@ namespace Tapeti.Flow.Default
if (convergeMethod?.Method == null) if (convergeMethod?.Method == null)
throw new ArgumentNullException(nameof(convergeMethod)); throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(true, context => return new DelegateYieldPoint(context =>
{ {
if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType()) if (convergeMethod.Method.DeclaringType != context.MessageContext.Controller.GetType())
throw new YieldPointException("Converge method must be in the same controller class"); throw new YieldPointException("Converge method must be in the same controller class");

View File

@ -10,30 +10,42 @@ namespace Tapeti.Flow.Default
public class FlowStarter : IFlowStarter public class FlowStarter : IFlowStarter
{ {
private readonly IConfig config; private readonly IConfig config;
private readonly ILogger logger;
public FlowStarter(IConfig config) public FlowStarter(IConfig config, ILogger logger)
{ {
this.config = config; this.config = config;
this.logger = logger;
} }
public Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class public Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{ {
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value)); return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
} }
public Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class public Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{ {
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value); return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
}
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
}
public Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{
return CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
} }
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult) where TController : class private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object[] parameters) where TController : class
{ {
var controller = config.DependencyResolver.Resolve<TController>(); var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, new object[] {})); var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
var context = new MessageContext var context = new MessageContext
{ {
@ -42,7 +54,35 @@ namespace Tapeti.Flow.Default
}; };
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);
HandlingResultBuilder handlingResult = new HandlingResultBuilder
{
ConsumeResponse = ConsumeResponse.Nack,
};
try
{
await flowHandler.Execute(context, yieldPoint);
handlingResult.ConsumeResponse = ConsumeResponse.Ack;
}
finally
{
await RunCleanup(context, handlingResult.ToHandlingResult());
}
}
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach (var handler in config.CleanupMiddleware)
{
try
{
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
} }
@ -57,5 +97,17 @@ namespace Tapeti.Flow.Default
return method; return method;
} }
private static MethodInfo GetExpressionMethod<TController, TResult, TParameter>(Expression<Func<TController, Func<TParameter, TResult>>> methodSelector)
{
var callExpression = (methodSelector.Body as UnaryExpression)?.Operand as MethodCallExpression;
var targetMethodExpression = callExpression?.Object as ConstantExpression;
var method = targetMethodExpression?.Value as MethodInfo;
if (method == null)
throw new ArgumentException("Unable to determine the starting method", nameof(methodSelector));
return method;
}
} }
} }

View File

@ -25,20 +25,13 @@ namespace Tapeti.Flow.Default
} }
public void Assign(FlowState value)
{
Metadata = value.Metadata.Clone();
Data = value.Data;
Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone());
}
public FlowState Clone() public FlowState Clone()
{ {
var result = new FlowState(); return new FlowState {
result.Assign(this); metadata = metadata.Clone(),
Data = Data,
return result; continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone())
};
} }
} }

View File

@ -5,18 +5,21 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
public class FlowStore : IFlowStore public class FlowStore : IFlowStore
{ {
private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>(); private readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>(); private readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> Locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
private readonly IFlowRepository<FlowState> repository; private readonly IFlowRepository repository;
private volatile bool InUse = false;
public FlowStore(IFlowRepository<FlowState> repository) public FlowStore(IFlowRepository repository)
{ {
this.repository = repository; this.repository = repository;
} }
@ -24,10 +27,15 @@ namespace Tapeti.Flow.Default
public async Task Load() public async Task Load()
{ {
if (InUse)
throw new InvalidOperationException("Can only load the saved state once.");
InUse = true;
FlowStates.Clear(); FlowStates.Clear();
ContinuationLookup.Clear(); ContinuationLookup.Clear();
foreach (var flowStateRecord in await repository.GetStates()) foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{ {
FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value);
@ -46,97 +54,76 @@ namespace Tapeti.Flow.Default
public async Task<IFlowStateLock> LockFlowState(Guid flowID) public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{ {
var isNew = false; InUse = true;
var flowState = FlowStates.GetOrAdd(flowID, id =>
{
isNew = true;
return new FlowState();
});
var result = new FlowStateLock(this, flowState, flowID, isNew); var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID));
await result.Lock(); return flowStatelock;
return result;
} }
private class FlowStateLock : IFlowStateLock private class FlowStateLock : IFlowStateLock
{ {
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
private readonly FlowStore owner; private readonly FlowStore owner;
private readonly FlowState flowState;
private readonly Guid flowID; private readonly Guid flowID;
private bool isNew; private volatile IDisposable flowLock;
private bool isDisposed; private FlowState flowState;
public FlowStateLock(FlowStore owner, FlowState flowState, Guid flowID, bool isNew) public FlowStateLock(FlowStore owner, Guid flowID, IDisposable flowLock)
{ {
this.owner = owner; this.owner = owner;
this.flowState = flowState;
this.flowID = flowID; this.flowID = flowID;
this.isNew = isNew; this.flowLock = flowLock;
owner.FlowStates.TryGetValue(flowID, out flowState);
} }
public Task Lock()
{
return semaphore.WaitAsync();
}
public void Dispose() public void Dispose()
{ {
lock (flowState) var l = flowLock;
{ flowLock = null;
if (!isDisposed) l?.Dispose();
{
semaphore.Release();
semaphore.Dispose();
}
isDisposed = true;
}
} }
public Guid FlowID => flowID; public Guid FlowID => flowID;
public Task<FlowState> GetFlowState() public Task<FlowState> GetFlowState()
{ {
lock (flowState) if (flowLock == null)
{ throw new ObjectDisposedException("FlowStateLock");
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(flowState.Clone()); return Task.FromResult(flowState?.Clone());
}
} }
public async Task StoreFlowState(FlowState newFlowState) public async Task StoreFlowState(FlowState newFlowState)
{ {
lock (flowState) if (flowLock == null)
{ throw new ObjectDisposedException("FlowStateLock");
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
// Ensure no one has a direct reference to the protected state in the dictionary
newFlowState = newFlowState.Clone();
// Update the lookup dictionary for the ContinuationIDs
if (flowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
{ {
Guid removedValue; Guid removedValue;
ContinuationLookup.TryRemove(removedContinuation, out removedValue); owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
} }
foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key)))
{
ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
}
flowState.Assign(newFlowState);
} }
foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key)))
{
owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
}
var isNew = flowState == null;
flowState = newFlowState;
owner.FlowStates[flowID] = newFlowState;
// Storing the flowstate in the underlying repository
if (isNew) if (isNew)
{ {
isNew = false;
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await owner.repository.CreateState(flowID, flowState, now); await owner.repository.CreateState(flowID, flowState, now);
} }
@ -148,50 +135,27 @@ namespace Tapeti.Flow.Default
public async Task DeleteFlowState() public async Task DeleteFlowState()
{ {
lock (flowState) if (flowLock == null)
{ throw new ObjectDisposedException("FlowStateLock");
if (isDisposed)
throw new ObjectDisposedException("FlowStateLock");
if (flowState != null)
{
foreach (var removedContinuation in flowState.Continuations.Keys) foreach (var removedContinuation in flowState.Continuations.Keys)
{ {
Guid removedValue; Guid removedValue;
ContinuationLookup.TryRemove(removedContinuation, out removedValue); owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
} }
FlowState removedFlow; FlowState removedFlow;
FlowStates.TryRemove(flowID, out removedFlow); owner.FlowStates.TryRemove(flowID, out removedFlow);
if (flowState != null)
{
flowState = null;
await owner.repository.DeleteState(flowID);
}
} }
if (!isNew)
await owner.repository.DeleteState(flowID);
} }
} }
private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState)
{
return new FlowStateRecord
{
FlowID = flowID,
Metadata = JsonConvert.SerializeObject(flowState.Metadata),
Data = flowState.Data,
ContinuationMetadata = flowState.Continuations.ToDictionary(
kv => kv.Key,
kv => JsonConvert.SerializeObject(kv.Value))
};
}
private static FlowState ToFlowState(FlowStateRecord flowStateRecord)
{
return new FlowState
{
Metadata = JsonConvert.DeserializeObject<FlowMetadata>(flowStateRecord.Metadata),
Data = flowStateRecord.Data,
Continuations = flowStateRecord.ContinuationMetadata.ToDictionary(
kv => kv.Key,
kv => JsonConvert.DeserializeObject<ContinuationMetadata>(kv.Value))
};
}
} }
} }

View File

@ -1,10 +0,0 @@
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal interface IExecutableYieldPoint : IYieldPoint
{
bool StoreState { get; }
Task Execute(FlowContext context);
}
}

View File

@ -5,19 +5,19 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {
public class NonPersistentFlowRepository<T> : IFlowRepository<T> public class NonPersistentFlowRepository : IFlowRepository
{ {
Task<List<KeyValuePair<Guid, T>>> IFlowRepository<T>.GetStates() Task<List<KeyValuePair<Guid, T>>> IFlowRepository.GetStates<T>()
{ {
return Task.FromResult(new List<KeyValuePair<Guid, T>>()); return Task.FromResult(new List<KeyValuePair<Guid, T>>());
} }
public Task CreateState(Guid flowID, T state, DateTime timestamp) public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task UpdateState(Guid flowID, T state) public Task UpdateState<T>(Guid flowID, T state)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }

View File

@ -1,22 +0,0 @@
using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
internal class StateYieldPoint : IExecutableYieldPoint
{
public bool StoreState { get; }
public StateYieldPoint(bool storeState)
{
StoreState = storeState;
}
public async Task Execute(FlowContext context)
{
if (StoreState)
await context.EnsureStored();
}
}
}

View File

@ -0,0 +1,110 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Flow.FlowHelpers
{
public class LockCollection<T>
{
private readonly Dictionary<T, LockItem> locks;
public LockCollection(IEqualityComparer<T> comparer)
{
locks = new Dictionary<T, LockItem>(comparer);
}
public Task<IDisposable> GetLock(T key)
{
LockItem nextLi = new LockItem(locks, key);
try
{
bool continueImmediately = false;
lock (locks)
{
LockItem li;
if (!locks.TryGetValue(key, out li))
{
locks.Add(key, nextLi);
continueImmediately = true;
}
else
{
while (li.Next != null)
li = li.Next;
li.Next = nextLi;
}
}
if (continueImmediately)
nextLi.Continue();
}
catch (Exception e)
{
nextLi.Error(e);
}
return nextLi.GetTask();
}
private class LockItem : IDisposable
{
internal volatile LockItem Next;
private readonly Dictionary<T, LockItem> locks;
private readonly TaskCompletionSource<IDisposable> tcs = new TaskCompletionSource<IDisposable>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly T key;
public LockItem(Dictionary<T, LockItem> locks, T key)
{
this.locks = locks;
this.key = key;
}
internal void Continue()
{
tcs.TrySetResult(this);
}
internal void Error(Exception e)
{
tcs.SetException(e);
}
internal Task<IDisposable> GetTask()
{
return tcs.Task;
}
public void Dispose()
{
lock (locks)
{
LockItem li;
if (!locks.TryGetValue(key, out li))
return;
if (li != this)
{
// Something is wrong (comparer is not stable?), but we cannot loose the completions sources
while (li.Next != null)
li = li.Next;
li.Next = Next;
return;
}
if (Next == null)
{
locks.Remove(key);
return;
}
locks[key] = Next;
}
Next.Continue();
}
}
}
}

View File

@ -6,9 +6,9 @@ namespace Tapeti.Flow
{ {
public class FlowMiddleware : ITapetiExtension public class FlowMiddleware : ITapetiExtension
{ {
private IFlowRepository<Default.FlowState> flowRepository; private IFlowRepository flowRepository;
public FlowMiddleware(IFlowRepository<Default.FlowState> flowRepository) public FlowMiddleware(IFlowRepository flowRepository)
{ {
this.flowRepository = flowRepository; this.flowRepository = flowRepository;
} }
@ -18,13 +18,14 @@ namespace Tapeti.Flow
container.RegisterDefault<IFlowProvider, FlowProvider>(); container.RegisterDefault<IFlowProvider, FlowProvider>();
container.RegisterDefault<IFlowStarter, FlowStarter>(); container.RegisterDefault<IFlowStarter, FlowStarter>();
container.RegisterDefault<IFlowHandler, FlowProvider>(); container.RegisterDefault<IFlowHandler, FlowProvider>();
container.RegisterDefault<IFlowRepository<FlowState>>(() => flowRepository ?? new NonPersistentFlowRepository<Default.FlowState>()); container.RegisterDefaultSingleton<IFlowRepository>(() => flowRepository ?? new NonPersistentFlowRepository());
container.RegisterDefault<IFlowStore, FlowStore>(); container.RegisterDefaultSingleton<IFlowStore, FlowStore>();
} }
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver) public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{ {
return new[] { new FlowBindingMiddleware() }; yield return new FlowBindingMiddleware();
yield return new FlowCleanupMiddleware();
} }
} }
} }

View File

@ -27,6 +27,8 @@ namespace Tapeti.Flow
{ {
Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class; Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class;
Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class; Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class;
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class;
Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class;
} }
/// <summary> /// <summary>

View File

@ -5,11 +5,11 @@ using System.Threading.Tasks;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public interface IFlowRepository<T> public interface IFlowRepository
{ {
Task<List<KeyValuePair<Guid, T>>> GetStates(); Task<List<KeyValuePair<Guid, T>>> GetStates<T>();
Task CreateState(Guid flowID, T state, DateTime timestamp); Task CreateState<T>(Guid flowID, T state, DateTime timestamp);
Task UpdateState(Guid flowID, T state); Task UpdateState<T>(Guid flowID, T state);
Task DeleteState(Guid flowID); Task DeleteState(Guid flowID);
} }

View File

@ -54,17 +54,17 @@
<Compile Include="Annotations\ContinuationAttribute.cs" /> <Compile Include="Annotations\ContinuationAttribute.cs" />
<Compile Include="Annotations\StartAttribute.cs" /> <Compile Include="Annotations\StartAttribute.cs" />
<Compile Include="ContextItems.cs" /> <Compile Include="ContextItems.cs" />
<Compile Include="Default\FlowCleanupMiddleware.cs" />
<Compile Include="Default\FlowMessageFilterMiddleware.cs" /> <Compile Include="Default\FlowMessageFilterMiddleware.cs" />
<Compile Include="Default\FlowBindingMiddleware.cs" /> <Compile Include="Default\FlowBindingMiddleware.cs" />
<Compile Include="Default\FlowContext.cs" /> <Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" /> <Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowStarter.cs" /> <Compile Include="Default\FlowStarter.cs" />
<Compile Include="Default\FlowState.cs" /> <Compile Include="Default\FlowState.cs" />
<Compile Include="Default\IExecutableYieldPoint.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" /> <Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" /> <Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" /> <Compile Include="ConfigExtensions.cs" />
<Compile Include="Default\StateYieldPoint.cs" /> <Compile Include="FlowHelpers\LockCollection.cs" />
<Compile Include="FlowHelpers\MethodSerializer.cs" /> <Compile Include="FlowHelpers\MethodSerializer.cs" />
<Compile Include="FlowMiddleware.cs" /> <Compile Include="FlowMiddleware.cs" />
<Compile Include="Default\FlowStore.cs" /> <Compile Include="Default\FlowStore.cs" />

View File

@ -1,21 +1,21 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti.Flow</id> <id>X2Software.Tapeti.Flow</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti Flow</title>
<authors>Menno van Lavieren, Mark van Renswoude</authors> <authors>Menno van Lavieren, Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.Flow.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Flow extension for Tapeti</description> <description>Flow extension for Tapeti</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti flow</tags> <tags>rabbitmq tapeti flow</tags>
<dependencies> <dependencies>
<dependency id="X2Software.Tapeti" version="[$depversion$]" /> <dependency id="X2Software.Tapeti" version="[$version$]" />
<dependency id="X2Software.Tapeti.Annotations" version="[$depversion$]" /> <dependency id="X2Software.Tapeti.Annotations" version="[$version$]" />
</dependencies> </dependencies>
</metadata> </metadata>
</package> </package>

View File

@ -1,20 +1,20 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti.SimpleInjector</id> <id>X2Software.Tapeti.SimpleInjector</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti SimpleInjector</title>
<authors>Mark van Renswoude</authors> <authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>SimpleInjector integration package for Tapeti</description> <description>SimpleInjector integration package for Tapeti</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti simpleinjector</tags> <tags>rabbitmq tapeti simpleinjector</tags>
<dependencies> <dependencies>
<dependency id="X2Software.Tapeti" version="[$depversion$]" /> <dependency id="X2Software.Tapeti" version="[$version$]" />
</dependencies> </dependencies>
</metadata> </metadata>
</package> </package>

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface ICleanupMiddleware
{
Task Handle(IMessageContext context, HandlingResult handlingResult);
}
}

View File

@ -9,6 +9,7 @@ namespace Tapeti.Config
{ {
IDependencyResolver DependencyResolver { get; } IDependencyResolver DependencyResolver { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; } IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
IEnumerable<IQueue> Queues { get; } IEnumerable<IQueue> Queues { get; }

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IExceptionStrategyContext
{
IMessageContext MessageContext { get; }
Exception Exception { get; }
HandlingResultBuilder HandlingResult { get; set; }
}
}

View File

@ -16,18 +16,23 @@ namespace Tapeti.Connection
private readonly string queueName; private readonly string queueName;
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware; private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
private readonly IReadOnlyList<ICleanupMiddleware> cleanupMiddleware;
private readonly List<IBinding> bindings; private readonly List<IBinding> bindings;
private readonly ILogger logger;
private readonly IExceptionStrategy exceptionStrategy; private readonly IExceptionStrategy exceptionStrategy;
public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware) public TapetiConsumer(TapetiWorker worker, string queueName, IDependencyResolver dependencyResolver, IEnumerable<IBinding> bindings, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<ICleanupMiddleware> cleanupMiddleware)
{ {
this.worker = worker; this.worker = worker;
this.queueName = queueName; this.queueName = queueName;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
this.messageMiddleware = messageMiddleware; this.messageMiddleware = messageMiddleware;
this.cleanupMiddleware = cleanupMiddleware;
this.bindings = bindings.ToList(); this.bindings = bindings.ToList();
logger = dependencyResolver.Resolve<ILogger>();
exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>(); exceptionStrategy = dependencyResolver.Resolve<IExceptionStrategy>();
} }
@ -35,59 +40,138 @@ namespace Tapeti.Connection
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, byte[] body) IBasicProperties properties, byte[] body)
{ {
ExceptionDispatchInfo exception = null; Task.Run(async () =>
try
{ {
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, properties); ExceptionDispatchInfo exception = null;
if (message == null) MessageContext context = null;
throw new ArgumentException("Empty message"); HandlingResult handlingResult = null;
try
var validMessageType = false;
using (var context = new MessageContext
{
DependencyResolver = dependencyResolver,
Queue = queueName,
RoutingKey = routingKey,
Message = message,
Properties = properties
})
{ {
try try
{ {
foreach (var binding in bindings) context = new MessageContext
{ {
if (binding.Accept(context, message)) DependencyResolver = dependencyResolver,
{ Queue = queueName,
InvokeUsingBinding(context, binding, message); RoutingKey = routingKey,
Properties = properties
};
validMessageType = true; await DispatchMesage(context, body);
}
}
if (!validMessageType) handlingResult = new HandlingResult
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); {
ConsumeResponse = ConsumeResponse.Ack,
worker.Respond(deliveryTag, ConsumeResponse.Ack); MessageAction = MessageAction.None
};
} }
catch (Exception e) catch (Exception eDispatch)
{ {
exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
worker.Respond(deliveryTag, exceptionStrategy.HandleException(context, exception.SourceException)); logger.HandlerException(eDispatch);
try
{
var exceptionStrategyContext = new ExceptionStrategyContext(context, exception.SourceException);
exceptionStrategy.HandleException(exceptionStrategyContext);
handlingResult = exceptionStrategyContext.HandlingResult.ToHandlingResult();
}
catch (Exception eStrategy)
{
logger.HandlerException(eStrategy);
}
}
try
{
if (handlingResult == null)
{
handlingResult = new HandlingResult
{
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await RunCleanup(context, handlingResult);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
} }
} }
} finally
catch (Exception e) {
{ try
exception = ExceptionDispatchInfo.Capture(UnwrapException(e)); {
worker.Respond(deliveryTag, exceptionStrategy.HandleException(null, exception.SourceException)); if (handlingResult == null)
} {
handlingResult = new HandlingResult
exception?.Throw(); {
ConsumeResponse = ConsumeResponse.Nack,
MessageAction = MessageAction.None
};
}
await worker.Respond(deliveryTag, handlingResult.ConsumeResponse);
}
catch (Exception eRespond)
{
logger.HandlerException(eRespond);
}
try
{
if (context != null)
{
context.Dispose();
}
}
catch (Exception eDispose)
{
logger.HandlerException(eDispose);
}
}
});
} }
private async Task RunCleanup(MessageContext context, HandlingResult handlingResult)
{
foreach(var handler in cleanupMiddleware)
{
try
{
await handler.Handle(context, handlingResult);
}
catch (Exception eCleanup)
{
logger.HandlerException(eCleanup);
}
}
}
private void InvokeUsingBinding(MessageContext context, IBinding binding, object message) private async Task DispatchMesage(MessageContext context, byte[] body)
{
var message = dependencyResolver.Resolve<IMessageSerializer>().Deserialize(body, context.Properties);
if (message == null)
throw new ArgumentException("Empty message");
context.Message = message;
var validMessageType = false;
foreach (var binding in bindings)
{
if (binding.Accept(context, message))
{
await InvokeUsingBinding(context, binding, message);
validMessageType = true;
}
}
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
}
private Task InvokeUsingBinding(MessageContext context, IBinding binding, object message)
{ {
context.Binding = binding; context.Binding = binding;
@ -136,9 +220,7 @@ namespace Tapeti.Connection
await binding.Invoke(c, message); await binding.Invoke(c, message);
}); });
firstCaller.Call(context) return firstCaller.Call(context);
.Wait();
} }
private static Exception UnwrapException(Exception exception) private static Exception UnwrapException(Exception exception)

View File

@ -57,7 +57,7 @@ namespace Tapeti.Connection
return taskQueue.Value.Add(async () => return taskQueue.Value.Add(async () =>
{ {
(await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware)); (await GetChannel()).BasicConsume(queueName, false, new TapetiConsumer(this, queueName, config.DependencyResolver, bindings, config.MessageMiddleware, config.CleanupMiddleware));
}).Unwrap(); }).Unwrap();
} }

View File

@ -1,4 +1,6 @@
namespace Tapeti.Default using System;
namespace Tapeti.Default
{ {
public class ConsoleLogger : ILogger public class ConsoleLogger : ILogger
{ {
@ -16,5 +18,10 @@
{ {
throw new System.NotImplementedException(); throw new System.NotImplementedException();
} }
public void HandlerException(Exception e)
{
Console.WriteLine(e.ToString());
}
} }
} }

View File

@ -1,4 +1,6 @@
namespace Tapeti.Default using System;
namespace Tapeti.Default
{ {
public class DevNullLogger : ILogger public class DevNullLogger : ILogger
{ {
@ -13,5 +15,9 @@
public void ConnectSuccess(TapetiConnectionParams connectionParams) public void ConnectSuccess(TapetiConnectionParams connectionParams)
{ {
} }
public void HandlerException(Exception e)
{
}
} }
} }

View File

@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class ExceptionStrategyContext : IExceptionStrategyContext
{
internal ExceptionStrategyContext(IMessageContext messageContext, Exception exception)
{
MessageContext = messageContext;
Exception = exception;
}
public IMessageContext MessageContext { get; }
public Exception Exception { get; }
private HandlingResultBuilder handlingResult;
public HandlingResultBuilder HandlingResult
{
get
{
if (handlingResult == null)
{
handlingResult = new HandlingResultBuilder();
}
return handlingResult;
}
set
{
handlingResult = value;
}
}
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
public class NackExceptionStrategy : IExceptionStrategy
{
public void HandleException(IExceptionStrategyContext context)
{
context.HandlingResult.ConsumeResponse = ConsumeResponse.Nack;
}
}
}

View File

@ -5,10 +5,9 @@ namespace Tapeti.Default
{ {
public class RequeueExceptionStrategy : IExceptionStrategy public class RequeueExceptionStrategy : IExceptionStrategy
{ {
public ConsumeResponse HandleException(IMessageContext context, Exception exception) public void HandleException(IExceptionStrategyContext context)
{ {
// TODO log exception context.HandlingResult.ConsumeResponse = ConsumeResponse.Requeue;
return ConsumeResponse.Requeue;
} }
} }
} }

79
Tapeti/HandlingResult.cs Normal file
View File

@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public class HandlingResult
{
public HandlingResult()
{
ConsumeResponse = ConsumeResponse.Nack;
MessageAction = MessageAction.None;
}
/// <summary>
/// Determines which response will be given to the message bus from where the message originates.
/// </summary>
public ConsumeResponse ConsumeResponse { get; internal set; }
/// <summary>
/// Registers which action the Exception strategy has taken or will take to handle the error condition
/// on the message. This is important to know for cleanup handlers registered by middleware.
/// </summary>
public MessageAction MessageAction { get; internal set; }
}
public class HandlingResultBuilder
{
private static readonly HandlingResult Default = new HandlingResult();
private HandlingResult data = Default;
public ConsumeResponse ConsumeResponse {
get
{
return data.ConsumeResponse;
}
set
{
GetWritableData().ConsumeResponse = value;
}
}
public MessageAction MessageAction
{
get
{
return data.MessageAction;
}
set
{
GetWritableData().MessageAction = value;
}
}
public HandlingResult ToHandlingResult()
{
if (data == Default)
{
return new HandlingResult();
}
var result = GetWritableData();
data = Default;
return result;
}
private HandlingResult GetWritableData()
{
if (data == Default)
{
data = new HandlingResult();
}
return data;
}
}
}

View File

@ -8,9 +8,9 @@ namespace Tapeti
/// <summary> /// <summary>
/// Called when an exception occurs while handling a message. /// Called when an exception occurs while handling a message.
/// </summary> /// </summary>
/// <param name="context">The message context if available. May be null!</param> /// <param name="context">The exception strategy context containing the necessary data including the message context and the thrown exception.
/// <param name="exception">The exception instance</param> /// Also the response to the message can be set.
/// <returns>The ConsumeResponse to determine whether to requeue, dead-letter (nack) or simply ack the message.</returns> /// If there is any other handling of the message than the expected default than HandlingResult.MessageFutureAction must be set accordingly. </param>
ConsumeResponse HandleException(IMessageContext context, Exception exception); void HandleException(IExceptionStrategyContext context);
} }
} }

View File

@ -1,4 +1,6 @@
namespace Tapeti using System;
namespace Tapeti
{ {
// This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog)
// instead of only string-based logging without control over the output. // instead of only string-based logging without control over the output.
@ -7,5 +9,6 @@
void Connect(TapetiConnectionParams connectionParams); void Connect(TapetiConnectionParams connectionParams);
void ConnectFailed(TapetiConnectionParams connectionParams); void ConnectFailed(TapetiConnectionParams connectionParams);
void ConnectSuccess(TapetiConnectionParams connectionParams); void ConnectSuccess(TapetiConnectionParams connectionParams);
void HandlerException(Exception e);
} }
} }

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti
{
public enum MessageAction
{
None = 1,
ErrorLog = 2,
Retry = 3,
}
}

View File

@ -53,6 +53,8 @@
<Reference Include="System.Xml" /> <Reference Include="System.Xml" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Config\IExceptionStrategyContext.cs" />
<Compile Include="Config\ICleanupMiddleware.cs" />
<Compile Include="Config\IPublishContext.cs" /> <Compile Include="Config\IPublishContext.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" /> <Compile Include="Config\IMessageFilterMiddleware.cs" />
<Compile Include="Config\IPublishMiddleware.cs" /> <Compile Include="Config\IPublishMiddleware.cs" />
@ -63,6 +65,9 @@
<Compile Include="Connection\TapetiWorker.cs" /> <Compile Include="Connection\TapetiWorker.cs" />
<Compile Include="Default\ConsoleLogger.cs" /> <Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" /> <Compile Include="Default\DevNullLogger.cs" />
<Compile Include="Default\ExceptionStrategyContext.cs" />
<Compile Include="Default\NackExceptionStrategy.cs" />
<Compile Include="HandlingResult.cs" />
<Compile Include="Default\JsonMessageSerializer.cs" /> <Compile Include="Default\JsonMessageSerializer.cs" />
<Compile Include="Default\MessageContext.cs" /> <Compile Include="Default\MessageContext.cs" />
<Compile Include="Default\PublishResultBinding.cs" /> <Compile Include="Default\PublishResultBinding.cs" />
@ -83,6 +88,7 @@
<Compile Include="Config\IConfig.cs" /> <Compile Include="Config\IConfig.cs" />
<Compile Include="MessageController.cs" /> <Compile Include="MessageController.cs" />
<Compile Include="Config\IBindingMiddleware.cs" /> <Compile Include="Config\IBindingMiddleware.cs" />
<Compile Include="MessageFutureAction.cs" />
<Compile Include="TapetiAppSettingsConnectionParams.cs" /> <Compile Include="TapetiAppSettingsConnectionParams.cs" />
<Compile Include="TapetiConnectionParams.cs" /> <Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiConfig.cs" /> <Compile Include="TapetiConfig.cs" />

View File

@ -1,20 +1,20 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<package > <package >
<metadata> <metadata>
<id>X2Software.Tapeti</id> <id>X2Software.Tapeti</id>
<version>$version$</version> <version>$version$</version>
<title>$title$</title> <title>Tapeti</title>
<authors>Mark van Renswoude</authors> <authors>Mark van Renswoude</authors>
<owners>Mark van Renswoude</owners> <owners>Mark van Renswoude</owners>
<licenseUrl>https://git.x2software.net/pub/tapeti/raw/master/UNLICENSE</licenseUrl> <licenseUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE</licenseUrl>
<projectUrl>https://git.x2software.net/pub/tapeti</projectUrl> <projectUrl>https://github.com/MvRens/Tapeti</projectUrl>
<iconUrl>https://git.x2software.net/pub/tapeti/raw/master/resources/icons/Tapeti.png</iconUrl> <iconUrl>https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance> <requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Controller-based framework for RabbitMQ microservice architectures</description> <description>Controller-based framework for RabbitMQ microservice architectures</description>
<copyright></copyright> <copyright></copyright>
<tags>rabbitmq tapeti</tags> <tags>rabbitmq tapeti</tags>
<dependencies> <dependencies>
<dependency id="X2Software.Tapeti.Annotations" version="[$depversion$]" /> <dependency id="X2Software.Tapeti.Annotations" version="[$version$]" />
</dependencies> </dependencies>
</metadata> </metadata>
</package> </package>

View File

@ -26,6 +26,7 @@ namespace Tapeti
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>(); private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>(); private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly List<ICleanupMiddleware> cleanupMiddleware = new List<ICleanupMiddleware>();
private readonly List<IPublishMiddleware> publishMiddleware = new List<IPublishMiddleware>(); private readonly List<IPublishMiddleware> publishMiddleware = new List<IPublishMiddleware>();
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
@ -62,7 +63,7 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
var config = new Config(dependencyResolver, messageMiddleware, publishMiddleware, queues); var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
return config; return config;
@ -83,6 +84,13 @@ namespace Tapeti
} }
public TapetiConfig Use(ICleanupMiddleware handler)
{
cleanupMiddleware.Add(handler);
return this;
}
public TapetiConfig Use(IPublishMiddleware handler) public TapetiConfig Use(IPublishMiddleware handler)
{ {
publishMiddleware.Add(handler); publishMiddleware.Add(handler);
@ -108,6 +116,8 @@ namespace Tapeti
Use((IBindingMiddleware)middleware); Use((IBindingMiddleware)middleware);
else if (middleware is IMessageMiddleware) else if (middleware is IMessageMiddleware)
Use((IMessageMiddleware)middleware); Use((IMessageMiddleware)middleware);
else if (middleware is ICleanupMiddleware)
Use((ICleanupMiddleware)middleware);
else if (middleware is IPublishMiddleware) else if (middleware is IPublishMiddleware)
Use((IPublishMiddleware)middleware); Use((IPublishMiddleware)middleware);
else else
@ -133,7 +143,7 @@ namespace Tapeti
container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>(); container.RegisterDefault<IMessageSerializer, JsonMessageSerializer>();
container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>(); container.RegisterDefault<IExchangeStrategy, NamespaceMatchExchangeStrategy>();
container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>(); container.RegisterDefault<IRoutingKeyStrategy, TypeNameRoutingKeyStrategy>();
container.RegisterDefault<IExceptionStrategy, RequeueExceptionStrategy>(); container.RegisterDefault<IExceptionStrategy, NackExceptionStrategy>();
} }
@ -345,16 +355,18 @@ namespace Tapeti
{ {
public IDependencyResolver DependencyResolver { get; } public IDependencyResolver DependencyResolver { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public IReadOnlyList<ICleanupMiddleware> CleanupMiddleware { get; }
public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; } public IReadOnlyList<IPublishMiddleware> PublishMiddleware { get; }
public IEnumerable<IQueue> Queues { get; } public IEnumerable<IQueue> Queues { get; }
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup; private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<IPublishMiddleware> publishMiddleware, IEnumerable<IQueue> queues) public Config(IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IReadOnlyList<ICleanupMiddleware> cleanupMiddleware, IReadOnlyList<IPublishMiddleware> publishMiddleware, IEnumerable<IQueue> queues)
{ {
DependencyResolver = dependencyResolver; DependencyResolver = dependencyResolver;
MessageMiddleware = messageMiddleware; MessageMiddleware = messageMiddleware;
CleanupMiddleware = cleanupMiddleware;
PublishMiddleware = publishMiddleware; PublishMiddleware = publishMiddleware;
Queues = queues.ToList(); Queues = queues.ToList();

View File

@ -1,6 +1,7 @@
using System; using System;
using Tapeti.Annotations; using Tapeti.Annotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.Flow.Annotations;
namespace Test namespace Test
{ {
@ -17,7 +18,25 @@ namespace Test
public IYieldPoint StartFlow(PingMessage message) public IYieldPoint StartFlow(PingMessage message)
{ {
Console.WriteLine("PingMessage received, call flowProvider.End()"); Console.WriteLine("PingMessage received, calling flowProvider.End() directly");
if (DateTime.Now < new DateTime(2000, 1, 1))
{
//never true
return flowProvider
.YieldWithRequestSync<PingConfirmationRequestMessage, PingConfirmationResponseMessage>
(new PingConfirmationRequestMessage() { StoredInState = "Ping:" },
HandlePingConfirmationResponse);
}
return Finish();
}
[Continuation]
public IYieldPoint HandlePingConfirmationResponse(PingConfirmationResponseMessage msg)
{
Console.WriteLine("Ending ping flow: " + msg.Answer);
return Finish(); return Finish();
} }
@ -33,5 +52,26 @@ namespace Test
} }
[Request(Response = typeof(PingConfirmationResponseMessage))]
public class PingConfirmationRequestMessage
{
public string StoredInState { get; set; }
}
public class PingConfirmationResponseMessage
{
public string Answer { get; set; }
}
public PingConfirmationResponseMessage PingConfirmation(PingConfirmationRequestMessage message)
{
Console.WriteLine(">> receive Ping (returning pong)");
return new PingConfirmationResponseMessage
{
Answer = message.StoredInState + " Pong!"
};
}
} }
} }

View File

@ -19,6 +19,7 @@ namespace Test
// Public properties are automatically stored and retrieved while in a flow // Public properties are automatically stored and retrieved while in a flow
public Guid StateTestGuid { get; set; } public Guid StateTestGuid { get; set; }
public int Phase;
public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer) public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer)
{ {
@ -29,21 +30,40 @@ namespace Test
[Start] [Start]
public async Task<IYieldPoint> StartFlow() public async Task<IYieldPoint> StartFlow(bool go)
{ {
Console.WriteLine("Starting stand-alone flow"); Console.WriteLine("Phase = " + Phase + " Starting stand-alone flow");
await Task.Delay(1000); await Task.Delay(10);
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage> Phase = 1;
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponse); if (go)
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponse);
Console.WriteLine("Phase = " + Phase + " Ending stand-alone flow prematurely");
return flowProvider.End();
} }
[Continuation] [Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg) public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage msg)
{ {
Console.WriteLine("Ending stand-alone flow"); Console.WriteLine("Phase = " + Phase + " Handling the first response and sending the second...");
Phase = 2;
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>
(new PoloConfirmationRequestMessage(),
HandlePoloConfirmationResponseEnd);
}
[Continuation]
public IYieldPoint HandlePoloConfirmationResponseEnd(PoloConfirmationResponseMessage msg)
{
Console.WriteLine("Phase = " + Phase + " Handling the second response and Ending stand-alone flow");
return flowProvider.End(); return flowProvider.End();
} }

View File

@ -17,7 +17,7 @@ namespace Test
public async Task Run() public async Task Run()
{ {
await publisher.Publish(new MarcoMessage()); //await publisher.Publish(new MarcoMessage());
/* /*
var concurrent = new SemaphoreSlim(20); var concurrent = new SemaphoreSlim(20);

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Threading.Tasks;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.DataAnnotations; using Tapeti.DataAnnotations;
@ -6,6 +7,7 @@ using Tapeti.Flow;
using Tapeti.Flow.SQL; using Tapeti.Flow.SQL;
using Tapeti.Helpers; using Tapeti.Helpers;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
using System.Threading;
namespace Test namespace Test
{ {
@ -20,8 +22,7 @@ namespace Test
var container = new Container(); var container = new Container();
container.Register<MarcoEmitter>(); container.Register<MarcoEmitter>();
container.Register<Visualizer>(); container.Register<Visualizer>();
container.Register<ILogger, Tapeti.Default.ConsoleLogger>();
//container.Register<IFlowRepository>(() => new EF(serviceID));
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlow() .WithFlow()
@ -34,6 +35,11 @@ namespace Test
Params = new TapetiAppSettingsConnectionParams() Params = new TapetiAppSettingsConnectionParams()
}) })
{ {
var flowStore = container.GetInstance<IFlowStore>();
var flowStore2 = container.GetInstance<IFlowStore>();
Console.WriteLine("IFlowHandler is singleton = " + (flowStore == flowStore2));
connection.Connected += (sender, e) => { connection.Connected += (sender, e) => {
Console.WriteLine("Event Connected"); Console.WriteLine("Event Connected");
}; };
@ -54,7 +60,9 @@ namespace Test
connection.GetPublisher().Publish(new FlowEndController.PingMessage()); connection.GetPublisher().Publish(new FlowEndController.PingMessage());
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.StartFlow); container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
Thread.Sleep(1000);
var emitter = container.GetInstance<MarcoEmitter>(); var emitter = container.GetInstance<MarcoEmitter>();
emitter.Run().Wait(); emitter.Run().Wait();

40
appveyor.yml Normal file
View File

@ -0,0 +1,40 @@
image: Visual Studio 2015
install:
- choco install gitversion.portable -pre -y
before_build:
- nuget restore
- ps: gitversion /l console /output buildserver /updateAssemblyInfo
after_build:
- cmd: ECHO nuget pack Tapeti\Tapeti.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: nuget pack Tapeti\Tapeti.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "X2Software.Tapeti.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.Annotations\Tapeti.Annotations.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "X2Software.Tapeti.Annotations.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.DataAnnotations\Tapeti.DataAnnotations.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "X2Software.Tapeti.DataAnnotations.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.Flow\Tapeti.Flow.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "X2Software.Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg"
- cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%"
- cmd: appveyor PushArtifact "X2Software.Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg"
assembly_info:
patch: false
build:
project: Tapeti.sln
platform:
- Any CPU
configuration:
- Release
deploy:
provider: NuGet
api_key:
secure: pkaN6R8ocu0Q93uCK3DOCifgr1Q4tuH4ZJ4eiV9U5NmwE5qRM2xjUy4B9SkZCsWx
skip_symbols: false
artifact: /.*\.nupkg/