diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
index 2da63e5..44bd845 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
@@ -10,7 +10,7 @@
-
+
diff --git a/Tapeti.Flow/Annotations/ContinuationAttribute.cs b/Tapeti.Flow/Annotations/ContinuationAttribute.cs
index 8749bf8..2612a30 100644
--- a/Tapeti.Flow/Annotations/ContinuationAttribute.cs
+++ b/Tapeti.Flow/Annotations/ContinuationAttribute.cs
@@ -2,6 +2,11 @@
namespace Tapeti.Flow.Annotations
{
+ ///
+ ///
+ /// Marks a message handler as a response message handler which continues a Tapeti Flow.
+ /// The method only receives direct messages, and does not create a routing key based binding to the queue.
+ ///
[AttributeUsage(AttributeTargets.Method)]
public class ContinuationAttribute : Attribute
{
diff --git a/Tapeti.Flow/Annotations/StartAttribute.cs b/Tapeti.Flow/Annotations/StartAttribute.cs
index 3f8e767..8c1fd2e 100644
--- a/Tapeti.Flow/Annotations/StartAttribute.cs
+++ b/Tapeti.Flow/Annotations/StartAttribute.cs
@@ -3,6 +3,11 @@ using JetBrains.Annotations;
namespace Tapeti.Flow.Annotations
{
+ ///
+ ///
+ /// Marks this method as the start of a Tapeti Flow. Use IFlowStarter.Start to begin a new flow and
+ /// call this method. Must return an IYieldPoint.
+ ///
[AttributeUsage(AttributeTargets.Method)]
[MeansImplicitUse]
public class StartAttribute : Attribute
diff --git a/Tapeti.Flow/ConfigExtensions.cs b/Tapeti.Flow/ConfigExtensions.cs
index e016172..474da6a 100644
--- a/Tapeti.Flow/ConfigExtensions.cs
+++ b/Tapeti.Flow/ConfigExtensions.cs
@@ -2,11 +2,20 @@
namespace Tapeti.Flow
{
+ ///
+ /// ITapetiConfigBuilder extension for enabling Flow.
+ ///
public static class ConfigExtensions
{
+ ///
+ /// Enables Tapeti Flow.
+ ///
+ ///
+ /// An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts.
+ ///
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null)
{
- config.Use(new FlowMiddleware(flowRepository));
+ config.Use(new FlowExtension(flowRepository));
return config;
}
}
diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs
index 452de54..8bebc23 100644
--- a/Tapeti.Flow/ContextItems.cs
+++ b/Tapeti.Flow/ContextItems.cs
@@ -1,7 +1,13 @@
namespace Tapeti.Flow
{
+ ///
+ /// Key names as used in the message context store. For internal use.
+ ///
public static class ContextItems
{
+ ///
+ /// Key given to the FlowContext object as stored in the message context.
+ ///
public const string FlowContext = "Tapeti.Flow.FlowContext";
}
}
diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
index 7a65642..00f8469 100644
--- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
@@ -15,9 +15,6 @@ namespace Tapeti.Flow.Default
if (context.Method.GetCustomAttribute() != null)
return;
- if (context.Method.GetCustomAttribute() != null)
- context.SetBindingTargetMode(BindingTargetMode.Direct);
-
RegisterYieldPointResult(context);
RegisterContinuationFilter(context);
@@ -33,7 +30,8 @@ namespace Tapeti.Flow.Default
if (continuationAttribute == null)
return;
- context.Use(new FlowMiddleware());
+ context.SetBindingTargetMode(BindingTargetMode.Direct);
+ context.Use(new FlowContinuationMiddleware());
if (context.Result.HasHandler)
return;
diff --git a/Tapeti.Flow/Default/FlowMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
similarity index 91%
rename from Tapeti.Flow/Default/FlowMiddleware.cs
rename to Tapeti.Flow/Default/FlowContinuationMiddleware.cs
index 1dd1571..d138b08 100644
--- a/Tapeti.Flow/Default/FlowMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
@@ -6,11 +6,11 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
- public class FlowMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
+ public class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{
public async Task Filter(IControllerMessageContext context, Func next)
{
- var flowContext = await CreateFlowContext(context);
+ var flowContext = await EnrichWithFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return;
@@ -44,7 +44,7 @@ namespace Tapeti.Flow.Default
}
- public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next)
+ public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next)
{
await next();
@@ -62,7 +62,7 @@ namespace Tapeti.Flow.Default
- private static async Task CreateFlowContext(IControllerMessageContext context)
+ private static async Task EnrichWithFlowContext(IControllerMessageContext context)
{
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return flowContext;
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 9da8a7d..fe71a05 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -12,6 +12,10 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
+ /// />
+ ///
+ /// Default implementation for IFlowProvider.
+ ///
public class FlowProvider : IFlowProvider, IFlowHandler
{
private readonly ITapetiConfig config;
@@ -25,28 +29,33 @@ namespace Tapeti.Flow.Default
}
+ ///
public IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
+ ///
public IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler)
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
+ ///
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
return new ParallelRequestBuilder(config, SendRequest);
}
+ ///
public IYieldPoint EndWithResponse(TResponse message)
{
return new DelegateYieldPoint(context => SendResponse(context, message));
}
+ ///
public IYieldPoint End()
{
return new DelegateYieldPoint(EndFlow);
@@ -179,6 +188,7 @@ namespace Tapeti.Flow.Default
};
}
+ ///
public async Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint)
{
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
diff --git a/Tapeti.Flow/Default/FlowStarter.cs b/Tapeti.Flow/Default/FlowStarter.cs
index 04b4dc9..359589a 100644
--- a/Tapeti.Flow/Default/FlowStarter.cs
+++ b/Tapeti.Flow/Default/FlowStarter.cs
@@ -47,17 +47,19 @@ namespace Tapeti.Flow.Default
var controller = config.DependencyResolver.Resolve();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
- var context = new ControllerMessageContext
+ /*
+ var context = new ControllerMessageContext()
{
Config = config,
Controller = controller
};
+ */
var flowHandler = config.DependencyResolver.Resolve();
try
{
- await flowHandler.Execute(context, yieldPoint);
+ //await flowHandler.Execute(context, yieldPoint);
//handlingResult.ConsumeResponse = ConsumeResponse.Ack;
}
finally
diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs
index e32430c..eb7961d 100644
--- a/Tapeti.Flow/Default/FlowState.cs
+++ b/Tapeti.Flow/Default/FlowState.cs
@@ -4,20 +4,34 @@ using System.Linq;
namespace Tapeti.Flow.Default
{
+ ///
+ /// Represents the state stored for active flows.
+ ///
public class FlowState
{
private FlowMetadata metadata;
private Dictionary continuations;
+ ///
+ /// Contains metadata about the flow.
+ ///
public FlowMetadata Metadata
{
get => metadata ?? (metadata = new FlowMetadata());
set => metadata = value;
}
+
+ ///
+ /// Contains the serialized state which is restored when a flow continues.
+ ///
public string Data { get; set; }
+
+ ///
+ /// Contains metadata about continuations awaiting a response.
+ ///
public Dictionary Continuations
{
get => continuations ?? (continuations = new Dictionary());
@@ -25,6 +39,9 @@ namespace Tapeti.Flow.Default
}
+ ///
+ /// Creates a deep clone of this FlowState.
+ ///
public FlowState Clone()
{
return new FlowState {
@@ -36,11 +53,20 @@ namespace Tapeti.Flow.Default
}
+ ///
+ /// Contains metadata about the flow.
+ ///
public class FlowMetadata
{
+ ///
+ /// Contains information about the expected response for this flow.
+ ///
public ReplyMetadata Reply { get; set; }
+ ///
+ /// Creates a deep clone of this FlowMetadata.
+ ///
public FlowMetadata Clone()
{
return new FlowMetadata
@@ -51,15 +77,36 @@ namespace Tapeti.Flow.Default
}
+ ///
+ /// Contains information about the expected response for this flow.
+ ///
public class ReplyMetadata
{
+ ///
+ /// The queue to which the response should be sent.
+ ///
public string ReplyTo { get; set; }
+
+ ///
+ /// The correlation ID included in the original request.
+ ///
public string CorrelationId { get; set; }
+
+ ///
+ /// The expected response message class.
+ ///
public string ResponseTypeName { get; set; }
+ ///
+ /// Indicates whether the response should be sent a mandatory.
+ /// False for requests originating from a dynamic queue.
+ ///
public bool Mandatory { get; set; }
+ ///
+ /// Creates a deep clone of this ReplyMetadata.
+ ///
public ReplyMetadata Clone()
{
return new ReplyMetadata
@@ -73,13 +120,30 @@ namespace Tapeti.Flow.Default
}
+ ///
+ /// Contains metadata about a continuation awaiting a response.
+ ///
public class ContinuationMetadata
{
+ ///
+ /// The name of the method which will handle the response.
+ ///
public string MethodName { get; set; }
+
+ ///
+ /// The name of the method which is called when all responses have been processed.
+ ///
public string ConvergeMethodName { get; set; }
+
+ ///
+ /// Determines if the converge method is synchronous or asynchronous.
+ ///
public bool ConvergeMethodSync { get; set; }
+ ///
+ /// Creates a deep clone of this ContinuationMetadata.
+ ///
public ContinuationMetadata Clone()
{
return new ContinuationMetadata
diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs
index b98923d..6df6955 100644
--- a/Tapeti.Flow/Default/FlowStore.cs
+++ b/Tapeti.Flow/Default/FlowStore.cs
@@ -1,13 +1,16 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
+ ///
+ ///
+ /// Default implementation of IFlowStore.
+ ///
public class FlowStore : IFlowStore
{
private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary();
diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowExtension.cs
similarity index 75%
rename from Tapeti.Flow/FlowMiddleware.cs
rename to Tapeti.Flow/FlowExtension.cs
index a4b9cdb..05a17d2 100644
--- a/Tapeti.Flow/FlowMiddleware.cs
+++ b/Tapeti.Flow/FlowExtension.cs
@@ -4,15 +4,21 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
- public class FlowMiddleware : ITapetiExtension
+ ///
+ ///
+ /// Provides the Flow middleware.
+ ///
+ public class FlowExtension : ITapetiExtension
{
private readonly IFlowRepository flowRepository;
- public FlowMiddleware(IFlowRepository flowRepository)
+ ///
+ public FlowExtension(IFlowRepository flowRepository)
{
this.flowRepository = flowRepository;
}
+ ///
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefault();
@@ -22,6 +28,7 @@ namespace Tapeti.Flow
container.RegisterDefaultSingleton();
}
+ ///
public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver)
{
yield return new FlowBindingMiddleware();
diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs
index b5fd107..4cbfeec 100644
--- a/Tapeti.Flow/IFlowProvider.cs
+++ b/Tapeti.Flow/IFlowProvider.cs
@@ -7,49 +7,162 @@ using Tapeti.Config;
namespace Tapeti.Flow
{
+ ///
+ /// Provides methods to build an IYieldPoint to indicate if and how Flow should continue.
+ ///
public interface IFlowProvider
{
+ ///
+ /// Publish a request message and continue the flow when the response arrives.
+ /// The request message must be marked with the [Request] attribute, and the
+ /// Response type must match. Used for asynchronous response handlers.
+ ///
+ ///
+ ///
+ ///
+ ///
IYieldPoint YieldWithRequest(TRequest message, Func> responseHandler);
- // One does not simply overload methods with Task vs non-Task Funcs. "Ambiguous call".
- // Apparantly this is because a return type of a method is not part of its signature,
- // according to: http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
+
+ ///
+ /// Publish a request message and continue the flow when the response arrives.
+ /// The request message must be marked with the [Request] attribute, and the
+ /// Response type must match. Used for synchronous response handlers.
+ ///
+ ///
+ /// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
+ /// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
+ /// of a method is not part of its signature,according to:
+ /// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
IYieldPoint YieldWithRequestSync(TRequest message, Func responseHandler);
+
+ ///
+ /// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
+ /// to acquire an IYieldPoint.
+ ///
IFlowParallelRequestBuilder YieldWithParallelRequest();
+
+ ///
+ /// End the flow by publishing the specified response message. Only allowed, and required, when the
+ /// current flow was started by a message handler for a Request message.
+ ///
+ ///
+ ///
IYieldPoint EndWithResponse(TResponse message);
+
+
+ ///
+ /// End the flow and dispose any state.
+ ///
IYieldPoint End();
}
+
///
/// Allows starting a flow outside of a message handler.
///
public interface IFlowStarter
{
+ ///
+ /// Starts a new flow.
+ ///
+ ///
Task Start(Expression>> methodSelector) where TController : class;
+
+ ///
+ /// Starts a new flow.
+ ///
+ ///
Task Start(Expression>>> methodSelector) where TController : class;
+
+ ///
+ /// Starts a new flow and passes the parameter to the method.
+ ///
+ ///
+ ///
Task Start(Expression>> methodSelector, TParameter parameter) where TController : class;
+
+ ///
+ /// Starts a new flow and passes the parameter to the method.
+ ///
+ ///
+ ///
Task Start(Expression>>> methodSelector, TParameter parameter) where TController : class;
}
+
///
/// Internal interface. Do not call directly.
///
public interface IFlowHandler
{
+ ///
+ /// Executes the YieldPoint for the given message context.
+ ///
+ ///
+ ///
Task Execute(IControllerMessageContext context, IYieldPoint yieldPoint);
}
+
+ ///
+ /// Builder to publish one or more request messages and continuing the flow when the responses arrive.
+ ///
public interface IFlowParallelRequestBuilder
{
+ ///
+ /// Publish a request message and continue the flow when the response arrives.
+ /// Note that the response handler can not influence the flow as it does not return a YieldPoint.
+ /// It can instead store state in the controller for the continuation passed to the Yield method.
+ /// Used for asynchronous response handlers.
+ ///
+ ///
+ ///
IFlowParallelRequestBuilder AddRequest(TRequest message, Func responseHandler);
+
+ ///
+ /// Publish a request message and continue the flow when the response arrives.
+ /// Note that the response handler can not influence the flow as it does not return a YieldPoint.
+ /// It can instead store state in the controller for the continuation passed to the Yield method.
+ /// Used for synchronous response handlers.
+ ///
+ ///
+ ///
IFlowParallelRequestBuilder AddRequestSync(TRequest message, Action responseHandler);
+ ///
+ /// Constructs an IYieldPoint to continue the flow when responses arrive.
+ /// The continuation method is called when all responses have arrived.
+ /// Response handlers and the continuation method are guaranteed thread-safe access to the
+ /// controller and can store state.
+ /// Used for asynchronous continuation methods.
+ ///
+ ///
IYieldPoint Yield(Func> continuation);
+
+ ///
+ /// Constructs an IYieldPoint to continue the flow when responses arrive.
+ /// The continuation method is called when all responses have arrived.
+ /// Response handlers and the continuation method are guaranteed thread-safe access to the
+ /// controller and can store state.
+ /// Used for synchronous continuation methods.
+ ///
+ ///
IYieldPoint YieldSync(Func continuation);
}
+
+ ///
+ /// Defines if and how the Flow should continue. Construct using any of the IFlowProvider methods.
+ ///
public interface IYieldPoint
{
}
diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs
index 17da5e8..fec1fb0 100644
--- a/Tapeti.Flow/IFlowStore.cs
+++ b/Tapeti.Flow/IFlowStore.cs
@@ -6,19 +6,57 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
+ ///
+ /// Provides a way to store and load flow state.
+ ///
public interface IFlowStore
{
+ ///
+ /// Must be called during application startup before subscribing or starting a flow.
+ /// If using an IFlowRepository that requires an update (such as creating tables) make
+ /// sure it is called before calling Load.
+ ///
Task Load();
+
+ ///
+ /// Looks up the FlowID corresponding to a ContinuationID. For internal use.
+ ///
+ ///
Task FindFlowID(Guid continuationID);
+
+ ///
+ /// Acquires a lock on the flow with the specified FlowID.
+ ///
+ ///
Task LockFlowState(Guid flowID);
}
+
+ ///
+ ///
+ /// Represents a lock on the flow state, to provide thread safety.
+ ///
public interface IFlowStateLock : IDisposable
{
+ ///
+ /// The unique ID of the flow state.
+ ///
Guid FlowID { get; }
+ ///
+ /// Acquires a copy of the flow state.
+ ///
Task GetFlowState();
+
+ ///
+ /// Stores the new flow state.
+ ///
+ ///
Task StoreFlowState(FlowState flowState);
+
+ ///
+ /// Disposes of the flow state corresponding to this Flow ID.
+ ///
Task DeleteFlowState();
}
}
diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj
index a2c4094..c6b101f 100644
--- a/Tapeti.Flow/Tapeti.Flow.csproj
+++ b/Tapeti.Flow/Tapeti.Flow.csproj
@@ -6,7 +6,7 @@
- 1701;1702;1591
+ 1701;1702
diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj
index dbe7a51..c643fa4 100644
--- a/Tapeti.Serilog/Tapeti.Serilog.csproj
+++ b/Tapeti.Serilog/Tapeti.Serilog.csproj
@@ -10,7 +10,7 @@
-
+
diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
index 436911d..4804e37 100644
--- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
+++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
@@ -10,7 +10,7 @@
-
+
diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj
index 41ef4bd..e05f517 100644
--- a/Tapeti.Tests/Tapeti.Tests.csproj
+++ b/Tapeti.Tests/Tapeti.Tests.csproj
@@ -9,9 +9,12 @@
-
-
-
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs
index f2f3e5e..2d65410 100644
--- a/Tapeti.Transient/TransientGenericBinding.cs
+++ b/Tapeti.Transient/TransientGenericBinding.cs
@@ -29,7 +29,7 @@ namespace Tapeti.Transient
///
public async Task Apply(IBindingTarget target)
{
- QueueName = await target.BindDirectDynamic(dynamicQueuePrefix);
+ QueueName = await target.BindDynamicDirect(dynamicQueuePrefix);
router.TransientResponseQueueName = QueueName;
}
@@ -47,5 +47,12 @@ namespace Tapeti.Transient
router.HandleMessage(context);
return Task.CompletedTask;
}
+
+
+ ///
+ public Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
+ {
+ return Task.CompletedTask;
+ }
}
}
\ No newline at end of file
diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs
index e717a62..eb204be 100644
--- a/Tapeti/Config/IBinding.cs
+++ b/Tapeti/Config/IBinding.cs
@@ -33,6 +33,15 @@ namespace Tapeti.Config
///
///
Task Invoke(IMessageContext context);
+
+
+ ///
+ /// Called after the handler is invoked and any exception handling has been done.
+ ///
+ ///
+ ///
+ ///
+ Task Cleanup(IMessageContext context, ConsumeResult consumeResult);
}
@@ -67,7 +76,7 @@ namespace Tapeti.Config
/// Used for direct-to-queue messages.
///
/// The name of the durable queue
- Task BindDirectDurable(string queueName);
+ Task BindDurableDirect(string queueName);
///
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@@ -76,7 +85,7 @@ namespace Tapeti.Config
/// The message class which will be handled on the queue. It is not actually bound to the queue.
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
- Task BindDirectDynamic(Type messageClass = null, string queuePrefix = null);
+ Task BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
///
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@@ -84,6 +93,6 @@ namespace Tapeti.Config
///
/// An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.
/// The generated name of the dynamic queue
- Task BindDirectDynamic(string queuePrefix = null);
+ Task BindDynamicDirect(string queuePrefix = null);
}
}
diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs
index 9877ff0..a7a63db 100644
--- a/Tapeti/Config/IControllerBindingContext.cs
+++ b/Tapeti/Config/IControllerBindingContext.cs
@@ -43,10 +43,16 @@ namespace Tapeti.Config
public interface IControllerBindingContext
{
///
- /// The message class for this method.
+ /// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware.
+ /// If required, call next first to ensure it is available.
///
Type MessageClass { get; }
+ ///
+ /// Determines if SetMessageClass has already been called.
+ ///
+ bool HasMessageClass { get; }
+
///
/// The controller class for this binding.
///
@@ -69,7 +75,7 @@ namespace Tapeti.Config
///
- /// Sets the message class for this method. Can only be called once, which is always done first by the default MessageBinding.
+ /// Sets the message class for this method. Can only be called once, which is normally done by the default MessageBinding.
///
///
void SetMessageClass(Type messageClass);
diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs
index fc6c686..c089b82 100644
--- a/Tapeti/Config/IControllerCleanupMiddleware.cs
+++ b/Tapeti/Config/IControllerCleanupMiddleware.cs
@@ -3,11 +3,10 @@ using System.Threading.Tasks;
namespace Tapeti.Config
{
- ///
///
/// Denotes middleware that runs after controller methods.
///
- public interface IControllerCleanupMiddleware : IControllerMiddlewareBase
+ public interface IControllerCleanupMiddleware
{
///
/// Called after the message handler method, even if exceptions occured.
@@ -15,6 +14,6 @@ namespace Tapeti.Config
///
///
/// Always call to allow the next in the chain to clean up
- Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next);
+ Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next);
}
}
diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs
index 25108c9..3eec194 100644
--- a/Tapeti/Config/IControllerMessageContext.cs
+++ b/Tapeti/Config/IControllerMessageContext.cs
@@ -16,22 +16,5 @@
/// Provides access to the binding which is currently processing the message.
///
new IControllerMethodBinding Binding { get; }
-
-
- ///
- /// Stores a key-value pair in the context for passing information between the various
- /// controller middleware stages (IControllerMiddlewareBase descendants).
- ///
- /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts
- /// Will be disposed if the value implements IDisposable
- void Store(string key, object value);
-
- ///
- /// Retrieves a previously stored value.
- ///
- ///
- ///
- /// True if the value was found, False otherwise
- bool Get(string key, out T value) where T : class;
}
}
diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs
index f8a839f..c3314c4 100644
--- a/Tapeti/Config/IMessageContext.cs
+++ b/Tapeti/Config/IMessageContext.cs
@@ -42,5 +42,22 @@ namespace Tapeti.Config
/// Provides access to the binding which is currently processing the message.
///
IBinding Binding { get; }
+
+
+ ///
+ /// Stores a key-value pair in the context for passing information between the various
+ /// middleware stages (mostly for IControllerMiddlewareBase descendants).
+ ///
+ /// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts
+ /// Will be disposed if the value implements IDisposable
+ void Store(string key, object value);
+
+ ///
+ /// Retrieves a previously stored value.
+ ///
+ ///
+ ///
+ /// True if the value was found, False otherwise
+ bool Get(string key, out T value) where T : class;
}
}
diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs
index dab033d..51962ab 100644
--- a/Tapeti/Config/ITapetiConfigBuilder.cs
+++ b/Tapeti/Config/ITapetiConfigBuilder.cs
@@ -107,7 +107,7 @@ namespace Tapeti.Config
IDependencyResolver DependencyResolver { get; }
///
- /// Applies the currently registered binding middleware to
+ /// Applies the currently registered binding middleware to the specified context.
///
///
///
diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs
index c8b3bcd..cbe1cfb 100644
--- a/Tapeti/Connection/TapetiClient.cs
+++ b/Tapeti/Connection/TapetiClient.cs
@@ -97,7 +97,8 @@ namespace Tapeti.Connection
///
public async Task Publish(byte[] body, IMessageProperties properties, string exchange, string routingKey, bool mandatory)
{
- var publishProperties = new RabbitMQMessageProperties(new BasicProperties(), properties);
+ if (string.IsNullOrEmpty(routingKey))
+ throw new ArgumentNullException(nameof(routingKey));
await taskQueue.Value.Add(async () =>
{
@@ -131,7 +132,18 @@ namespace Tapeti.Connection
else
mandatory = false;
- channel.BasicPublish(exchange, routingKey, mandatory, publishProperties.BasicProperties, body);
+ try
+ {
+ var publishProperties = new RabbitMQMessageProperties(channel.CreateBasicProperties(), properties);
+ channel.BasicPublish(exchange ?? "", routingKey, mandatory, publishProperties.BasicProperties, body);
+ }
+ catch
+ {
+ messageInfo.CompletionSource.SetCanceled();
+ publishResultTask = null;
+
+ throw;
+ }
});
diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs
index ff3ea78..1b57bb8 100644
--- a/Tapeti/Connection/TapetiConsumer.cs
+++ b/Tapeti/Connection/TapetiConsumer.cs
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using System.Runtime.ExceptionServices;
using Tapeti.Config;
using Tapeti.Default;
using System.Threading.Tasks;
@@ -56,9 +55,6 @@ namespace Tapeti.Connection
}
catch (Exception dispatchException)
{
- // TODO check if this is still necessary:
- // var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
-
using (var emptyContext = new MessageContext
{
Config = config,
@@ -119,15 +115,18 @@ namespace Tapeti.Connection
try
{
await MiddlewareHelper.GoAsync(config.Middleware.Message,
- (handler, next) => handler.Handle(context, next),
+ async (handler, next) => await handler.Handle(context, next),
async () => { await binding.Invoke(context); });
+ await binding.Cleanup(context, ConsumeResult.Success);
return ConsumeResult.Success;
}
catch (Exception invokeException)
{
var exceptionContext = new ExceptionStrategyContext(context, invokeException);
HandleException(exceptionContext);
+
+ await binding.Cleanup(context, exceptionContext.ConsumeResult);
return exceptionContext.ConsumeResult;
}
}
diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs
index 6013b61..b5cc87a 100644
--- a/Tapeti/Connection/TapetiSubscriber.cs
+++ b/Tapeti/Connection/TapetiSubscriber.cs
@@ -96,7 +96,7 @@ namespace Tapeti.Connection
public abstract Task BindDurable(Type messageClass, string queueName);
- public abstract Task BindDirectDurable(string queueName);
+ public abstract Task BindDurableDirect(string queueName);
public async Task BindDynamic(Type messageClass, string queuePrefix = null)
@@ -115,14 +115,14 @@ namespace Tapeti.Connection
}
- public async Task BindDirectDynamic(Type messageClass, string queuePrefix = null)
+ public async Task BindDynamicDirect(Type messageClass, string queuePrefix = null)
{
var result = await DeclareDynamicQueue(messageClass, queuePrefix);
return result.QueueName;
}
- public async Task BindDirectDynamic(string queuePrefix = null)
+ public async Task BindDynamicDirect(string queuePrefix = null)
{
// If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
@@ -215,7 +215,7 @@ namespace Tapeti.Connection
}
- public override Task BindDirectDurable(string queueName)
+ public override Task BindDurableDirect(string queueName)
{
if (!durableQueues.ContainsKey(queueName))
durableQueues.Add(queueName, new List());
@@ -259,7 +259,7 @@ namespace Tapeti.Connection
await VerifyDurableQueue(queueName);
}
- public override async Task BindDirectDurable(string queueName)
+ public override async Task BindDurableDirect(string queueName)
{
await VerifyDurableQueue(queueName);
}
diff --git a/Tapeti/Default/ControllerBindingContext.cs b/Tapeti/Default/ControllerBindingContext.cs
new file mode 100644
index 0000000..9f7b4c2
--- /dev/null
+++ b/Tapeti/Default/ControllerBindingContext.cs
@@ -0,0 +1,179 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using Tapeti.Config;
+
+namespace Tapeti.Default
+{
+ ///
+ ///
+ /// Default implementation for IControllerBindingContext
+ ///
+ public class ControllerBindingContext : IControllerBindingContext
+ {
+ private BindingTargetMode? bindingTargetMode;
+ private readonly List middleware = new List();
+ private readonly List parameters;
+ private readonly ControllerBindingResult result;
+
+ ///
+ /// Determines how the binding target is configured.
+ ///
+ public BindingTargetMode BindingTargetMode => bindingTargetMode ?? BindingTargetMode.Default;
+
+
+ ///
+ /// Provides access to the registered middleware for this method.
+ ///
+ public IReadOnlyList Middleware => middleware;
+
+
+ ///
+ public Type MessageClass { get; set; }
+
+ ///
+ public bool HasMessageClass => MessageClass != null;
+
+ ///
+ public Type Controller { get; set; }
+
+ ///
+ public MethodInfo Method { get; set; }
+
+ ///
+ public IReadOnlyList Parameters => parameters;
+
+ ///
+ public IBindingResult Result => result;
+
+
+ ///
+ public ControllerBindingContext(IEnumerable parameters, ParameterInfo result)
+ {
+ this.parameters = parameters.Select(parameter => new ControllerBindingParameter(parameter)).ToList();
+
+ this.result = new ControllerBindingResult(result);
+ }
+
+
+ ///
+ public void SetMessageClass(Type messageClass)
+ {
+ if (HasMessageClass)
+ throw new InvalidOperationException("SetMessageClass can only be called once");
+
+ MessageClass = messageClass;
+ }
+
+
+ ///
+ public void SetBindingTargetMode(BindingTargetMode mode)
+ {
+ if (bindingTargetMode.HasValue)
+ throw new InvalidOperationException("SetBindingTargetMode can only be called once");
+
+ bindingTargetMode = mode;
+ }
+
+
+ ///
+ public void Use(IControllerMiddlewareBase handler)
+ {
+ middleware.Add(handler);
+ }
+
+
+ ///
+ /// Returns the configured bindings for the parameters.
+ ///
+ public IEnumerable GetParameterHandlers()
+ {
+ return parameters.Select(p => p.Binding);
+ }
+
+
+ ///
+ /// Returns the configured result handler.
+ ///
+ ///
+ public ResultHandler GetResultHandler()
+ {
+ return result.Handler;
+ }
+ }
+
+
+ ///
+ ///
+ /// Default implementation for IBindingParameter
+ ///
+ public class ControllerBindingParameter : IBindingParameter
+ {
+ ///
+ /// Provides access to the configured binding.
+ ///
+ public ValueFactory Binding { get; set; }
+
+
+ ///
+ public ParameterInfo Info { get; }
+
+ ///
+ public bool HasBinding => Binding != null;
+
+
+ ///
+ public ControllerBindingParameter(ParameterInfo info)
+ {
+ Info = info;
+ }
+
+
+ ///
+ public void SetBinding(ValueFactory valueFactory)
+ {
+ if (Binding != null)
+ throw new InvalidOperationException("SetBinding can only be called once");
+
+ Binding = valueFactory;
+ }
+ }
+
+
+ ///
+ ///
+ /// Default implementation for IBindingResult
+ ///
+ public class ControllerBindingResult : IBindingResult
+ {
+ ///
+ /// Provides access to the configured handler.
+ ///
+ public ResultHandler Handler { get; set; }
+
+
+ ///
+ public ParameterInfo Info { get; }
+
+ ///
+ public bool HasHandler => Handler != null;
+
+
+ ///
+ public ControllerBindingResult(ParameterInfo info)
+ {
+ Info = info;
+ }
+
+
+ ///
+ public void SetHandler(ResultHandler resultHandler)
+ {
+ if (Handler != null)
+ throw new InvalidOperationException("SetHandler can only be called once");
+
+ Handler = resultHandler;
+ }
+ }
+}
diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs
index 4fc410a..8aaddc6 100644
--- a/Tapeti/Default/ControllerMessageContext.cs
+++ b/Tapeti/Default/ControllerMessageContext.cs
@@ -5,46 +5,60 @@ using Tapeti.Config;
namespace Tapeti.Default
{
///
- public class ControllerMessageContext : MessageContext, IControllerMessageContext
+ public class ControllerMessageContext : IControllerMessageContext
{
- private readonly Dictionary items = new Dictionary();
-
+ private readonly IMessageContext decoratedContext;
///
public object Controller { get; set; }
///
- public new IControllerMethodBinding Binding { get; set; }
+ public ITapetiConfig Config => decoratedContext.Config;
+
+ ///
+ public string Queue => decoratedContext.Queue;
+
+ ///
+ public string Exchange => decoratedContext.Exchange;
+
+ ///
+ public string RoutingKey => decoratedContext.RoutingKey;
+
+ ///
+ public object Message => decoratedContext.Message;
+
+ ///
+ public IMessageProperties Properties => decoratedContext.Properties;
+
+
+ IBinding IMessageContext.Binding => decoratedContext.Binding;
+ IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding;
///
- public override void Dispose()
+ public ControllerMessageContext(IMessageContext decoratedContext)
{
- foreach (var item in items.Values)
- (item as IDisposable)?.Dispose();
+ this.decoratedContext = decoratedContext;
+ }
- base.Dispose();
+
+ ///
+ public void Dispose()
+ {
}
///
public void Store(string key, object value)
{
- items.Add(key, value);
+ decoratedContext.Store(key, value);
}
///
public bool Get(string key, out T value) where T : class
{
- if (!items.TryGetValue(key, out var objectValue))
- {
- value = default(T);
- return false;
- }
-
- value = (T)objectValue;
- return true;
+ return decoratedContext.Get(key, out value);
}
}
}
diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs
index b2b8986..54eb163 100644
--- a/Tapeti/Default/ControllerMethodBinding.cs
+++ b/Tapeti/Default/ControllerMethodBinding.cs
@@ -1,7 +1,10 @@
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
+using Tapeti.Helpers;
namespace Tapeti.Default
{
@@ -13,9 +16,68 @@ namespace Tapeti.Default
///
public class ControllerMethodBinding : IBinding
{
- private readonly Type controller;
- private readonly MethodInfo method;
- private readonly QueueInfo queueInfo;
+ ///
+ /// Contains all the required information to bind a controller method to a queue.
+ ///
+ public struct BindingInfo
+ {
+ ///
+ /// The controller type associated with this binding.
+ ///
+ public Type ControllerType;
+
+ ///
+ /// The method called when this binding is invoked.
+ ///
+ public MethodInfo Method;
+
+ ///
+ /// The queue this binding consumes.
+ ///
+ public QueueInfo QueueInfo;
+
+ ///
+ /// The message class handled by this binding's method.
+ ///
+ public Type MessageClass;
+
+ ///
+ /// Indicates whether this method accepts messages to the exchange by routing key, or direct-to-queue only.
+ ///
+ public BindingTargetMode BindingTargetMode;
+
+ ///
+ /// Value factories for the method parameters.
+ ///
+ public IEnumerable ParameterFactories;
+
+ ///
+ /// The return value handler.
+ ///
+ public ResultHandler ResultHandler;
+
+
+ ///
+ /// Filter middleware as registered by the binding middleware.
+ ///
+ public IReadOnlyList FilterMiddleware;
+
+ ///
+ /// Message middleware as registered by the binding middleware.
+ ///
+ public IReadOnlyList MessageMiddleware;
+
+ ///
+ /// Cleanup middleware as registered by the binding middleware.
+ ///
+ public IReadOnlyList CleanupMiddleware;
+ }
+
+
+ private readonly IDependencyResolver dependencyResolver;
+ private readonly BindingInfo bindingInfo;
+
+ private readonly MessageHandlerFunc messageHandler;
///
@@ -23,37 +85,170 @@ namespace Tapeti.Default
///
- public ControllerMethodBinding(Type controller, MethodInfo method, QueueInfo queueInfo)
+ public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo)
{
- this.controller = controller;
- this.method = method;
- this.queueInfo = queueInfo;
+ this.dependencyResolver = dependencyResolver;
+ this.bindingInfo = bindingInfo;
+
+ messageHandler = WrapMethod(bindingInfo.Method, bindingInfo.ParameterFactories, bindingInfo.ResultHandler);
}
///
- public Task Apply(IBindingTarget target)
+ public async Task Apply(IBindingTarget target)
{
- // TODO ControllerMethodBinding
- throw new NotImplementedException();
+ switch (bindingInfo.BindingTargetMode)
+ {
+ case BindingTargetMode.Default:
+ if (bindingInfo.QueueInfo.Dynamic)
+ QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ else
+ {
+ await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ QueueName = bindingInfo.QueueInfo.Name;
+ }
+
+ break;
+
+ case BindingTargetMode.Direct:
+ if (bindingInfo.QueueInfo.Dynamic)
+ QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name);
+ else
+ {
+ await target.BindDurableDirect(bindingInfo.QueueInfo.Name);
+ QueueName = bindingInfo.QueueInfo.Name;
+ }
+
+ break;
+
+ default:
+ throw new ArgumentOutOfRangeException(nameof(bindingInfo.BindingTargetMode), bindingInfo.BindingTargetMode, "Invalid BindingTargetMode");
+ }
}
///
public bool Accept(Type messageClass)
{
- throw new NotImplementedException();
+ return messageClass == bindingInfo.MessageClass;
}
+
///
- public Task Invoke(IMessageContext context)
+ public async Task Invoke(IMessageContext context)
{
- throw new NotImplementedException();
+ var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
+
+ using (var controllerContext = new ControllerMessageContext(context)
+ {
+ Controller = controller
+ })
+ {
+ if (!await FilterAllowed(controllerContext))
+ return;
+
+
+ await MiddlewareHelper.GoAsync(
+ bindingInfo.MessageMiddleware,
+ async (handler, next) => await handler.Handle(controllerContext, next),
+ async () => await messageHandler(controllerContext));
+ }
}
+ ///
+ public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
+ {
+ await MiddlewareHelper.GoAsync(
+ bindingInfo.CleanupMiddleware,
+ async (handler, next) => await handler.Cleanup(context, ConsumeResult.Success, next),
+ () => Task.CompletedTask);
+ }
+
+
+ private async Task FilterAllowed(IControllerMessageContext context)
+ {
+ var allowed = false;
+ await MiddlewareHelper.GoAsync(
+ bindingInfo.FilterMiddleware,
+ async (handler, next) => await handler.Filter(context, next),
+ () =>
+ {
+ allowed = true;
+ return Task.CompletedTask;
+ });
+
+ return allowed;
+ }
+
+
+ private delegate Task MessageHandlerFunc(IControllerMessageContext context);
+
+
+ private static MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler resultHandler)
+ {
+ if (resultHandler != null)
+ return WrapResultHandlerMethod(method, parameterFactories, resultHandler);
+
+ if (method.ReturnType == typeof(void))
+ return WrapNullMethod(method, parameterFactories);
+
+ if (method.ReturnType == typeof(Task))
+ return WrapTaskMethod(method, parameterFactories);
+
+ if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
+ return WrapGenericTaskMethod(method, parameterFactories);
+
+ return WrapObjectMethod(method, parameterFactories);
+ }
+
+
+ private static MessageHandlerFunc WrapResultHandlerMethod(MethodBase method, IEnumerable parameterFactories, ResultHandler resultHandler)
+ {
+ return context =>
+ {
+ var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ return resultHandler(context, result);
+ };
+ }
+
+ private static MessageHandlerFunc WrapNullMethod(MethodBase method, IEnumerable parameterFactories)
+ {
+ return context =>
+ {
+ method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ return Task.CompletedTask;
+ };
+ }
+
+
+ private static MessageHandlerFunc WrapTaskMethod(MethodBase method, IEnumerable parameterFactories)
+ {
+ return context => (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ }
+
+
+ private static MessageHandlerFunc WrapGenericTaskMethod(MethodBase method, IEnumerable parameterFactories)
+ {
+ return context =>
+ {
+ return (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ };
+ }
+
+
+ private static MessageHandlerFunc WrapObjectMethod(MethodBase method, IEnumerable parameterFactories)
+ {
+ return context =>
+ {
+ return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
+ };
+ }
+
+
+
///
- ///
+ /// Contains information about the queue linked to the controller method.
///
public class QueueInfo
{
diff --git a/Tapeti/Default/MessageBinding.cs b/Tapeti/Default/MessageBinding.cs
index 34ad212..2265a88 100644
--- a/Tapeti/Default/MessageBinding.cs
+++ b/Tapeti/Default/MessageBinding.cs
@@ -13,15 +13,18 @@ namespace Tapeti.Default
///
public void Handle(IControllerBindingContext context, Action next)
{
- if (context.Parameters.Count == 0)
- throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class");
+ if (!context.HasMessageClass)
+ {
+ if (context.Parameters.Count == 0)
+ throw new TopologyConfigurationException($"First parameter of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class");
- var parameter = context.Parameters[0];
- if (!parameter.Info.ParameterType.IsClass)
- throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class");
+ var parameter = context.Parameters[0];
+ if (!parameter.Info.ParameterType.IsClass)
+ throw new TopologyConfigurationException($"First parameter {parameter.Info.Name} of method {context.Method.Name} in controller {context.Method.DeclaringType?.Name} must be a message class");
- parameter.SetBinding(messageContext => messageContext.Message);
- context.SetMessageClass(parameter.Info.ParameterType);
+ parameter.SetBinding(messageContext => messageContext.Message);
+ context.SetMessageClass(parameter.Info.ParameterType);
+ }
next();
}
diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs
index 8f0b8b6..584de53 100644
--- a/Tapeti/Default/MessageContext.cs
+++ b/Tapeti/Default/MessageContext.cs
@@ -1,10 +1,15 @@
-using Tapeti.Config;
+using System;
+using System.Collections.Generic;
+using Tapeti.Config;
namespace Tapeti.Default
{
///
public class MessageContext : IMessageContext
{
+ private readonly Dictionary items = new Dictionary();
+
+
///
public ITapetiConfig Config { get; set; }
@@ -26,9 +31,33 @@ namespace Tapeti.Default
///
public IBinding Binding { get; set; }
+
///
- public virtual void Dispose()
+ public void Dispose()
{
+ foreach (var item in items.Values)
+ (item as IDisposable)?.Dispose();
+ }
+
+
+ ///
+ public void Store(string key, object value)
+ {
+ items.Add(key, value);
+ }
+
+
+ ///
+ public bool Get(string key, out T value) where T : class
+ {
+ if (!items.TryGetValue(key, out var objectValue))
+ {
+ value = default(T);
+ return false;
+ }
+
+ value = (T)objectValue;
+ return true;
}
}
}
diff --git a/Tapeti/Default/RabbitMQMessageProperties.cs b/Tapeti/Default/RabbitMQMessageProperties.cs
index c560e56..b58836d 100644
--- a/Tapeti/Default/RabbitMQMessageProperties.cs
+++ b/Tapeti/Default/RabbitMQMessageProperties.cs
@@ -61,16 +61,16 @@ namespace Tapeti.Default
///
- public RabbitMQMessageProperties(IBasicProperties BasicProperties)
+ public RabbitMQMessageProperties(IBasicProperties basicProperties)
{
- this.BasicProperties = BasicProperties;
+ BasicProperties = basicProperties;
}
///
- public RabbitMQMessageProperties(IBasicProperties BasicProperties, IMessageProperties source)
+ public RabbitMQMessageProperties(IBasicProperties basicProperties, IMessageProperties source)
{
- this.BasicProperties = BasicProperties;
+ BasicProperties = basicProperties;
if (source == null)
return;
diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj
index 95868f0..d115af9 100644
--- a/Tapeti/Tapeti.csproj
+++ b/Tapeti/Tapeti.csproj
@@ -10,8 +10,8 @@
-
-
+
+
diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs
index 9247f11..7a542a1 100644
--- a/Tapeti/TapetiConfig.cs
+++ b/Tapeti/TapetiConfig.cs
@@ -32,6 +32,12 @@ namespace Tapeti
public TapetiConfig(IDependencyResolver dependencyResolver)
{
config = new Config(dependencyResolver);
+
+ Use(new DependencyResolverBinding());
+ Use(new PublishResultBinding());
+
+ // Registered last so it runs first and the MessageClass is known to other middleware
+ Use(new MessageBinding());
}
@@ -41,12 +47,6 @@ namespace Tapeti
if (config == null)
throw new InvalidOperationException("TapetiConfig.Build must only be called once");
- Use(new DependencyResolverBinding());
- Use(new PublishResultBinding());
-
- // Registered last so it runs first and the MessageClass is known to other middleware
- Use(new MessageBinding());
-
RegisterDefaults();
(config.DependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config);
@@ -201,7 +201,7 @@ namespace Tapeti
if (config == null)
throw new InvalidOperationException("TapetiConfig can not be updated after Build");
- return null;
+ return config;
}
@@ -308,130 +308,4 @@ namespace Tapeti
}
}
}
-
-
- /*
- public delegate Task MessageHandlerFunc(IMessageContext context, object message);
-
-
- protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
- {
- var allowBinding= false;
-
- MiddlewareHelper.Go(bindingMiddleware,
- (handler, next) => handler.Handle(context, next),
- () =>
- {
- allowBinding = true;
- });
-
- if (!allowBinding)
- return null;
-
- if (context.MessageClass == null)
- throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class");
-
-
- var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
-
- // ReSharper disable once InvertIf
- if (invalidBindings.Count > 0)
- {
- var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
- throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
- }
-
- var resultHandler = ((IBindingResultAccess) context.Result).GetHandler();
-
- return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler);
- }
-
-
- protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler)
- {
- if (resultHandler != null)
- return WrapResultHandlerMethod(method, parameters, resultHandler);
-
- if (method.ReturnType == typeof(void))
- return WrapNullMethod(method, parameters);
-
- if (method.ReturnType == typeof(Task))
- return WrapTaskMethod(method, parameters);
-
- if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
- return WrapGenericTaskMethod(method, parameters);
-
- return WrapObjectMethod(method, parameters);
- }
-
-
- protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable parameters, ResultHandler resultHandler)
- {
- return (context, message) =>
- {
- var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
- return resultHandler(context, result);
- };
- }
-
- protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable parameters)
- {
- return (context, message) =>
- {
- method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
- return Task.CompletedTask;
- };
- }
-
-
- protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable parameters)
- {
- return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
- }
-
-
- protected MessageHandlerFunc WrapGenericTaskMethod(MethodInfo method, IEnumerable parameters)
- {
- return (context, message) =>
- {
- return (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
- };
- }
-
-
- protected MessageHandlerFunc WrapObjectMethod(MethodInfo method, IEnumerable parameters)
- {
- return (context, message) =>
- {
- return Task.FromResult(method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()));
- };
- }
-
-
- protected void AddStaticRegistration(IBindingQueueInfo binding)
- {
- if (staticRegistrations.ContainsKey(binding.QueueInfo.Name))
- {
- var existing = staticRegistrations[binding.QueueInfo.Name];
-
- // TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware
- //if (existing.Any(h => h.MessageClass == binding.MessageClass))
- // throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
-
- existing.Add(binding);
- }
- else
- staticRegistrations.Add(binding.QueueInfo.Name, new List { binding });
- }
-
-
- protected string GetDynamicQueueName(string prefix)
- {
- if (String.IsNullOrEmpty(prefix))
- return "";
-
- return prefix + "." + Guid.NewGuid().ToString("N");
- }
- }
- */
}
diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs
index dd41f2b..399a154 100644
--- a/Tapeti/TapetiConfigControllers.cs
+++ b/Tapeti/TapetiConfigControllers.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Tapeti.Annotations;
@@ -44,40 +45,53 @@ namespace Tapeti
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m))
{
- // TODO create binding for method
-
- /*
- var context = new BindingContext(method);
- var messageHandler = GetMessageHandler(context, method);
- if (messageHandler == null)
- continue;
- */
-
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
if (methodQueueInfo == null || !methodQueueInfo.IsValid)
throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
- /*
- var handlerInfo = new Binding
+
+ var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
{
Controller = controller,
- Method = method,
- QueueInfo = methodQueueInfo,
- QueueBindingMode = context.QueueBindingMode,
- MessageClass = context.MessageClass,
- MessageHandler = messageHandler,
- MessageMiddleware = context.MessageMiddleware,
- MessageFilterMiddleware = context.MessageFilterMiddleware
+ Method = method
};
- if (methodQueueInfo.Dynamic.GetValueOrDefault())
- AddDynamicRegistration(handlerInfo);
- else
- AddStaticRegistration(handlerInfo);
- */
- builder.RegisterBinding(new ControllerMethodBinding(controller, method, methodQueueInfo));
+ var allowBinding = false;
+ builderAccess.ApplyBindingMiddleware(context, () => { allowBinding = true; });
+
+ if (!allowBinding)
+ continue;
+
+
+ if (context.MessageClass == null)
+ throw new TopologyConfigurationException($"Method {method.Name} in controller {controller.Name} does not resolve to a message class");
+
+
+ var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
+ if (invalidBindings.Count > 0)
+ {
+ var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
+ throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
+ }
+
+
+ builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo
+ {
+ ControllerType = controller,
+ Method = method,
+ QueueInfo = methodQueueInfo,
+ MessageClass = context.MessageClass,
+ BindingTargetMode = context.BindingTargetMode,
+ ParameterFactories = context.GetParameterHandlers(),
+ ResultHandler = context.GetResultHandler(),
+
+ FilterMiddleware = context.Middleware.Where(m => m is IControllerFilterMiddleware).Cast().ToList(),
+ MessageMiddleware = context.Middleware.Where(m => m is IControllerMessageMiddleware).Cast().ToList(),
+ CleanupMiddleware = context.Middleware.Where(m => m is IControllerCleanupMiddleware).Cast().ToList()
+ }));
+
}
return builder;
diff --git a/Test/FlowEndController.cs b/Test/FlowEndController.cs
index 5c2f450..ad72d4f 100644
--- a/Test/FlowEndController.cs
+++ b/Test/FlowEndController.cs
@@ -18,6 +18,8 @@ namespace Test
this.flowProvider = flowProvider;
}
+
+ [Start]
public IYieldPoint StartFlow(PingMessage message)
{
Console.WriteLine("PingMessage received, calling flowProvider.End() directly");
diff --git a/Test/Program.cs b/Test/Program.cs
index 3969269..ccca437 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -1,12 +1,10 @@
using System;
-using System.Runtime.CompilerServices;
using SimpleInjector;
using Tapeti;
using Tapeti.DataAnnotations;
using Tapeti.Flow;
using Tapeti.SimpleInjector;
using System.Threading;
-using Tapeti.Annotations;
using Tapeti.Transient;
namespace Test