diff --git a/Tapeti.Annotations/DurableQueueAttribute.cs b/Tapeti.Annotations/DurableQueueAttribute.cs
index 281d91f..8971044 100644
--- a/Tapeti.Annotations/DurableQueueAttribute.cs
+++ b/Tapeti.Annotations/DurableQueueAttribute.cs
@@ -14,9 +14,12 @@ namespace Tapeti.Annotations
/// for deploy-time management of durable queues (shameless plug intended).
///
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
- [MeansImplicitUse]
+ [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class DurableQueueAttribute : Attribute
{
+ ///
+ /// Specifies the name of the durable queue (must already be declared).
+ ///
public string Name { get; set; }
diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs
index 3743edf..240b001 100644
--- a/Tapeti.Annotations/DynamicQueueAttribute.cs
+++ b/Tapeti.Annotations/DynamicQueueAttribute.cs
@@ -12,6 +12,11 @@ namespace Tapeti.Annotations
[MeansImplicitUse]
public class DynamicQueueAttribute : Attribute
{
+ ///
+ /// An optional prefix. If specified, Tapeti will compose the queue name using the
+ /// prefix and a unique ID. If not specified, an empty queue name will be passed
+ /// to RabbitMQ thus letting it create a unique queue name.
+ ///
public string Prefix { get; set; }
diff --git a/Tapeti.Annotations/MessageControllerAttribute.cs b/Tapeti.Annotations/MessageControllerAttribute.cs
index 150fefc..6a18416 100644
--- a/Tapeti.Annotations/MessageControllerAttribute.cs
+++ b/Tapeti.Annotations/MessageControllerAttribute.cs
@@ -9,7 +9,7 @@ namespace Tapeti.Annotations
/// when using the RegisterAllControllers method. It is not required when manually registering a controller.
///
[AttributeUsage(AttributeTargets.Class)]
- [MeansImplicitUse]
+ [MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)]
public class MessageControllerAttribute : Attribute
{
}
diff --git a/Tapeti.Annotations/MessageHandlerAttribute.cs b/Tapeti.Annotations/MessageHandlerAttribute.cs
deleted file mode 100644
index d13724e..0000000
--- a/Tapeti.Annotations/MessageHandlerAttribute.cs
+++ /dev/null
@@ -1,17 +0,0 @@
-using System;
-using JetBrains.Annotations;
-
-namespace Tapeti.Annotations
-{
- ///
- ///
- /// This attribute does nothing in runtime and is not required. It is only used as
- /// a hint to ReSharper, and maybe developers as well, to indicate the method is
- /// indeed used.
- ///
- [AttributeUsage(AttributeTargets.Method)]
- [MeansImplicitUse]
- public class MessageHandlerAttribute : Attribute
- {
- }
-}
diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs
index 2f14097..f298c50 100644
--- a/Tapeti.Annotations/RequestAttribute.cs
+++ b/Tapeti.Annotations/RequestAttribute.cs
@@ -13,6 +13,9 @@ namespace Tapeti.Annotations
[AttributeUsage(AttributeTargets.Class)]
public class RequestAttribute : Attribute
{
+ ///
+ /// The type of the message class which must be returned as the response.
+ ///
public Type Response { get; set; }
}
}
diff --git a/Tapeti.Annotations/Tapeti.Annotations.csproj b/Tapeti.Annotations/Tapeti.Annotations.csproj
index 9f5c4f4..be5c9ef 100644
--- a/Tapeti.Annotations/Tapeti.Annotations.csproj
+++ b/Tapeti.Annotations/Tapeti.Annotations.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
index 3df5704..52e0d73 100644
--- a/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
+++ b/Tapeti.DataAnnotations/Tapeti.DataAnnotations.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
index b80a796..eaa2e91 100644
--- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
+++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index 289b246..5c6d9d9 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -105,9 +105,9 @@ namespace Tapeti.Flow.Default
// TODO disallow if replyto is not specified?
if (reply.ReplyTo != null)
- await publisher.PublishDirect(message, reply.ReplyTo, properties, true);
+ await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory);
else
- await publisher.Publish(message, properties, true);
+ await publisher.Publish(message, properties, reply.Mandatory);
await context.Delete();
}
@@ -129,8 +129,8 @@ namespace Tapeti.Flow.Default
throw new ArgumentException("responseHandler must be a registered message handler", nameof(responseHandler));
var requestAttribute = request.GetType().GetCustomAttribute();
- if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass)
- throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler));
+ if (requestAttribute?.Response != null && !binding.Accept(requestAttribute.Response))
+ throw new ArgumentException($"responseHandler must accept message of type {requestAttribute.Response}", nameof(responseHandler));
var continuationAttribute = binding.Method.GetCustomAttribute();
if (continuationAttribute == null)
@@ -157,7 +157,8 @@ namespace Tapeti.Flow.Default
{
CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo,
- ResponseTypeName = requestAttribute.Response.FullName
+ ResponseTypeName = requestAttribute.Response.FullName,
+ Mandatory = context.Properties.Persistent
};
}
diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs
index d600a8f..e32430c 100644
--- a/Tapeti.Flow/Default/FlowState.cs
+++ b/Tapeti.Flow/Default/FlowState.cs
@@ -57,6 +57,8 @@ namespace Tapeti.Flow.Default
public string CorrelationId { get; set; }
public string ResponseTypeName { get; set; }
+ public bool Mandatory { get; set; }
+
public ReplyMetadata Clone()
{
@@ -64,7 +66,8 @@ namespace Tapeti.Flow.Default
{
ReplyTo = ReplyTo,
CorrelationId = CorrelationId,
- ResponseTypeName = ResponseTypeName
+ ResponseTypeName = ResponseTypeName,
+ Mandatory = Mandatory
};
}
}
diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs
index 2597007..b98923d 100644
--- a/Tapeti.Flow/Default/FlowStore.cs
+++ b/Tapeti.Flow/Default/FlowStore.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers;
@@ -16,6 +17,7 @@ namespace Tapeti.Flow.Default
private readonly IFlowRepository repository;
private volatile bool inUse;
+ private volatile bool loaded;
public FlowStore(IFlowRepository repository)
{
@@ -40,17 +42,25 @@ namespace Tapeti.Flow.Default
foreach (var continuation in flowStateRecord.Value.Continuations)
continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
}
+
+ loaded = true;
}
public Task FindFlowID(Guid continuationID)
{
+ if (!loaded)
+ throw new InvalidOperationException("Flow store is not yet loaded.");
+
return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null);
}
public async Task LockFlowState(Guid flowID)
{
+ if (!loaded)
+ throw new InvalidOperationException("Flow store should be loaded before storing flows.");
+
inUse = true;
var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID));
diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj
index cc0d9d9..105aa14 100644
--- a/Tapeti.Flow/Tapeti.Flow.csproj
+++ b/Tapeti.Flow/Tapeti.Flow.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj
index ec64a23..b33e71b 100644
--- a/Tapeti.Serilog/Tapeti.Serilog.csproj
+++ b/Tapeti.Serilog/Tapeti.Serilog.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
index 25ed29e..ed72a19 100644
--- a/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
+++ b/Tapeti.SimpleInjector/Tapeti.SimpleInjector.csproj
@@ -2,6 +2,7 @@
netstandard2.0
+ true
diff --git a/Tapeti.Tests/TransientFilterMiddleware.cs b/Tapeti.Tests/TransientFilterMiddleware.cs
new file mode 100644
index 0000000..d311f03
--- /dev/null
+++ b/Tapeti.Tests/TransientFilterMiddleware.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Threading.Tasks;
+using Tapeti.Config;
+
+namespace Tapeti.Tests
+{
+ public class TransientFilterMiddleware : IMessageFilterMiddleware
+ {
+ public Task Handle(IMessageContext context, Func next)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Transient/ConfigExtentions.cs b/Tapeti.Transient/ConfigExtentions.cs
new file mode 100644
index 0000000..7401578
--- /dev/null
+++ b/Tapeti.Transient/ConfigExtentions.cs
@@ -0,0 +1,13 @@
+using System;
+
+namespace Tapeti.Transient
+{
+ public static class ConfigExtensions
+ {
+ public static TapetiConfig WithTransient(this TapetiConfig config, TimeSpan defaultTimeout, string dynamicQueuePrefix = "transient")
+ {
+ config.Use(new TransientMiddleware(defaultTimeout, dynamicQueuePrefix));
+ return config;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Transient/ITransientPublisher.cs b/Tapeti.Transient/ITransientPublisher.cs
new file mode 100644
index 0000000..2765259
--- /dev/null
+++ b/Tapeti.Transient/ITransientPublisher.cs
@@ -0,0 +1,9 @@
+using System.Threading.Tasks;
+
+namespace Tapeti.Transient
+{
+ public interface ITransientPublisher
+ {
+ Task RequestResponse(TRequest request);
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Transient/Tapeti.Transient.csproj b/Tapeti.Transient/Tapeti.Transient.csproj
new file mode 100644
index 0000000..f3cca6f
--- /dev/null
+++ b/Tapeti.Transient/Tapeti.Transient.csproj
@@ -0,0 +1,12 @@
+
+
+
+ netstandard2.0
+ true
+
+
+
+
+
+
+
diff --git a/Tapeti.Transient/Tapeti.Transient.nuspec b/Tapeti.Transient/Tapeti.Transient.nuspec
new file mode 100644
index 0000000..ad29af0
--- /dev/null
+++ b/Tapeti.Transient/Tapeti.Transient.nuspec
@@ -0,0 +1,23 @@
+
+
+
+ Tapeti.Transient
+ $version$
+ Tapeti Transient
+ Menno van Lavieren, Mark van Renswoude
+ Mark van Renswoude
+ https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE
+ https://github.com/MvRens/Tapeti
+ https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.Flow.png
+ false
+ Transient extension for Tapeti
+
+ rabbitmq tapeti transient
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs
new file mode 100644
index 0000000..f28643d
--- /dev/null
+++ b/Tapeti.Transient/TransientGenericBinding.cs
@@ -0,0 +1,52 @@
+using System;
+using System.Reflection;
+using System.Threading.Tasks;
+using Tapeti.Config;
+
+namespace Tapeti.Transient
+{
+ public class TransientGenericBinding : ICustomBinding
+ {
+ private readonly TransientRouter router;
+
+ public TransientGenericBinding(TransientRouter router, string dynamicQueuePrefix)
+ {
+ this.router = router;
+ DynamicQueuePrefix = dynamicQueuePrefix;
+ Method = typeof(TransientRouter).GetMethod("GenericHandleResponse");
+ }
+
+ public Type Controller => typeof(TransientRouter);
+
+ public MethodInfo Method { get; }
+
+ public QueueBindingMode QueueBindingMode => QueueBindingMode.DirectToQueue;
+
+ public string StaticQueueName => null;
+
+ public string DynamicQueuePrefix { get; }
+
+ public Type MessageClass => null;
+
+ public bool Accept(Type messageClass)
+ {
+ return true;
+ }
+
+ public bool Accept(IMessageContext context, object message)
+ {
+ return true;
+ }
+
+ public Task Invoke(IMessageContext context, object message)
+ {
+ router.GenericHandleResponse(message, context);
+ return Task.CompletedTask;
+ }
+
+ public void SetQueueName(string queueName)
+ {
+ router.TransientResponseQueueName = queueName;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tapeti.Transient/TransientMiddleware.cs b/Tapeti.Transient/TransientMiddleware.cs
new file mode 100644
index 0000000..5077fa5
--- /dev/null
+++ b/Tapeti.Transient/TransientMiddleware.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Collections.Generic;
+using Tapeti.Config;
+
+namespace Tapeti.Transient
+{
+ public class TransientMiddleware : ITapetiExtension, ITapetiExtentionBinding
+ {
+ private string dynamicQueuePrefix;
+ private readonly TransientRouter router;
+
+ public TransientMiddleware(TimeSpan defaultTimeout, string dynamicQueuePrefix)
+ {
+ this.dynamicQueuePrefix = dynamicQueuePrefix;
+ this.router = new TransientRouter(defaultTimeout);
+ }
+
+ public void RegisterDefaults(IDependencyContainer container)
+ {
+ container.RegisterDefaultSingleton(router);
+ container.RegisterDefault();
+ }
+
+ public IEnumerable