diff --git a/Examples/01-PublishSubscribe/ExampleMessageController.cs b/Examples/01-PublishSubscribe/ExampleMessageController.cs
index 6cce0a1..eeb7581 100644
--- a/Examples/01-PublishSubscribe/ExampleMessageController.cs
+++ b/Examples/01-PublishSubscribe/ExampleMessageController.cs
@@ -1,7 +1,7 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _01_PublishSubscribe
{
diff --git a/Examples/02-DeclareDurableQueues/ExampleMessageController.cs b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs
index ee5266e..d091a7e 100644
--- a/Examples/02-DeclareDurableQueues/ExampleMessageController.cs
+++ b/Examples/02-DeclareDurableQueues/ExampleMessageController.cs
@@ -1,7 +1,7 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _02_DeclareDurableQueues
{
diff --git a/Examples/03-FlowRequestResponse/ParallelFlowController.cs b/Examples/03-FlowRequestResponse/ParallelFlowController.cs
index 00f9687..34fe2c4 100644
--- a/Examples/03-FlowRequestResponse/ParallelFlowController.cs
+++ b/Examples/03-FlowRequestResponse/ParallelFlowController.cs
@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
diff --git a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs
index 97c498c..11d0e16 100644
--- a/Examples/03-FlowRequestResponse/ReceivingMessageController.cs
+++ b/Examples/03-FlowRequestResponse/ReceivingMessageController.cs
@@ -1,6 +1,6 @@
using System.Threading.Tasks;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _03_FlowRequestResponse
{
diff --git a/Examples/03-FlowRequestResponse/SimpleFlowController.cs b/Examples/03-FlowRequestResponse/SimpleFlowController.cs
index e093ccd..0ea6aff 100644
--- a/Examples/03-FlowRequestResponse/SimpleFlowController.cs
+++ b/Examples/03-FlowRequestResponse/SimpleFlowController.cs
@@ -1,7 +1,7 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Flow;
using Tapeti.Flow.Annotations;
diff --git a/Examples/04-Transient/UsersMessageController.cs b/Examples/04-Transient/UsersMessageController.cs
index 5565c49..a5b147f 100644
--- a/Examples/04-Transient/UsersMessageController.cs
+++ b/Examples/04-Transient/UsersMessageController.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _04_Transient
{
diff --git a/Examples/05-SpeedTest/SpeedMessageController.cs b/Examples/05-SpeedTest/SpeedMessageController.cs
index b0e5386..8ff04e7 100644
--- a/Examples/05-SpeedTest/SpeedMessageController.cs
+++ b/Examples/05-SpeedTest/SpeedMessageController.cs
@@ -1,5 +1,5 @@
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _05_SpeedTest
{
diff --git a/Examples/06-StatelessRequestResponse/ExampleMessageController.cs b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs
index bc908ab..6f08afd 100644
--- a/Examples/06-StatelessRequestResponse/ExampleMessageController.cs
+++ b/Examples/06-StatelessRequestResponse/ExampleMessageController.cs
@@ -1,7 +1,7 @@
using System;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _06_StatelessRequestResponse
{
diff --git a/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
index 4a2704b..9257a68 100644
--- a/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
+++ b/Examples/06-StatelessRequestResponse/ReceivingMessageController.cs
@@ -1,5 +1,5 @@
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _06_StatelessRequestResponse
{
diff --git a/Examples/07-ParallelizationTest/ParallelizationMessageController.cs b/Examples/07-ParallelizationTest/ParallelizationMessageController.cs
index 209ba39..bc6c06b 100644
--- a/Examples/07-ParallelizationTest/ParallelizationMessageController.cs
+++ b/Examples/07-ParallelizationTest/ParallelizationMessageController.cs
@@ -1,6 +1,6 @@
using System.Threading.Tasks;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _07_ParallelizationTest
{
diff --git a/Examples/08-MessageHandlerLogging/SlowMessageController.cs b/Examples/08-MessageHandlerLogging/SlowMessageController.cs
index 33695b2..93be67d 100644
--- a/Examples/08-MessageHandlerLogging/SlowMessageController.cs
+++ b/Examples/08-MessageHandlerLogging/SlowMessageController.cs
@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
namespace _08_MessageHandlerLogging
{
diff --git a/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs b/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs
index 0409d10..f021c41 100644
--- a/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs
+++ b/Examples/08-MessageHandlerLogging/SpeedyMessageController.cs
@@ -1,6 +1,6 @@
using System;
using Messaging.TapetiExample;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Serilog;
namespace _08_MessageHandlerLogging
diff --git a/Tapeti.Autofac/Tapeti.Autofac.csproj b/Tapeti.Autofac/Tapeti.Autofac.csproj
index 10e4b50..e6664b3 100644
--- a/Tapeti.Autofac/Tapeti.Autofac.csproj
+++ b/Tapeti.Autofac/Tapeti.Autofac.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.SimpleInjector.png
2.0.0
- 9
enable
diff --git a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
index bd1af38..892f497 100644
--- a/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
+++ b/Tapeti.CastleWindsor/Tapeti.CastleWindsor.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.SimpleInjector.png
2.0.0
- 9
enable
diff --git a/Tapeti.CastleWindsor/WindsorDependencyResolver.cs b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs
index 95d5f5e..46018bf 100644
--- a/Tapeti.CastleWindsor/WindsorDependencyResolver.cs
+++ b/Tapeti.CastleWindsor/WindsorDependencyResolver.cs
@@ -1,12 +1,14 @@
using System;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
+using JetBrains.Annotations;
namespace Tapeti.CastleWindsor
{
///
/// Dependency resolver and container implementation for Castle Windsor.
///
+ [PublicAPI]
public class WindsorDependencyResolver : IDependencyContainer
{
private readonly IWindsorContainer container;
diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
index 713493f..276f6ba 100644
--- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
+++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.DataAnnotations.png
2.0.0
- 9
enable
diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
index 9650958..db7eff7 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
@@ -1,4 +1,4 @@
-
+
net6.0;net7.0
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.Flow.SQL.png
2.0.0
- 9
enable
@@ -19,11 +18,6 @@
1701;1702
-
-
- IDE0063
-
-
diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
index c501feb..6f6a887 100644
--- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
@@ -154,7 +154,9 @@ namespace Tapeti.Flow.Default
var flowHandler = context.Config.DependencyResolver.Resolve();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
{
- await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
+ // IFlowParallelRequest.AddRequest will store the flow immediately
+ if (!flowPayload.FlowContext.IsStoredOrDeleted())
+ await flowContext.Store(context.Binding.QueueType == QueueType.Durable);
}));
}
diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
index 51098d3..ebd4fc6 100644
--- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;
using Tapeti.Config;
-using Tapeti.Flow.FlowHelpers;
+using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 91ffb39..62e3176 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -1,4 +1,4 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
@@ -8,7 +8,7 @@ using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Default;
using Tapeti.Flow.Annotations;
-using Tapeti.Flow.FlowHelpers;
+using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
@@ -55,7 +55,7 @@ namespace Tapeti.Flow.Default
///
public IFlowParallelRequestBuilder YieldWithParallelRequest()
{
- return new ParallelRequestBuilder(config, this);
+ return new ParallelRequestBuilder(config, this, publisher);
}
///
@@ -71,8 +71,8 @@ namespace Tapeti.Flow.Default
}
- internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
- string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
+ internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo,
+ string convergeMethodName = null, bool convergeMethodTaskSync = false)
{
if (!context.HasFlowStateAndLock)
{
@@ -96,8 +96,15 @@ namespace Tapeti.Flow.Default
ReplyTo = responseHandlerInfo.ReplyToQueue
};
- if (store)
- await context.Store(responseHandlerInfo.IsDurableQueue);
+ return properties;
+ }
+
+
+ internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
+ string convergeMethodName = null, bool convergeMethodTaskSync = false)
+ {
+ var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
+ await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true);
}
@@ -134,7 +141,7 @@ namespace Tapeti.Flow.Default
{
await context.Delete();
- if (context.HasFlowStateAndLock && context.FlowState.Metadata.Reply != null)
+ if (context is { HasFlowStateAndLock: true, FlowState.Metadata.Reply: { } })
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
}
@@ -200,7 +207,7 @@ namespace Tapeti.Flow.Default
flowContext.SetFlowState(flowState, flowStateLock);
}
-
+
///
public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
@@ -222,7 +229,7 @@ namespace Tapeti.Flow.Default
}
else
flowContext = flowPayload.FlowContext;
-
+
try
{
await executableYieldPoint.Execute(flowContext);
@@ -327,13 +334,15 @@ namespace Tapeti.Flow.Default
private readonly ITapetiConfig config;
private readonly FlowProvider flowProvider;
+ private readonly IInternalPublisher publisher;
private readonly List requests = new();
- public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider)
+ public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider, IInternalPublisher publisher)
{
this.config = config;
this.flowProvider = flowProvider;
+ this.publisher = publisher;
}
@@ -407,18 +416,21 @@ namespace Tapeti.Flow.Default
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
+ var preparedRequests = new List();
+
foreach (var requestInfo in requests)
{
- await flowProvider.SendRequest(
+ var properties = await flowProvider.PrepareRequest(
context,
- requestInfo.Message,
requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name,
- convergeMethodSync,
- false);
+ convergeMethodSync);
+
+ preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties));
}
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue));
+ await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true)));
});
}
}
@@ -465,12 +477,11 @@ namespace Tapeti.Flow.Default
throw new InvalidOperationException("No ContinuationMetadata in FlowContext");
return flowProvider.SendRequest(
- flowContext,
- message,
- responseHandlerInfo,
- flowContext.ContinuationMetadata.ConvergeMethodName,
- flowContext.ContinuationMetadata.ConvergeMethodSync,
- false);
+ flowContext,
+ message,
+ responseHandlerInfo,
+ flowContext.ContinuationMetadata.ConvergeMethodName,
+ flowContext.ContinuationMetadata.ConvergeMethodSync);
}
}
@@ -489,5 +500,19 @@ namespace Tapeti.Flow.Default
IsDurableQueue = isDurableQueue;
}
}
+
+
+ internal class PreparedRequest
+ {
+ public object Message { get; }
+ public MessageProperties Properties { get; }
+
+
+ public PreparedRequest(object message, MessageProperties properties)
+ {
+ Message = message;
+ Properties = properties;
+ }
+ }
}
}
diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs
index 3d26ca5..6a7306c 100644
--- a/Tapeti.Flow/Default/FlowStore.cs
+++ b/Tapeti.Flow/Default/FlowStore.cs
@@ -6,6 +6,7 @@ using System.Linq;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
+using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
diff --git a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs b/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs
deleted file mode 100644
index 39940b0..0000000
--- a/Tapeti.Flow/ReSharper/JetBrains.Annotations.cs
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Stripped version of the ReSharper Annotations source. Enables hinting without referencing the
- * ReSharper.Annotations NuGet package.
- *
- * If you need more annotations, this code was generated using
- * ReSharper - Options - Code Annotations - Copy C# implementation to clipboard
- */
-
-
-/* MIT License
-
-Copyright (c) 2016 JetBrains http://www.jetbrains.com
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE. */
-
-using System;
-
-#pragma warning disable 1591
-// ReSharper disable UnusedMember.Global
-// ReSharper disable MemberCanBePrivate.Global
-// ReSharper disable UnusedAutoPropertyAccessor.Global
-// ReSharper disable IntroduceOptionalParameters.Global
-// ReSharper disable MemberCanBeProtected.Global
-// ReSharper disable InconsistentNaming
-// ReSharper disable InheritdocConsiderUsage
-
-// ReSharper disable once CheckNamespace
-namespace JetBrains.Annotations
-{
- ///
- /// Indicates that the value of the marked element could be null sometimes,
- /// so the check for null is necessary before its usage.
- ///
- ///
- /// [CanBeNull] object Test() => null;
- ///
- /// void UseTest() {
- /// var p = Test();
- /// var s = p.ToString(); // Warning: Possible 'System.NullReferenceException'
- /// }
- ///
- [AttributeUsage(
- AttributeTargets.Method | AttributeTargets.Parameter | AttributeTargets.Property |
- AttributeTargets.Delegate | AttributeTargets.Field | AttributeTargets.Event |
- AttributeTargets.Class | AttributeTargets.Interface | AttributeTargets.GenericParameter)]
- internal sealed class CanBeNullAttribute : Attribute { }
-
- ///
- /// Indicates that the value of the marked element could never be null.
- ///
- ///
- /// [NotNull] object Foo() {
- /// return null; // Warning: Possible 'null' assignment
- /// }
- ///
- [AttributeUsage(
- AttributeTargets.Method | AttributeTargets.Parameter | AttributeTargets.Property |
- AttributeTargets.Delegate | AttributeTargets.Field | AttributeTargets.Event |
- AttributeTargets.Class | AttributeTargets.Interface | AttributeTargets.GenericParameter)]
- internal sealed class NotNullAttribute : Attribute { }
-
- ///
- /// Indicates that the marked symbol is used implicitly (e.g. via reflection, in external library),
- /// so this symbol will not be marked as unused (as well as by other usage inspections).
- ///
- [AttributeUsage(AttributeTargets.All)]
- internal sealed class UsedImplicitlyAttribute : Attribute
- {
- public UsedImplicitlyAttribute()
- : this(ImplicitUseKindFlags.Default, ImplicitUseTargetFlags.Default) { }
-
- public UsedImplicitlyAttribute(ImplicitUseKindFlags useKindFlags)
- : this(useKindFlags, ImplicitUseTargetFlags.Default) { }
-
- public UsedImplicitlyAttribute(ImplicitUseTargetFlags targetFlags)
- : this(ImplicitUseKindFlags.Default, targetFlags) { }
-
- public UsedImplicitlyAttribute(ImplicitUseKindFlags useKindFlags, ImplicitUseTargetFlags targetFlags)
- {
- UseKindFlags = useKindFlags;
- TargetFlags = targetFlags;
- }
-
- public ImplicitUseKindFlags UseKindFlags { get; }
-
- public ImplicitUseTargetFlags TargetFlags { get; }
- }
-
- ///
- /// Should be used on attributes and causes ReSharper to not mark symbols marked with such attributes
- /// as unused (as well as by other usage inspections)
- ///
- [AttributeUsage(AttributeTargets.Class | AttributeTargets.GenericParameter)]
- internal sealed class MeansImplicitUseAttribute : Attribute
- {
- public MeansImplicitUseAttribute()
- : this(ImplicitUseKindFlags.Default, ImplicitUseTargetFlags.Default) { }
-
- public MeansImplicitUseAttribute(ImplicitUseKindFlags useKindFlags)
- : this(useKindFlags, ImplicitUseTargetFlags.Default) { }
-
- public MeansImplicitUseAttribute(ImplicitUseTargetFlags targetFlags)
- : this(ImplicitUseKindFlags.Default, targetFlags) { }
-
- public MeansImplicitUseAttribute(ImplicitUseKindFlags useKindFlags, ImplicitUseTargetFlags targetFlags)
- {
- UseKindFlags = useKindFlags;
- TargetFlags = targetFlags;
- }
-
- [UsedImplicitly] public ImplicitUseKindFlags UseKindFlags { get; private set; }
-
- [UsedImplicitly] public ImplicitUseTargetFlags TargetFlags { get; private set; }
- }
-
- [Flags]
- internal enum ImplicitUseKindFlags
- {
- Default = Access | Assign | InstantiatedWithFixedConstructorSignature,
- /// Only entity marked with attribute considered used.
- Access = 1,
- /// Indicates implicit assignment to a member.
- Assign = 2,
- ///
- /// Indicates implicit instantiation of a type with fixed constructor signature.
- /// That means any unused constructor parameters won't be reported as such.
- ///
- InstantiatedWithFixedConstructorSignature = 4,
- /// Indicates implicit instantiation of a type.
- InstantiatedNoFixedConstructorSignature = 8
- }
-
- ///
- /// Specify what is considered used implicitly when marked
- /// with or .
- ///
- [Flags]
- internal enum ImplicitUseTargetFlags
- {
- Default = Itself,
- Itself = 1,
- /// Members of entity marked with attribute are considered used.
- Members = 2,
- /// Entity marked with attribute and all its members considered used.
- WithMembers = Itself | Members
- }
-
- ///
- /// This attribute is intended to mark publicly available API
- /// which should not be removed and so is treated as used.
- ///
- [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
- internal sealed class PublicAPIAttribute : Attribute
- {
- public PublicAPIAttribute() { }
-
- public PublicAPIAttribute(string comment)
- {
- Comment = comment;
- }
-
- public string? Comment { get; }
- }
-}
\ No newline at end of file
diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj
index b772e71..dd287a4 100644
--- a/Tapeti.Flow/Tapeti.Flow.csproj
+++ b/Tapeti.Flow/Tapeti.Flow.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.Flow.png
2.0.0
- 9
enable
@@ -19,11 +18,6 @@
1701;1702
-
-
- IDE0066
-
-
diff --git a/Tapeti.Ninject/Tapeti.Ninject.csproj b/Tapeti.Ninject/Tapeti.Ninject.csproj
index d31c98b..0c9fd16 100644
--- a/Tapeti.Ninject/Tapeti.Ninject.csproj
+++ b/Tapeti.Ninject/Tapeti.Ninject.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.SimpleInjector.png
2.0.0
- 9
enable
diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj
index ddef97e..9bd340c 100644
--- a/Tapeti.Serilog/Tapeti.Serilog.csproj
+++ b/Tapeti.Serilog/Tapeti.Serilog.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.Serilog.png
2.0.0
- 9
enable
diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
index c0c5614..d258990 100644
--- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
+++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.SimpleInjector.png
2.0.0
- 9
enable
diff --git a/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs
new file mode 100644
index 0000000..d58a145
--- /dev/null
+++ b/Tapeti.Tests/Client/Controller/RequestResponseFilterController.cs
@@ -0,0 +1,65 @@
+using System.Threading.Tasks;
+using Tapeti.Config.Annotations;
+
+namespace Tapeti.Tests.Client.Controller
+{
+ [Annotations.Request(Response = typeof(FilteredResponseMessage))]
+ public class FilteredRequestMessage
+ {
+ public int ExpectedHandler { get; set; }
+ }
+
+ public class FilteredResponseMessage
+ {
+ public int ExpectedHandler { get; set; }
+ }
+
+
+ #pragma warning disable CA1822 // Mark members as static
+ [MessageController]
+ [DurableQueue("request.response.filter")]
+ public class RequestResponseFilterController
+ {
+ public static TaskCompletionSource ValidResponse { get; private set; } = new();
+ public static TaskCompletionSource InvalidResponse { get; private set; } = new();
+
+
+ public FilteredResponseMessage EchoRequest(FilteredRequestMessage message)
+ {
+ return new FilteredResponseMessage
+ {
+ ExpectedHandler = message.ExpectedHandler
+ };
+ }
+
+
+ [NoBinding]
+ public static void ResetCompletionSource()
+ {
+ ValidResponse = new TaskCompletionSource();
+ InvalidResponse = new TaskCompletionSource();
+ }
+
+
+
+ [ResponseHandler]
+ public void Handler1(FilteredResponseMessage message)
+ {
+ if (message.ExpectedHandler != 1)
+ InvalidResponse.TrySetResult(1);
+ else
+ ValidResponse.SetResult(1);
+ }
+
+
+ [ResponseHandler]
+ public void Handler2(FilteredResponseMessage message)
+ {
+ if (message.ExpectedHandler != 2)
+ InvalidResponse.TrySetResult(2);
+ else
+ ValidResponse.SetResult(2);
+ }
+ }
+ #pragma warning restore CA1822
+}
diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs
new file mode 100644
index 0000000..dacaba0
--- /dev/null
+++ b/Tapeti.Tests/Client/ControllerTests.cs
@@ -0,0 +1,87 @@
+using System.Threading.Tasks;
+using FluentAssertions;
+using SimpleInjector;
+using Tapeti.Config;
+using Tapeti.SimpleInjector;
+using Tapeti.Tests.Client.Controller;
+using Tapeti.Tests.Mock;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tapeti.Tests.Client
+{
+ [Collection(RabbitMQCollection.Name)]
+ [Trait("Category", "Requires Docker")]
+ public class ControllerTests : IAsyncLifetime
+ {
+ private readonly RabbitMQFixture fixture;
+ private readonly Container container = new();
+
+ private TapetiConnection? connection;
+
+
+ public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
+ {
+ this.fixture = fixture;
+
+ container.RegisterInstance(new MockLogger(testOutputHelper));
+ }
+
+
+ public Task InitializeAsync()
+ {
+ return Task.CompletedTask;
+ }
+
+
+ public async Task DisposeAsync()
+ {
+ if (connection != null)
+ await connection.DisposeAsync();
+ }
+
+
+
+ [Fact]
+ public async Task RequestResponseFilter()
+ {
+ var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
+ .EnableDeclareDurableQueues()
+ .RegisterController()
+ .Build();
+
+ connection = CreateConnection(config);
+ await connection!.Subscribe();
+
+
+ await connection.GetPublisher().PublishRequest(new FilteredRequestMessage
+ {
+ ExpectedHandler = 2
+ }, c => c.Handler2);
+
+
+ var handler = await RequestResponseFilterController.ValidResponse.Task;
+ handler.Should().Be(2);
+
+ var invalidHandler = await Task.WhenAny(RequestResponseFilterController.InvalidResponse.Task, Task.Delay(1000));
+ invalidHandler.Should().NotBe(RequestResponseFilterController.InvalidResponse.Task);
+ }
+
+
+ private TapetiConnection CreateConnection(ITapetiConfig config)
+ {
+ return new TapetiConnection(config)
+ {
+ Params = new TapetiConnectionParams
+ {
+ HostName = "127.0.0.1",
+ Port = fixture.RabbitMQPort,
+ ManagementPort = fixture.RabbitMQManagementPort,
+ Username = RabbitMQFixture.RabbitMQUsername,
+ Password = RabbitMQFixture.RabbitMQPassword,
+ PrefetchCount = 1
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs
index 4a46634..3e8a581 100644
--- a/Tapeti.Tests/Client/RabbitMQFixture.cs
+++ b/Tapeti.Tests/Client/RabbitMQFixture.cs
@@ -63,7 +63,7 @@ namespace Tapeti.Tests.Client
testcontainers = testcontainersBuilder.Build();
- await testcontainers.StartAsync();
+ await testcontainers!.StartAsync();
RabbitMQPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQPort);
RabbitMQManagementPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQManagementPort);
diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs
index 0ffd716..d3a7112 100644
--- a/Tapeti.Tests/Client/TapetiClientTests.cs
+++ b/Tapeti.Tests/Client/TapetiClientTests.cs
@@ -1,5 +1,4 @@
-using System;
-using System.Collections.Generic;
+using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs
index 98a74c2..d8d0502 100644
--- a/Tapeti.Tests/Config/QueueArgumentsTest.cs
+++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs
@@ -7,7 +7,7 @@ using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Execution;
using Moq;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Config;
using Tapeti.Connection;
using Xunit;
@@ -84,10 +84,10 @@ namespace Tapeti.Tests.Config
{
var config = GetControllerConfig();
- var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage1");
+ var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage1" });
binding1.Should().NotBeNull();
- var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage2");
+ var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage2" });
binding2.Should().NotBeNull();
diff --git a/Tapeti.Tests/Config/SimpleControllerTest.cs b/Tapeti.Tests/Config/SimpleControllerTest.cs
index 8dca8ad..f909b79 100644
--- a/Tapeti.Tests/Config/SimpleControllerTest.cs
+++ b/Tapeti.Tests/Config/SimpleControllerTest.cs
@@ -1,6 +1,6 @@
using System.Linq;
using FluentAssertions;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Config;
using Xunit;
diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj
index a5d8368..1ce8661 100644
--- a/Tapeti.Tests/Tapeti.Tests.csproj
+++ b/Tapeti.Tests/Tapeti.Tests.csproj
@@ -11,9 +11,10 @@
-
+
+
@@ -23,6 +24,7 @@
+
diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj
index 27af569..bb56b5b 100644
--- a/Tapeti.Transient/Tapeti.Transient.csproj
+++ b/Tapeti.Transient/Tapeti.Transient.csproj
@@ -11,7 +11,6 @@
https://github.com/MvRens/Tapeti
Tapeti.Flow.png
2.0.0
- 9
enable
diff --git a/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs b/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs
new file mode 100644
index 0000000..595e02e
--- /dev/null
+++ b/Tapeti/Config/Annotations/BackwardsCompatibilityHelpers.cs
@@ -0,0 +1,77 @@
+using System;
+using System.Reflection;
+
+#pragma warning disable CS0618 // Obsolete
+#pragma warning disable CS1591 // Missing documentation
+
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Provides extensions methods to support moved (marked obsolete) attributes from Tapeti.Annotations.
+ ///
+ public static class BackwardsCompatibilityHelpers
+ {
+ public static DurableQueueAttribute? GetDurableQueueAttribute(this MemberInfo member)
+ {
+ return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute());
+ }
+
+ public static DynamicQueueAttribute? GetDynamicQueueAttribute(this MemberInfo member)
+ {
+ return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute());
+ }
+
+ public static QueueArgumentsAttribute? GetQueueArgumentsAttribute(this MemberInfo member)
+ {
+ return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute());
+ }
+
+ public static ResponseHandlerAttribute? GetResponseHandlerAttribute(this MemberInfo member)
+ {
+ return member.GetCustomAttribute() ?? Upgrade(member.GetCustomAttribute());
+ }
+
+
+ public static bool HasMessageControllerAttribute(this MemberInfo member)
+ {
+ return member.IsDefined(typeof(MessageControllerAttribute)) || member.IsDefined(typeof(Tapeti.Annotations.MessageControllerAttribute));
+ }
+
+
+ private static DurableQueueAttribute? Upgrade(Tapeti.Annotations.DurableQueueAttribute? attribute)
+ {
+ return attribute == null ? null : new DurableQueueAttribute(attribute.Name);
+ }
+
+ private static DynamicQueueAttribute? Upgrade(Tapeti.Annotations.DynamicQueueAttribute? attribute)
+ {
+ return attribute == null ? null : new DynamicQueueAttribute(attribute.Prefix);
+ }
+
+ private static QueueArgumentsAttribute? Upgrade(Tapeti.Annotations.QueueArgumentsAttribute? attribute)
+ {
+ return attribute == null
+ ? null
+ : new QueueArgumentsAttribute(attribute.CustomArguments)
+ {
+ MaxLength = attribute.MaxLength,
+ MaxLengthBytes = attribute.MaxLengthBytes,
+ Overflow = attribute.Overflow switch
+ {
+ Tapeti.Annotations.RabbitMQOverflow.NotSpecified => RabbitMQOverflow.NotSpecified,
+ Tapeti.Annotations.RabbitMQOverflow.DropHead => RabbitMQOverflow.DropHead,
+ Tapeti.Annotations.RabbitMQOverflow.RejectPublish => RabbitMQOverflow.RejectPublish,
+ Tapeti.Annotations.RabbitMQOverflow.RejectPublishDeadletter => RabbitMQOverflow.RejectPublishDeadletter,
+ _ => throw new ArgumentOutOfRangeException(nameof(attribute.Overflow))
+ },
+ MessageTTL = attribute.MessageTTL
+ };
+ }
+
+ private static ResponseHandlerAttribute? Upgrade(Tapeti.Annotations.ResponseHandlerAttribute? attribute)
+ {
+ return attribute == null ? null : new ResponseHandlerAttribute();
+ }
+ }
+}
diff --git a/Tapeti/Config/Annotations/DurableQueueAttribute.cs b/Tapeti/Config/Annotations/DurableQueueAttribute.cs
new file mode 100644
index 0000000..7a884a1
--- /dev/null
+++ b/Tapeti/Config/Annotations/DurableQueueAttribute.cs
@@ -0,0 +1,28 @@
+using System;
+using JetBrains.Annotations;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Binds to an existing durable queue to receive messages. Can be used
+ /// on an entire MessageController class or on individual methods.
+ ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
+ [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
+ [PublicAPI]
+ public class DurableQueueAttribute : Attribute
+ {
+ ///
+ /// Specifies the name of the durable queue (must already be declared).
+ ///
+ public string Name { get; set; }
+
+
+ ///
+ /// The name of the durable queue
+ public DurableQueueAttribute(string name)
+ {
+ Name = name;
+ }
+ }
+}
diff --git a/Tapeti/Config/Annotations/DynamicQueueAttribute.cs b/Tapeti/Config/Annotations/DynamicQueueAttribute.cs
new file mode 100644
index 0000000..ba14342
--- /dev/null
+++ b/Tapeti/Config/Annotations/DynamicQueueAttribute.cs
@@ -0,0 +1,32 @@
+using System;
+using JetBrains.Annotations;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Creates a non-durable auto-delete queue to receive messages. Can be used
+ /// on an entire MessageController class or on individual methods.
+ ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
+ [MeansImplicitUse]
+ [PublicAPI]
+ public class DynamicQueueAttribute : Attribute
+ {
+ ///
+ /// An optional prefix. If specified, Tapeti will compose the queue name using the
+ /// prefix and a unique ID. If not specified, an empty queue name will be passed
+ /// to RabbitMQ thus letting it create a unique queue name.
+ ///
+ public string? Prefix { get; set; }
+
+
+ ///
+ /// An optional prefix. If specified, Tapeti will compose the queue name using the
+ /// prefix and a unique ID. If not specified, an empty queue name will be passed
+ /// to RabbitMQ thus letting it create a unique queue name.
+ public DynamicQueueAttribute(string? prefix = null)
+ {
+ Prefix = prefix;
+ }
+ }
+}
diff --git a/Tapeti/Config/Annotations/MessageControllerAttribute.cs b/Tapeti/Config/Annotations/MessageControllerAttribute.cs
new file mode 100644
index 0000000..8f9b720
--- /dev/null
+++ b/Tapeti/Config/Annotations/MessageControllerAttribute.cs
@@ -0,0 +1,16 @@
+using System;
+using JetBrains.Annotations;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Attaching this attribute to a class includes it in the auto-discovery of message controllers
+ /// when using the RegisterAllControllers method. It is not required when manually registering a controller.
+ ///
+ [AttributeUsage(AttributeTargets.Class)]
+ [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
+ [PublicAPI]
+ public class MessageControllerAttribute : Attribute
+ {
+ }
+}
diff --git a/Tapeti/Config/Annotations/NoBindingAttribute.cs b/Tapeti/Config/Annotations/NoBindingAttribute.cs
new file mode 100644
index 0000000..4150fa0
--- /dev/null
+++ b/Tapeti/Config/Annotations/NoBindingAttribute.cs
@@ -0,0 +1,14 @@
+using System;
+using JetBrains.Annotations;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Indicates that the method is not a message handler and should not be bound by Tapeti.
+ ///
+ [AttributeUsage(AttributeTargets.Method)]
+ [PublicAPI]
+ public class NoBindingAttribute : Attribute
+ {
+ }
+}
diff --git a/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs b/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs
new file mode 100644
index 0000000..7ace9e9
--- /dev/null
+++ b/Tapeti/Config/Annotations/QueueArgumentsAttribute.cs
@@ -0,0 +1,106 @@
+using System;
+using System.Collections.Generic;
+using JetBrains.Annotations;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Determines the overflow behaviour of a queue that has reached it's maximum as set by or .
+ ///
+ [PublicAPI]
+ public enum RabbitMQOverflow
+ {
+ ///
+ /// The argument will not be explicitly specified and use the RabbitMQ default, which is equivalent to .
+ ///
+ NotSpecified,
+
+ ///
+ /// Discards or dead-letters the oldest published message. This is the default value.
+ ///
+ DropHead,
+
+ ///
+ /// Discards the most recently published messages and nacks the message.
+ ///
+ RejectPublish,
+
+ ///
+ /// Dead-letters the most recently published messages and nacks the message.
+ ///
+ RejectPublishDeadletter
+ }
+
+
+ ///
+ /// Specifies the optional queue arguments (also known as 'x-arguments') used when declaring
+ /// the queue.
+ ///
+ ///
+ /// The QueueArguments attribute can be applied to any controller or method and will affect the queue
+ /// that is used in that context. For durable queues, at most one QueueArguments attribute can be specified
+ /// per unique queue name.
+ ///
+ /// Also note that queue arguments can not be changed after a queue is declared. You should declare a new queue
+ /// and make the old one Obsolete to have Tapeti automatically removed it once it is empty. Tapeti will use the
+ /// existing queue, but log a warning at startup time.
+ ///
+ [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
+ [PublicAPI]
+ public class QueueArgumentsAttribute : Attribute
+ {
+ ///
+ /// The maximum number of messages in the queue. Set to determine the overflow behaviour.
+ ///
+ ///
+ /// Corresponds to 'max-length'. See
+ ///
+ public int MaxLength { get; set; }
+
+ ///
+ /// The maximum number of bytes in the queue (counting only the message bodies). Set to determine the overflow behaviour.
+ ///
+ ///
+ /// Corresponds to 'x-max-length-bytes'. See
+ ///
+ public int MaxLengthBytes { get; set; }
+
+
+ ///
+ ///
+ /// Corresponds to 'x-overflow'. Default is to drop or deadletter the oldest messages in the queue. See
+ ///
+ public RabbitMQOverflow Overflow { get; set; } = RabbitMQOverflow.NotSpecified;
+
+
+ ///
+ /// Specifies the maximum Time-to-Live for messages in the queue, in milliseconds.
+ ///
+ ///
+ /// Corresponds to 'x-message-ttl'. See
+ ///
+ public int MessageTTL { get; set; }
+
+
+ ///
+ /// Any arguments to add which are not supported by properties of QueueArguments.
+ ///
+ public IReadOnlyDictionary CustomArguments { get; private set; }
+
+
+ ///
+ /// Any arguments to add which are not supported by properties of QueueArguments. Must be a multiple of 2, specify each key followed by the value.
+ public QueueArgumentsAttribute(params object[] customArguments)
+ {
+ if (customArguments.Length % 2 != 0)
+ throw new ArgumentException("customArguments must be a multiple of 2 to specify each key-value combination", nameof(customArguments));
+
+ var customArgumentsPairs = new Dictionary();
+
+ for (var i = 0; i < customArguments.Length; i += 2)
+ customArgumentsPairs[(string)customArguments[i]] = customArguments[i + 1];
+
+ CustomArguments = customArgumentsPairs;
+ }
+ }
+}
diff --git a/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs b/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs
new file mode 100644
index 0000000..66bdf10
--- /dev/null
+++ b/Tapeti/Config/Annotations/ResponseHandlerAttribute.cs
@@ -0,0 +1,15 @@
+using JetBrains.Annotations;
+using System;
+
+namespace Tapeti.Config.Annotations
+{
+ ///
+ /// Indicates that the method only handles response messages which are sent directly
+ /// to the queue. No binding will be created.
+ ///
+ [AttributeUsage(AttributeTargets.Method)]
+ [PublicAPI]
+ public class ResponseHandlerAttribute : Attribute
+ {
+ }
+}
diff --git a/Tapeti/Config/ITapetiConfigBuilder.cs b/Tapeti/Config/ITapetiConfigBuilder.cs
index 4fc1713..5d517c9 100644
--- a/Tapeti/Config/ITapetiConfigBuilder.cs
+++ b/Tapeti/Config/ITapetiConfigBuilder.cs
@@ -1,4 +1,5 @@
using System;
+using JetBrains.Annotations;
// ReSharper disable UnusedMember.Global
@@ -8,6 +9,7 @@ namespace Tapeti.Config
/// Configures Tapeti. Every method other than Build returns the builder instance
/// for method chaining.
///
+ [PublicAPI]
public interface ITapetiConfigBuilder
{
///
diff --git a/Tapeti/Connection/TapetiPublisher.cs b/Tapeti/Connection/TapetiPublisher.cs
index 8f18b10..8f7d06b 100644
--- a/Tapeti/Connection/TapetiPublisher.cs
+++ b/Tapeti/Connection/TapetiPublisher.cs
@@ -4,6 +4,7 @@ using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Annotations;
using Tapeti.Config;
+using Tapeti.Config.Annotations;
using Tapeti.Default;
using Tapeti.Helpers;
@@ -72,7 +73,7 @@ namespace Tapeti.Connection
if (!binding.Accept(requestAttribute.Response))
throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
- var responseHandleAttribute = binding.Method.GetCustomAttribute();
+ var responseHandleAttribute = binding.Method.GetResponseHandlerAttribute();
if (responseHandleAttribute == null)
throw new ArgumentException("responseHandler must be marked with the ResponseHandler attribute", nameof(responseHandler));
@@ -82,6 +83,7 @@ namespace Tapeti.Connection
var properties = new MessageProperties
{
+ CorrelationId = ResponseFilterMiddleware.CorrelationIdRequestPrefix + MethodSerializer.Serialize(responseHandler),
ReplyTo = binding.QueueName
};
diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs
index 10d8ae4..93ca8c1 100644
--- a/Tapeti/Default/ControllerMethodBinding.cs
+++ b/Tapeti/Default/ControllerMethodBinding.cs
@@ -120,7 +120,7 @@ namespace Tapeti.Default
QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
else
{
- await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
+ await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments);
QueueName = bindingInfo.QueueInfo.Name;
}
@@ -131,7 +131,7 @@ namespace Tapeti.Default
QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
else
{
- await target.BindDurableDirect(bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments);
+ await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments);
QueueName = bindingInfo.QueueInfo.Name;
}
@@ -143,7 +143,7 @@ namespace Tapeti.Default
}
else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable)
{
- await target.BindDurableObsolete(bindingInfo.QueueInfo.Name);
+ await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!);
QueueName = bindingInfo.QueueInfo.Name;
}
}
@@ -317,7 +317,7 @@ namespace Tapeti.Default
///
/// The name of the durable queue, or optional prefix of the dynamic queue.
///
- public string Name { get; set; }
+ public string? Name { get; set; }
///
/// Optional arguments (x-arguments) passed when declaring the queue.
@@ -330,7 +330,7 @@ namespace Tapeti.Default
public bool IsValid => QueueType == QueueType.Dynamic || !string.IsNullOrEmpty(Name);
- public QueueInfo(QueueType queueType, string name)
+ public QueueInfo(QueueType queueType, string? name)
{
QueueType = queueType;
Name = name;
diff --git a/Tapeti/Default/DependencyResolverBinding.cs b/Tapeti/Default/DependencyResolverBinding.cs
index 3f2c81e..bd99f19 100644
--- a/Tapeti/Default/DependencyResolverBinding.cs
+++ b/Tapeti/Default/DependencyResolverBinding.cs
@@ -15,7 +15,7 @@ namespace Tapeti.Default
{
next();
- foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType.IsClass))
+ foreach (var parameter in context.Parameters.Where(p => p is { HasBinding: false, Info.ParameterType.IsClass: true }))
parameter.SetBinding(messageContext => messageContext.Config.DependencyResolver.Resolve(parameter.Info.ParameterType));
}
}
diff --git a/Tapeti/Default/ResponseFilterMiddleware.cs b/Tapeti/Default/ResponseFilterMiddleware.cs
new file mode 100644
index 0000000..8d990b6
--- /dev/null
+++ b/Tapeti/Default/ResponseFilterMiddleware.cs
@@ -0,0 +1,37 @@
+using System;
+using System.Threading.Tasks;
+using Tapeti.Config;
+using Tapeti.Helpers;
+
+namespace Tapeti.Default
+{
+ /// />
+ ///
+ /// Handles methods marked with the ResponseHandler attribute.
+ ///
+ internal class ResponseFilterMiddleware : IControllerFilterMiddleware//, IControllerMessageMiddleware
+ {
+ internal const string CorrelationIdRequestPrefix = "request|";
+
+
+ public async ValueTask Filter(IMessageContext context, Func next)
+ {
+ if (!context.TryGet(out var controllerPayload))
+ return;
+
+ // If no CorrelationId is present, this could be a request-response in flight from a previous version of
+ // Tapeti so we should not filter the response handler.
+ if (!string.IsNullOrEmpty(context.Properties.CorrelationId))
+ {
+ if (!context.Properties.CorrelationId.StartsWith(CorrelationIdRequestPrefix))
+ return;
+
+ var methodName = context.Properties.CorrelationId[CorrelationIdRequestPrefix.Length..];
+ if (methodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
+ return;
+ }
+
+ await next();
+ }
+ }
+}
diff --git a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs b/Tapeti/Helpers/MethodSerializer.cs
similarity index 97%
rename from Tapeti.Flow/FlowHelpers/MethodSerializer.cs
rename to Tapeti/Helpers/MethodSerializer.cs
index 38c6f93..e0ea4a4 100644
--- a/Tapeti.Flow/FlowHelpers/MethodSerializer.cs
+++ b/Tapeti/Helpers/MethodSerializer.cs
@@ -1,7 +1,7 @@
using System.Reflection;
using System.Text.RegularExpressions;
-namespace Tapeti.Flow.FlowHelpers
+namespace Tapeti.Helpers
{
///
/// Converts a method into a unique string representation.
diff --git a/Tapeti/MessageController.cs b/Tapeti/MessageController.cs
index 138ca97..2cac796 100644
--- a/Tapeti/MessageController.cs
+++ b/Tapeti/MessageController.cs
@@ -1,4 +1,4 @@
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
// ReSharper disable UnusedMember.Global
diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj
index 67eaba9..7d3a967 100644
--- a/Tapeti/Tapeti.csproj
+++ b/Tapeti/Tapeti.csproj
@@ -11,7 +11,6 @@
Unlicense
https://github.com/MvRens/Tapeti
Tapeti.png
- 9
enable
@@ -19,20 +18,12 @@
1701;1702
-
-
-
-
+
-
-
-
-
-
True
diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs
index 8459445..7371eca 100644
--- a/Tapeti/TapetiConfigControllers.cs
+++ b/Tapeti/TapetiConfigControllers.cs
@@ -2,7 +2,7 @@ using System;
using System.Linq;
using System.Reflection;
using System.Text;
-using Tapeti.Annotations;
+using Tapeti.Config.Annotations;
using Tapeti.Config;
using Tapeti.Connection;
using Tapeti.Default;
@@ -48,12 +48,18 @@ namespace Tapeti
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m))
{
+ if (method.GetCustomAttributes().Any())
+ continue;
+
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute() != null;
var context = new ControllerBindingContext(controller, method, method.GetParameters(), method.ReturnParameter);
- if (method.GetCustomAttribute() != null)
+ if (method.GetResponseHandlerAttribute() != null)
+ {
context.SetBindingTargetMode(BindingTargetMode.Direct);
+ context.Use(new ResponseFilterMiddleware());
+ }
var allowBinding = false;
@@ -100,6 +106,14 @@ namespace Tapeti
}
+ ///
+ public static ITapetiConfigBuilder RegisterController(this ITapetiConfigBuilder builder) where TController : class
+ {
+ return RegisterController(builder, typeof(TController));
+ }
+
+
+
///
/// Registers all controllers in the specified assembly which are marked with the MessageController attribute.
///
@@ -107,7 +121,7 @@ namespace Tapeti
/// The assembly to scan for controllers.
public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuilder builder, Assembly assembly)
{
- foreach (var type in assembly.GetTypes().Where(t => t.IsDefined(typeof(MessageControllerAttribute))))
+ foreach (var type in assembly.GetTypes().Where(t => t.HasMessageControllerAttribute()))
RegisterController(builder, type);
return builder;
@@ -130,9 +144,9 @@ namespace Tapeti
private static ControllerMethodBinding.QueueInfo? GetQueueInfo(MemberInfo member, ControllerMethodBinding.QueueInfo? fallbackQueueInfo)
{
- var dynamicQueueAttribute = member.GetCustomAttribute();
- var durableQueueAttribute = member.GetCustomAttribute();
- var queueArgumentsAttribute = member.GetCustomAttribute();
+ var dynamicQueueAttribute = member.GetDynamicQueueAttribute();
+ var durableQueueAttribute = member.GetDurableQueueAttribute();
+ var queueArgumentsAttribute = member.GetQueueArgumentsAttribute();
if (dynamicQueueAttribute != null && durableQueueAttribute != null)
throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}");
@@ -142,7 +156,7 @@ namespace Tapeti
QueueType queueType;
- string name;
+ string? name;
if (dynamicQueueAttribute != null)
@@ -180,10 +194,8 @@ namespace Tapeti
string stringValue => Encoding.UTF8.GetBytes(stringValue),
_ => p.Value
}
- ))
- {
+ ));
- };
if (queueArgumentsAttribute.MaxLength > 0)
arguments.Add(@"x-max-length", queueArgumentsAttribute.MaxLength);