diff --git a/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj b/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
index 7104473..42fe49a 100644
--- a/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
+++ b/02-DeclareDurableQueues/02-DeclareDurableQueues.csproj
@@ -13,6 +13,7 @@
+
diff --git a/03-FlowRequestResponse/03-FlowRequestResponse.csproj b/03-FlowRequestResponse/03-FlowRequestResponse.csproj
new file mode 100644
index 0000000..a864182
--- /dev/null
+++ b/03-FlowRequestResponse/03-FlowRequestResponse.csproj
@@ -0,0 +1,21 @@
+
+
+
+ Exe
+ netcoreapp2.1
+ _03_FlowRequestResponse
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/03-FlowRequestResponse/Program.cs b/03-FlowRequestResponse/Program.cs
new file mode 100644
index 0000000..48d0938
--- /dev/null
+++ b/03-FlowRequestResponse/Program.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Threading.Tasks;
+using ExampleLib;
+using SimpleInjector;
+using Tapeti;
+using Tapeti.DataAnnotations;
+using Tapeti.Default;
+using Tapeti.Flow;
+using Tapeti.SimpleInjector;
+
+namespace _03_FlowRequestResponse
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ var container = new Container();
+ var dependencyResolver = new SimpleInjectorDependencyResolver(container);
+
+ container.Register();
+
+ var helper = new ExampleConsoleApp(dependencyResolver);
+ helper.Run(MainAsync);
+ }
+
+
+ internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
+ {
+ var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
+ .WithFlow()
+ .RegisterAllControllers()
+ .Build();
+
+
+ using (var connection = new TapetiConnection(config))
+ {
+ // Must be called before using any flow. When using a persistent repository like the
+ // SQL server implementation, you can run any required update scripts (for example, using DbUp)
+ // before calling this Load method.
+ // Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
+ await dependencyResolver.Resolve().Load();
+
+
+ // This creates or updates the durable queue
+ await connection.Subscribe();
+
+
+ var flowStarter = dependencyResolver.Resolve();
+
+ var startData = new SendingFlowController.StartData
+ {
+ RequestStartTime = DateTime.Now,
+ Amount = 1
+ };
+
+
+ await flowStarter.Start(c => c.StartFlow, startData);
+
+
+ // Wait for the controller to signal that the message has been received
+ await waitForDone();
+ }
+ }
+ }
+}
diff --git a/03-FlowRequestResponse/ReceivingMessageController.cs b/03-FlowRequestResponse/ReceivingMessageController.cs
new file mode 100644
index 0000000..46a265e
--- /dev/null
+++ b/03-FlowRequestResponse/ReceivingMessageController.cs
@@ -0,0 +1,42 @@
+using System.Threading.Tasks;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+
+namespace _03_FlowRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.03")]
+ public class ReceivingMessageController
+ {
+ // No publisher required, responses can simply be returned
+ public async Task HandleQuoteRequest(QuoteRequestMessage message)
+ {
+ string quote;
+
+ switch (message.Amount)
+ {
+ case 1:
+ // Well, they asked for it... :-)
+ quote = "'";
+ break;
+
+ case 2:
+ quote = "\"";
+ break;
+
+ default:
+ // We have to return a response.
+ quote = null;
+ break;
+ }
+
+ // Just gonna let them wait for a bit, to demonstrate async message handlers
+ await Task.Delay(1000);
+
+ return new QuoteResponseMessage
+ {
+ Quote = quote
+ };
+ }
+ }
+}
diff --git a/03-FlowRequestResponse/SendingFlowController.cs b/03-FlowRequestResponse/SendingFlowController.cs
new file mode 100644
index 0000000..0678915
--- /dev/null
+++ b/03-FlowRequestResponse/SendingFlowController.cs
@@ -0,0 +1,73 @@
+using System;
+using ExampleLib;
+using Messaging.TapetiExample;
+using Tapeti.Annotations;
+using Tapeti.Flow;
+using Tapeti.Flow.Annotations;
+
+namespace _03_FlowRequestResponse
+{
+ [MessageController]
+ [DynamicQueue("tapeti.example.03")]
+ public class SendingFlowController
+ {
+ private readonly IFlowProvider flowProvider;
+ private readonly IExampleState exampleState;
+
+
+ // Shows how multiple values can be passed to a start method
+ public struct StartData
+ {
+ public DateTime RequestStartTime;
+ public int Amount;
+ }
+
+ // Private and protected fields are lost between method calls because the controller is
+ // recreated when a response arrives. When using a persistent flow repository this may
+ // even be after a restart of the application.
+ private bool nonPersistentState;
+
+
+ // Public fields will be stored.
+ public DateTime RequestStartTime;
+
+
+ public SendingFlowController(IFlowProvider flowProvider, IExampleState exampleState)
+ {
+ this.flowProvider = flowProvider;
+ this.exampleState = exampleState;
+ }
+
+
+ [Start]
+ public IYieldPoint StartFlow(StartData startData)
+ {
+ nonPersistentState = true;
+ RequestStartTime = startData.RequestStartTime;
+
+ return flowProvider.YieldWithRequestSync(
+ new QuoteRequestMessage
+ {
+ Amount = startData.Amount
+ },
+ HandleQuoteResponse);
+ }
+
+
+ [Continuation]
+ public IYieldPoint HandleQuoteResponse(QuoteResponseMessage message)
+ {
+ if (nonPersistentState)
+ Console.WriteLine("This is not supposed to show. NonPersistentState should not be retained. Someone please check http://www.hasthelargehadroncolliderdestroyedtheworldyet.com.");
+
+ Console.WriteLine("Request start: " + RequestStartTime.ToLongTimeString());
+ Console.WriteLine("Response time: " + DateTime.Now.ToLongTimeString());
+ Console.WriteLine("Quote: " + message.Quote);
+
+
+ exampleState.Done();
+
+ return flowProvider.End();
+ }
+ }
+}
diff --git a/ExampleHelper.cs/ExampleConsoleApp.cs b/ExampleHelper.cs/ExampleConsoleApp.cs
index 59e15f0..e6ff049 100644
--- a/ExampleHelper.cs/ExampleConsoleApp.cs
+++ b/ExampleHelper.cs/ExampleConsoleApp.cs
@@ -24,7 +24,7 @@ namespace ExampleLib
private readonly TaskCompletionSource doneSignal = new TaskCompletionSource();
- ///
+ /// Uses Tapeti's IDependencyContainer interface so you can easily switch an example to your favourite IoC container
public ExampleConsoleApp(IDependencyContainer dependencyResolver)
{
this.dependencyResolver = dependencyResolver;
diff --git a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
index 245b78a..42369b4 100644
--- a/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
+++ b/Examples/01-PublishSubscribe/01-PublishSubscribe.csproj
@@ -13,6 +13,7 @@
+
diff --git a/Examples/01-PublishSubscribe/ExamplePublisher.cs b/Examples/01-PublishSubscribe/ExamplePublisher.cs
index 83347b7..4ebf41b 100644
--- a/Examples/01-PublishSubscribe/ExamplePublisher.cs
+++ b/Examples/01-PublishSubscribe/ExamplePublisher.cs
@@ -1,4 +1,6 @@
-using System.Threading.Tasks;
+using System;
+using System.ComponentModel.DataAnnotations;
+using System.Threading.Tasks;
using Messaging.TapetiExample;
using Tapeti;
@@ -24,6 +26,20 @@ namespace _01_PublishSubscribe
{
Greeting = "Hello world of messaging!"
});
+
+
+ // Demonstrates what happens when DataAnnotations is enabled
+ // and the message is invalid
+ try
+ {
+ await publisher.Publish(new PublishSubscribeMessage());
+
+ Console.WriteLine("This is not supposed to show. Did you disable the DataAnnotations extension?");
+ }
+ catch (ValidationException e)
+ {
+ Console.WriteLine("As expected, the DataAnnotations check failed: " + e.Message);
+ }
}
}
}
diff --git a/Examples/01-PublishSubscribe/Program.cs b/Examples/01-PublishSubscribe/Program.cs
index a010fc1..eb777d3 100644
--- a/Examples/01-PublishSubscribe/Program.cs
+++ b/Examples/01-PublishSubscribe/Program.cs
@@ -3,6 +3,7 @@ using System.Threading.Tasks;
using ExampleLib;
using SimpleInjector;
using Tapeti;
+using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.SimpleInjector;
@@ -29,6 +30,7 @@ namespace _01_PublishSubscribe
internal static async Task MainAsync(IDependencyResolver dependencyResolver, Func waitForDone)
{
var config = new TapetiConfig(dependencyResolver)
+ .WithDataAnnotations()
.RegisterAllControllers()
.Build();
diff --git a/Messaging.TapetiExample/Messaging.TapetiExample.csproj b/Messaging.TapetiExample/Messaging.TapetiExample.csproj
index 56cdff2..9b27198 100644
--- a/Messaging.TapetiExample/Messaging.TapetiExample.csproj
+++ b/Messaging.TapetiExample/Messaging.TapetiExample.csproj
@@ -8,4 +8,8 @@
+
+
+
+
diff --git a/Messaging.TapetiExample/PublishSubscribeMessage.cs b/Messaging.TapetiExample/PublishSubscribeMessage.cs
index 0a14f37..b378698 100644
--- a/Messaging.TapetiExample/PublishSubscribeMessage.cs
+++ b/Messaging.TapetiExample/PublishSubscribeMessage.cs
@@ -7,7 +7,7 @@ namespace Messaging.TapetiExample
///
public class PublishSubscribeMessage
{
- [Required]
+ [Required(ErrorMessage = "Don't be impolite, supply a {0}")]
public string Greeting { get; set; }
}
}
diff --git a/Messaging.TapetiExample/QuoteRequestMessage.cs b/Messaging.TapetiExample/QuoteRequestMessage.cs
new file mode 100644
index 0000000..a67367c
--- /dev/null
+++ b/Messaging.TapetiExample/QuoteRequestMessage.cs
@@ -0,0 +1,16 @@
+using Tapeti.Annotations;
+
+namespace Messaging.TapetiExample
+{
+ [Request(Response = typeof(QuoteResponseMessage))]
+ public class QuoteRequestMessage
+ {
+ public int Amount { get; set; }
+ }
+
+
+ public class QuoteResponseMessage
+ {
+ public string Quote { get; set; }
+ }
+}
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 9d64ded..6796cc0 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -199,30 +199,43 @@ namespace Tapeti.Flow.Default
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
- var messageContext = context.ControllerMessageContext;
- if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out FlowContext flowContext))
- {
- flowContext = new FlowContext
- {
- HandlerContext = context
- };
-
- messageContext?.Store(ContextItems.FlowContext, flowContext);
- }
+ FlowContext flowContext = null;
+ var disposeFlowContext = false;
try
{
- await executableYieldPoint.Execute(flowContext);
- }
- catch (YieldPointException e)
- {
- // Useful for debugging
- e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
- e.Data["Tapeti.Controller.Method"] = context.Method.Name;
- throw;
- }
+ var messageContext = context.ControllerMessageContext;
+ if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out flowContext))
+ {
+ flowContext = new FlowContext
+ {
+ HandlerContext = context
+ };
- flowContext.EnsureStoreOrDeleteIsCalled();
+ // If we ended up here it is because of a Start. No point in storing the new FlowContext
+ // in the messageContext as the yield point is the last to execute.
+ disposeFlowContext = true;
+ }
+
+ try
+ {
+ await executableYieldPoint.Execute(flowContext);
+ }
+ catch (YieldPointException e)
+ {
+ // Useful for debugging
+ e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
+ e.Data["Tapeti.Controller.Method"] = context.Method.Name;
+ throw;
+ }
+
+ flowContext.EnsureStoreOrDeleteIsCalled();
+ }
+ finally
+ {
+ if (disposeFlowContext)
+ flowContext.Dispose();
+ }
}
diff --git a/Tapeti.sln b/Tapeti.sln
index 90289af..c51a070 100644
--- a/Tapeti.sln
+++ b/Tapeti.sln
@@ -35,6 +35,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.TapetiExample", "
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "02-DeclareDurableQueues", "02-DeclareDurableQueues\02-DeclareDurableQueues.csproj", "{85511282-EF91-4B56-B7DC-9E8706556D6E}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "03-FlowRequestResponse", "03-FlowRequestResponse\03-FlowRequestResponse.csproj", "{463A12CE-E221-450D-ADEA-91A599612DFA}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -101,6 +103,10 @@ Global
{85511282-EF91-4B56-B7DC-9E8706556D6E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{85511282-EF91-4B56-B7DC-9E8706556D6E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{85511282-EF91-4B56-B7DC-9E8706556D6E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {463A12CE-E221-450D-ADEA-91A599612DFA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {463A12CE-E221-450D-ADEA-91A599612DFA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {463A12CE-E221-450D-ADEA-91A599612DFA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {463A12CE-E221-450D-ADEA-91A599612DFA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -110,6 +116,7 @@ Global
{F3B38753-06B4-4932-84B4-A07692AD802D} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{D24120D4-50A2-44B6-A4EA-6ADAAEBABA84} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{85511282-EF91-4B56-B7DC-9E8706556D6E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
+ {463A12CE-E221-450D-ADEA-91A599612DFA} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B09CC2BF-B2AF-4CB6-8728-5D1D8E5C50FA}