1
0
mirror of synced 2024-11-21 17:03:50 +00:00

Merge branch 'release/2.0.4'

This commit is contained in:
Mark van Renswoude 2020-01-24 14:53:17 +01:00
commit 97a7742840
8 changed files with 361 additions and 318 deletions

View File

@ -14,7 +14,7 @@ namespace Tapeti.Cmd
[Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")] [Option('h', "host", HelpText = "Hostname of the RabbitMQ server.", Default = "localhost")]
public string Host { get; set; } public string Host { get; set; }
[Option('p', "port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)] [Option("port", HelpText = "AMQP port of the RabbitMQ server.", Default = 5672)]
public int Port { get; set; } public int Port { get; set; }
[Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")] [Option('v', "virtualhost", HelpText = "Virtual host used for the RabbitMQ connection.", Default = "/")]

View File

@ -10,6 +10,14 @@
<NoWarn>1701;1702</NoWarn> <NoWarn>1701;1702</NoWarn>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Remove="scripts\Flow table.sql" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="scripts\Flow table.sql" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.6.1" /> <PackageReference Include="System.Data.SqlClient" Version="4.6.1" />
</ItemGroup> </ItemGroup>

View File

@ -0,0 +1,13 @@
/*
This script is embedded in the Tapeti.Flow.SQL package so it can be used with, for example, DbUp
*/
create table Flow
(
FlowID uniqueidentifier not null,
CreationTime datetime2(3) not null,
StateJson nvarchar(max) null,
constraint PK_Flow primary key clustered(FlowID)
);

View File

@ -9,5 +9,12 @@
/// Key given to the FlowContext object as stored in the message context. /// Key given to the FlowContext object as stored in the message context.
/// </summary> /// </summary>
public const string FlowContext = "Tapeti.Flow.FlowContext"; public const string FlowContext = "Tapeti.Flow.FlowContext";
/// <summary>
/// Indicates if the current message handler is the last one to be called before a
/// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state.
/// </summary>
public const string FlowIsConverging = "Tapeti.Flow.IsConverging";
} }
} }

View File

@ -83,6 +83,9 @@ namespace Tapeti.Flow.Default
private static Task HandleParallelResponse(IControllerMessageContext context) private static Task HandleParallelResponse(IControllerMessageContext context)
{ {
if (context.Get<object>(ContextItems.FlowIsConverging, out _))
return Task.CompletedTask;
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext => return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{ {

View File

@ -36,9 +36,14 @@ namespace Tapeti.Flow.Default
await FlowStateLock.DeleteFlowState(); await FlowStateLock.DeleteFlowState();
} }
public bool IsStoredOrDeleted()
{
return storeCalled || deleteCalled;
}
public void EnsureStoreOrDeleteIsCalled() public void EnsureStoreOrDeleteIsCalled()
{ {
if (!storeCalled && !deleteCalled) if (!IsStoredOrDeleted())
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID); throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
} }

View File

@ -36,6 +36,10 @@ namespace Tapeti.Flow.Default
var converge = flowContext.FlowState.Continuations.Count == 0 && var converge = flowContext.FlowState.Continuations.Count == 0 &&
flowContext.ContinuationMetadata.ConvergeMethodName != null; flowContext.ContinuationMetadata.ConvergeMethodName != null;
if (converge)
// Indicate to the FlowBindingMiddleware that the state must not to be stored
context.Store(ContextItems.FlowIsConverging, null);
await next(); await next();
if (converge) if (converge)
@ -57,7 +61,10 @@ namespace Tapeti.Flow.Default
if (flowContext?.FlowStateLock != null) if (flowContext?.FlowStateLock != null)
{ {
if (consumeResult == ConsumeResult.Error) // TODO do not call when the controller method was filtered, if the same message has two methods
if (!flowContext.IsStoredOrDeleted())
// The exception strategy can set the consume result to Success. Instead, check if the yield point
// was handled. The flow provider ensures we only end up here in case of an exception.
await flowContext.FlowStateLock.DeleteFlowState(); await flowContext.FlowStateLock.DeleteFlowState();
flowContext.FlowStateLock.Dispose(); flowContext.FlowStateLock.Dispose();

View File

@ -184,7 +184,7 @@ namespace Tapeti.Default
{ {
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware, bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(context, ConsumeResult.Success, next), async (handler, next) => await handler.Cleanup(context, consumeResult, next),
() => Task.CompletedTask); () => Task.CompletedTask);
} }