diff --git a/Tapeti.Flow/ContextItems.cs b/Tapeti.Flow/ContextItems.cs
deleted file mode 100644
index d82ae7d..0000000
--- a/Tapeti.Flow/ContextItems.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-namespace Tapeti.Flow
-{
- ///
- /// Key names as used in the message context store. For internal use.
- ///
- public static class ContextItems
- {
- ///
- /// Key given to the FlowContext object as stored in the message context.
- ///
- public const string FlowContext = "Tapeti.Flow.FlowContext";
-
- ///
- /// Indicates if the current message handler is the last one to be called before a
- /// parallel flow is done and the convergeMethod will be called.
- /// Temporarily disables storing the flow state.
- ///
- public const string FlowIsConverging = "Tapeti.Flow.IsConverging";
- }
-}
diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
index 8adc5a5..1e21847 100644
--- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs
@@ -74,16 +74,16 @@ namespace Tapeti.Flow.Default
}
- private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint)
+ private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.Config.DependencyResolver.Resolve();
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
- private static Task HandleParallelResponse(IControllerMessageContext context)
+ private static Task HandleParallelResponse(IMessageContext context)
{
- if (context.Get(ContextItems.FlowIsConverging, out _))
+ if (context.TryGet(out var flowPayload) && flowPayload.FlowIsConverging)
return Task.CompletedTask;
var flowHandler = context.Config.DependencyResolver.Resolve();
diff --git a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
index b07c819..3f34b49 100644
--- a/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
+++ b/Tapeti.Flow/Default/FlowContinuationMiddleware.cs
@@ -12,24 +12,31 @@ namespace Tapeti.Flow.Default
///
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{
- public async Task Filter(IControllerMessageContext context, Func next)
+ public async Task Filter(IMessageContext context, Func next)
{
+ if (!context.TryGet(out var controllerPayload))
+ return;
+
var flowContext = await EnrichWithFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return;
- if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
+ if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
return;
await next();
}
- public async Task Handle(IControllerMessageContext context, Func next)
+ public async Task Handle(IMessageContext context, Func next)
{
- if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
- {
- Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
+ if (!context.TryGet(out var controllerPayload))
+ return;
+
+ if (context.TryGet(out var flowPayload))
+ {
+ var flowContext = flowPayload.FlowContext;
+ Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
@@ -38,28 +45,33 @@ namespace Tapeti.Flow.Default
if (converge)
// Indicate to the FlowBindingMiddleware that the state must not to be stored
- context.Store(ContextItems.FlowIsConverging, null);
+ flowPayload.FlowIsConverging = true;
await next();
if (converge)
- await CallConvergeMethod(context,
- flowContext.ContinuationMetadata.ConvergeMethodName,
- flowContext.ContinuationMetadata.ConvergeMethodSync);
+ await CallConvergeMethod(context, controllerPayload,
+ flowContext.ContinuationMetadata.ConvergeMethodName,
+ flowContext.ContinuationMetadata.ConvergeMethodSync);
}
else
await next();
}
- public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next)
+ public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next)
{
await next();
- if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
+ if (!context.TryGet(out var controllerPayload))
return;
- if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
+ if (!context.TryGet(out var flowPayload))
+ return;
+
+ var flowContext = flowPayload.FlowContext;
+
+ if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
// Do not call when the controller method was filtered, if the same message has two methods
return;
@@ -76,10 +88,10 @@ namespace Tapeti.Flow.Default
- private static async Task EnrichWithFlowContext(IControllerMessageContext context)
+ private static async Task EnrichWithFlowContext(IMessageContext context)
{
- if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
- return flowContext;
+ if (context.TryGet(out var flowPayload))
+ return flowPayload.FlowContext;
if (context.Properties.CorrelationId == null)
@@ -100,7 +112,7 @@ namespace Tapeti.Flow.Default
if (flowState == null)
return null;
- flowContext = new FlowContext
+ var flowContext = new FlowContext
{
HandlerContext = new FlowHandlerContext(context),
@@ -112,26 +124,28 @@ namespace Tapeti.Flow.Default
};
// IDisposable items in the IMessageContext are automatically disposed
- context.Store(ContextItems.FlowContext, flowContext);
+ context.Store(new FlowMessageContextPayload(flowContext));
return flowContext;
}
- private static async Task CallConvergeMethod(IControllerMessageContext context, string methodName, bool sync)
+ private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync)
{
IYieldPoint yieldPoint;
+
+
- var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
+ var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
- throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
+ throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}");
if (sync)
- yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
+ yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {});
else
- yieldPoint = await (Task)method.Invoke(context.Controller, new object[] { });
+ yieldPoint = await (Task)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
- throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
+ throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve();
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
diff --git a/Tapeti.Flow/Default/FlowHandlerContext.cs b/Tapeti.Flow/Default/FlowHandlerContext.cs
index 6bcbab9..c1e8c38 100644
--- a/Tapeti.Flow/Default/FlowHandlerContext.cs
+++ b/Tapeti.Flow/Default/FlowHandlerContext.cs
@@ -18,15 +18,18 @@ namespace Tapeti.Flow.Default
///
///
- public FlowHandlerContext(IControllerMessageContext source)
+ public FlowHandlerContext(IMessageContext source)
{
if (source == null)
return;
+ if (!source.TryGet(out var controllerPayload))
+ return;
+
Config = source.Config;
- Controller = source.Controller;
- Method = source.Binding.Method;
- ControllerMessageContext = source;
+ Controller = controllerPayload.Controller;
+ Method = controllerPayload.Binding.Method;
+ MessageContext = source;
}
@@ -45,6 +48,6 @@ namespace Tapeti.Flow.Default
public MethodInfo Method { get; set; }
///
- public IControllerMessageContext ControllerMessageContext { get; set; }
+ public IMessageContext MessageContext { get; set; }
}
}
diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs
index d7b28b4..821328e 100644
--- a/Tapeti.Flow/Default/FlowProvider.cs
+++ b/Tapeti.Flow/Default/FlowProvider.cs
@@ -162,16 +162,16 @@ namespace Tapeti.Flow.Default
private static ReplyMetadata GetReply(IFlowHandlerContext context)
{
- var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute();
+ var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
- CorrelationId = context.ControllerMessageContext.Properties.CorrelationId,
- ReplyTo = context.ControllerMessageContext.Properties.ReplyTo,
+ CorrelationId = context.MessageContext.Properties.CorrelationId,
+ ReplyTo = context.MessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
- Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true)
+ Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true)
};
}
@@ -206,8 +206,8 @@ namespace Tapeti.Flow.Default
try
{
- var messageContext = context.ControllerMessageContext;
- if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out flowContext))
+ var messageContext = context.MessageContext;
+ if (messageContext == null || !messageContext.TryGet(out var flowPayload))
{
flowContext = new FlowContext
{
@@ -218,7 +218,9 @@ namespace Tapeti.Flow.Default
// in the messageContext as the yield point is the last to execute.
disposeFlowContext = true;
}
-
+ else
+ flowContext = flowPayload.FlowContext;
+
try
{
await executableYieldPoint.Execute(flowContext);
diff --git a/Tapeti.Flow/FlowMessageContextPayload.cs b/Tapeti.Flow/FlowMessageContextPayload.cs
new file mode 100644
index 0000000..6fb97ce
--- /dev/null
+++ b/Tapeti.Flow/FlowMessageContextPayload.cs
@@ -0,0 +1,33 @@
+using System;
+using Tapeti.Config;
+using Tapeti.Flow.Default;
+
+namespace Tapeti.Flow
+{
+ ///
+ /// Contains information about the flow for the current message. For internal use.
+ ///
+ internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable
+ {
+ public FlowContext FlowContext { get; }
+
+ ///
+ /// Indicates if the current message handler is the last one to be called before a
+ /// parallel flow is done and the convergeMethod will be called.
+ /// Temporarily disables storing the flow state.
+ ///
+ public bool FlowIsConverging { get; set; }
+
+
+ public FlowMessageContextPayload(FlowContext flowContext)
+ {
+ FlowContext = flowContext;
+ }
+
+
+ public void Dispose()
+ {
+ FlowContext?.Dispose();
+ }
+ }
+}
diff --git a/Tapeti.Flow/IFlowHandlerContext.cs b/Tapeti.Flow/IFlowHandlerContext.cs
index 08cce12..921dd4e 100644
--- a/Tapeti.Flow/IFlowHandlerContext.cs
+++ b/Tapeti.Flow/IFlowHandlerContext.cs
@@ -29,9 +29,9 @@ namespace Tapeti.Flow
///
- /// Access to the controller message context if this is a continuated flow.
+ /// Access to the message context if this is a continuated flow.
/// Will be null when in a starting flow.
///
- IControllerMessageContext ControllerMessageContext { get; }
+ IMessageContext MessageContext { get; }
}
}
diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs
index a05d11c..98c7864 100644
--- a/Tapeti.Serilog/TapetiSeriLogger.cs
+++ b/Tapeti.Serilog/TapetiSeriLogger.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Text;
using Tapeti.Config;
using ISerilogLogger = Serilog.ILogger;
@@ -12,6 +13,21 @@ namespace Tapeti.Serilog
///
public class TapetiSeriLogger: IBindingLogger
{
+ ///
+ /// Implements the Tapeti ILogger interface for Serilog output. This version
+ /// includes the message body and information if available when an error occurs.
+ ///
+ public class WithMessageLogging : TapetiSeriLogger
+ {
+ ///
+ public WithMessageLogging(ISerilogLogger seriLogger) : base(seriLogger) { }
+
+ internal override bool IncludeMessageInfo() => true;
+ }
+
+
+
+
private readonly ISerilogLogger seriLogger;
@@ -69,20 +85,38 @@ namespace Tapeti.Serilog
///
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
+ var message = new StringBuilder("Tapeti: exception in message handler");
+ var messageParams = new List();
+
var contextLogger = seriLogger
.ForContext("consumeResult", consumeResult)
.ForContext("exchange", messageContext.Exchange)
.ForContext("queue", messageContext.Queue)
.ForContext("routingKey", messageContext.RoutingKey);
- if (messageContext is IControllerMessageContext controllerMessageContext)
+ if (messageContext.TryGet(out var controllerPayload))
{
contextLogger = contextLogger
- .ForContext("controller", controllerMessageContext.Binding.Controller.FullName)
- .ForContext("method", controllerMessageContext.Binding.Method.Name);
+ .ForContext("controller", controllerPayload.Binding.Controller.FullName)
+ .ForContext("method", controllerPayload.Binding.Method.Name);
+
+ message.Append(" {controller}.{method}");
+ messageParams.Add(controllerPayload.Binding.Controller.FullName);
+ messageParams.Add(controllerPayload.Binding.Method.Name);
+ }
+
+ if (IncludeMessageInfo())
+ {
+ message.Append(" on exchange {exchange}, queue {queue}, routingKey {routingKey}, replyTo {replyTo}, correlationId {correlationId} with body {body}");
+ messageParams.Add(messageContext.Exchange);
+ messageParams.Add(messageContext.Queue);
+ messageParams.Add(messageContext.RoutingKey);
+ messageParams.Add(messageContext.Properties.ReplyTo);
+ messageParams.Add(messageContext.Properties.CorrelationId);
+ messageParams.Add(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : null);
}
- contextLogger.Error(exception, "Tapeti: exception in message handler");
+ contextLogger.Error(exception, message.ToString(), messageParams.ToArray());
}
///
@@ -134,5 +168,7 @@ namespace Tapeti.Serilog
else
seriLogger.Information("Tapeti: obsolete queue {queue} has been unbound but not yet deleted, {messageCount} messages remaining", queueName, messageCount);
}
+
+ internal virtual bool IncludeMessageInfo() => false;
}
}
diff --git a/Tapeti/Config/ControllerMessageContextPayload.cs b/Tapeti/Config/ControllerMessageContextPayload.cs
new file mode 100644
index 0000000..700d8a3
--- /dev/null
+++ b/Tapeti/Config/ControllerMessageContextPayload.cs
@@ -0,0 +1,32 @@
+namespace Tapeti.Config
+{
+ ///
+ ///
+ /// Extends the message context with information about the controller.
+ ///
+ public class ControllerMessageContextPayload : IMessageContextPayload
+ {
+ ///
+ /// An instance of the controller referenced by the binding. Note: can be null during Cleanup.
+ ///
+ public object Controller { get; }
+
+
+ ///
+ /// Provides access to the binding which is currently processing the message.
+ ///
+ public IControllerMethodBinding Binding { get; }
+
+
+ ///
+ /// Constructs the payload to enrich the message context with information about the controller.
+ ///
+ /// An instance of the controller referenced by the binding
+ /// The binding which is currently processing the message
+ public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding)
+ {
+ Controller = controller;
+ Binding = binding;
+ }
+ }
+}
diff --git a/Tapeti/Config/IControllerBindingContext.cs b/Tapeti/Config/IControllerBindingContext.cs
index 1c21cd7..37fb4d4 100644
--- a/Tapeti/Config/IControllerBindingContext.cs
+++ b/Tapeti/Config/IControllerBindingContext.cs
@@ -11,7 +11,7 @@ namespace Tapeti.Config
/// Injects a value for a controller method parameter.
///
///
- public delegate object ValueFactory(IControllerMessageContext context);
+ public delegate object ValueFactory(IMessageContext context);
///
@@ -19,7 +19,7 @@ namespace Tapeti.Config
///
///
///
- public delegate Task ResultHandler(IControllerMessageContext context, object value);
+ public delegate Task ResultHandler(IMessageContext context, object value);
///
diff --git a/Tapeti/Config/IControllerCleanupMiddleware.cs b/Tapeti/Config/IControllerCleanupMiddleware.cs
index 2f16269..86ef003 100644
--- a/Tapeti/Config/IControllerCleanupMiddleware.cs
+++ b/Tapeti/Config/IControllerCleanupMiddleware.cs
@@ -14,6 +14,6 @@ namespace Tapeti.Config
///
///
/// Always call to allow the next in the chain to clean up
- Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func next);
+ Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func next);
}
}
diff --git a/Tapeti/Config/IControllerFilterMiddleware.cs b/Tapeti/Config/IControllerFilterMiddleware.cs
index ec8391a..6a30e20 100644
--- a/Tapeti/Config/IControllerFilterMiddleware.cs
+++ b/Tapeti/Config/IControllerFilterMiddleware.cs
@@ -15,6 +15,6 @@ namespace Tapeti.Config
///
///
///
- Task Filter(IControllerMessageContext context, Func next);
+ Task Filter(IMessageContext context, Func next);
}
}
diff --git a/Tapeti/Config/IControllerMessageContext.cs b/Tapeti/Config/IControllerMessageContext.cs
deleted file mode 100644
index 16b650d..0000000
--- a/Tapeti/Config/IControllerMessageContext.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-namespace Tapeti.Config
-{
- ///
- ///
- /// Extends the message context with information about the controller.
- ///
- public interface IControllerMessageContext : IMessageContext
- {
- ///
- /// An instance of the controller referenced by the binding. Note: is null during Cleanup.
- ///
- object Controller { get; }
-
-
- ///
- /// Provides access to the binding which is currently processing the message.
- ///
- new IControllerMethodBinding Binding { get; }
- }
-}
diff --git a/Tapeti/Config/IControllerMessageMiddleware.cs b/Tapeti/Config/IControllerMessageMiddleware.cs
index 65e777f..c381270 100644
--- a/Tapeti/Config/IControllerMessageMiddleware.cs
+++ b/Tapeti/Config/IControllerMessageMiddleware.cs
@@ -14,6 +14,6 @@ namespace Tapeti.Config
///
///
/// Call to pass the message to the next handler in the chain or call the controller method
- Task Handle(IControllerMessageContext context, Func next);
+ Task Handle(IMessageContext context, Func next);
}
}
diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs
index 7c23127..e3a0e05 100644
--- a/Tapeti/Config/IMessageContext.cs
+++ b/Tapeti/Config/IMessageContext.cs
@@ -1,5 +1,7 @@
using System;
+// ReSharper disable UnusedMemberInSuper.Global - public API
+
namespace Tapeti.Config
{
///
@@ -27,6 +29,11 @@ namespace Tapeti.Config
///
string RoutingKey { get; }
+ ///
+ /// Contains the raw body of the message.
+ ///
+ byte[] RawBody { get; }
+
///
/// Contains the decoded message instance.
///
@@ -42,6 +49,36 @@ namespace Tapeti.Config
///
IBinding Binding { get; }
+ ///
+ /// Stores additional properties in the message context which can be passed between middleware stages.
+ ///
+ ///
+ /// Only one instance of type T is stored, if Enrich was called before for this type an InvalidOperationException will be thrown.
+ ///
+ /// A class implementing IMessageContextPayload
+ void Store(T payload) where T : IMessageContextPayload;
+
+ ///
+ /// Stored a new payload, or updates an existing one.
+ ///
+ /// A method returning the new payload to be stored
+ /// A method called when the payload exists
+ /// The payload type as passed to Enrich
+ void StoreOrUpdate(Func onAdd, Action onUpdate) where T : IMessageContextPayload;
+
+ ///
+ /// Returns the properties as previously stored with Enrich. Throws a KeyNotFoundException
+ /// if the payload is not stored in this message context.
+ ///
+ /// The payload type as passed to Enrich
+ T Get() where T : IMessageContextPayload;
+
+
+ ///
+ /// Returns true and the payload value if this message context was previously enriched with the payload T.
+ ///
+ /// The payload type as passed to Enrich
+ bool TryGet(out T payload) where T : IMessageContextPayload;
///
/// Stores a key-value pair in the context for passing information between the various
@@ -49,6 +86,7 @@ namespace Tapeti.Config
///
/// A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts
/// Will be disposed if the value implements IDisposable or IAsyncDisposable
+ [Obsolete("For backwards compatibility only. Use Store payload for typed properties instead")]
void Store(string key, object value);
///
@@ -57,6 +95,18 @@ namespace Tapeti.Config
///
///
/// True if the value was found, False otherwise
+ [Obsolete("For backwards compatibility only. Use Get payload overload for typed properties instead")]
bool Get(string key, out T value) where T : class;
}
+
+
+ ///
+ /// Base interface for additional properties added to the message context.
+ ///
+ ///
+ /// Descendants implementing IDisposable or IAsyncDisposable will be disposed along with the message context.
+ ///
+ public interface IMessageContextPayload
+ {
+ }
}
diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs
index 014fb20..58fddeb 100644
--- a/Tapeti/Connection/TapetiConsumer.cs
+++ b/Tapeti/Connection/TapetiConsumer.cs
@@ -57,6 +57,7 @@ namespace Tapeti.Connection
return await DispatchMessage(message, new MessageContextData
{
+ RawBody = body,
Exchange = exchange,
RoutingKey = routingKey,
Properties = properties
@@ -70,6 +71,7 @@ namespace Tapeti.Connection
Queue = queueName,
Exchange = exchange,
RoutingKey = routingKey,
+ RawBody = body,
Message = message,
Properties = properties,
Binding = null
@@ -112,6 +114,7 @@ namespace Tapeti.Connection
Queue = queueName,
Exchange = messageContextData.Exchange,
RoutingKey = messageContextData.RoutingKey,
+ RawBody = messageContextData.RawBody,
Message = message,
Properties = messageContextData.Properties,
Binding = binding
@@ -174,6 +177,7 @@ namespace Tapeti.Connection
private struct MessageContextData
{
+ public byte[] RawBody;
public string Exchange;
public string RoutingKey;
public IMessageProperties Properties;
diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs
index 41a8c08..d2787f3 100644
--- a/Tapeti/Default/ConsoleLogger.cs
+++ b/Tapeti/Default/ConsoleLogger.cs
@@ -11,6 +11,19 @@ namespace Tapeti.Default
///
public class ConsoleLogger : IBindingLogger
{
+ ///
+ /// Default ILogger implementation for console applications. This version
+ /// includes the message body if available when an error occurs.
+ ///
+ public class WithMessageLogging : ConsoleLogger
+ {
+ ///
+ public WithMessageLogging() : base() { }
+
+ internal override bool IncludeMessageBody() => true;
+ }
+
+
///
public void Connect(IConnectContext connectContext)
{
@@ -39,17 +52,23 @@ namespace Tapeti.Default
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
Console.WriteLine("[Tapeti] Exception while handling message");
- Console.WriteLine($" Result : {consumeResult}");
- Console.WriteLine($" Exchange : {messageContext.Exchange}");
- Console.WriteLine($" Queue : {messageContext.Queue}");
- Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}");
+ Console.WriteLine($" Result : {consumeResult}");
+ Console.WriteLine($" Exchange : {messageContext.Exchange}");
+ Console.WriteLine($" Queue : {messageContext.Queue}");
+ Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}");
+ Console.WriteLine($" ReplyTo : {messageContext.Properties.ReplyTo}");
+ Console.WriteLine($" CorrelationId : {messageContext.Properties.CorrelationId}");
- if (messageContext is IControllerMessageContext controllerMessageContext)
+ if (messageContext.TryGet(out var controllerPayload))
{
- Console.WriteLine($" Controller : {controllerMessageContext.Binding.Controller.FullName}");
- Console.WriteLine($" Method : {controllerMessageContext.Binding.Method.Name}");
+ Console.WriteLine($" Controller : {controllerPayload.Binding.Controller.FullName}");
+ Console.WriteLine($" Method : {controllerPayload.Binding.Method.Name}");
}
+ if (IncludeMessageBody())
+ Console.WriteLine($" Body : {(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : "")}");
+
+
Console.WriteLine();
Console.WriteLine(exception);
}
@@ -102,5 +121,7 @@ namespace Tapeti.Default
? $"[Tapeti] Obsolete queue was deleted: {queueName}"
: $"[Tapeti] Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining");
}
+
+ internal virtual bool IncludeMessageBody() => false;
}
}
diff --git a/Tapeti/Default/ControllerMessageContext.cs b/Tapeti/Default/ControllerMessageContext.cs
deleted file mode 100644
index 3d8fb55..0000000
--- a/Tapeti/Default/ControllerMessageContext.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-using System.Threading.Tasks;
-using Tapeti.Config;
-
-namespace Tapeti.Default
-{
- internal class ControllerMessageContext : IControllerMessageContext
- {
- private readonly IMessageContext decoratedContext;
-
- ///
- public object Controller { get; set; }
-
- ///
- public ITapetiConfig Config => decoratedContext.Config;
-
- ///
- public string Queue => decoratedContext.Queue;
-
- ///
- public string Exchange => decoratedContext.Exchange;
-
- ///
- public string RoutingKey => decoratedContext.RoutingKey;
-
- ///
- public object Message => decoratedContext.Message;
-
- ///
- public IMessageProperties Properties => decoratedContext.Properties;
-
-
- IBinding IMessageContext.Binding => decoratedContext.Binding;
- IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding;
-
-
- public ControllerMessageContext(IMessageContext decoratedContext)
- {
- this.decoratedContext = decoratedContext;
- }
-
-
- ///
- public void Dispose()
- {
- // Do not call decoratedContext.Dispose - by design
- }
-
-
- ///
- public ValueTask DisposeAsync()
- {
- // Do not call decoratedContext.DisposeAsync - by design
- return default;
- }
-
-
-
- ///
- public void Store(string key, object value)
- {
- decoratedContext.Store(key, value);
- }
-
-
- ///
- public bool Get(string key, out T value) where T : class
- {
- return decoratedContext.Get(key, out value);
- }
- }
-}
diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs
index bedb7cd..03b44da 100644
--- a/Tapeti/Default/ControllerMethodBinding.cs
+++ b/Tapeti/Default/ControllerMethodBinding.cs
@@ -160,39 +160,30 @@ namespace Tapeti.Default
public async Task Invoke(IMessageContext context)
{
var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
-
- await using var controllerContext = new ControllerMessageContext(context)
- {
- Controller = controller
- };
+ context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
- if (!await FilterAllowed(controllerContext))
+ if (!await FilterAllowed(context))
return;
await MiddlewareHelper.GoAsync(
bindingInfo.MessageMiddleware,
- async (handler, next) => await handler.Handle(controllerContext, next),
- async () => await messageHandler(controllerContext));
+ async (handler, next) => await handler.Handle(context, next),
+ async () => await messageHandler(context));
}
///
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
- await using var controllerContext = new ControllerMessageContext(context)
- {
- Controller = null
- };
-
await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware,
- async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
+ async (handler, next) => await handler.Cleanup(context, consumeResult, next),
() => Task.CompletedTask);
}
- private async Task FilterAllowed(IControllerMessageContext context)
+ private async Task FilterAllowed(IMessageContext context)
{
var allowed = false;
await MiddlewareHelper.GoAsync(
@@ -208,7 +199,7 @@ namespace Tapeti.Default
}
- private delegate Task MessageHandlerFunc(IControllerMessageContext context);
+ private delegate Task MessageHandlerFunc(IMessageContext context);
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable parameterFactories, ResultHandler resultHandler)
@@ -233,9 +224,10 @@ namespace Tapeti.Default
{
return context =>
{
+ var controllerPayload = context.Get();
try
{
- var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ var result = method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return resultHandler(context, result);
}
catch (Exception e)
@@ -250,9 +242,10 @@ namespace Tapeti.Default
{
return context =>
{
+ var controllerPayload = context.Get();
try
{
- method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return Task.CompletedTask;
}
catch (Exception e)
@@ -268,9 +261,10 @@ namespace Tapeti.Default
{
return context =>
{
+ var controllerPayload = context.Get();
try
{
- return (Task) method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ return (Task) method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
catch (Exception e)
{
@@ -285,9 +279,10 @@ namespace Tapeti.Default
{
return context =>
{
+ var controllerPayload = context.Get();
try
{
- return (Task)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
+ return (Task)method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
catch (Exception e)
{
@@ -302,9 +297,10 @@ namespace Tapeti.Default
{
return context =>
{
+ var controllerPayload = context.Get();
try
{
- return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
+ return Task.FromResult(method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()));
}
catch (Exception e)
{
diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs
index 1729dc3..6a001af 100644
--- a/Tapeti/Default/MessageContext.cs
+++ b/Tapeti/Default/MessageContext.cs
@@ -7,7 +7,7 @@ namespace Tapeti.Default
{
internal class MessageContext : IMessageContext
{
- private readonly Dictionary items = new();
+ private readonly Dictionary payloads = new();
///
@@ -22,6 +22,9 @@ namespace Tapeti.Default
///
public string RoutingKey { get; set; }
+ ///
+ public byte[] RawBody { get; set; }
+
///
public object Message { get; set; }
@@ -31,21 +34,52 @@ namespace Tapeti.Default
///
public IBinding Binding { get; set; }
+
+ public void Store(T payload) where T : IMessageContextPayload
+ {
+ payloads.Add(typeof(T), payload);
+ }
+
+ public void StoreOrUpdate(Func onAdd, Action onUpdate) where T : IMessageContextPayload
+ {
+ if (payloads.TryGetValue(typeof(T), out var payload))
+ onUpdate((T)payload);
+ else
+ payloads.Add(typeof(T), onAdd());
+ }
+
+ public T Get() where T : IMessageContextPayload
+ {
+ return (T)payloads[typeof(T)];
+ }
+
+ public bool TryGet(out T payload) where T : IMessageContextPayload
+ {
+ if (payloads.TryGetValue(typeof(T), out var payloadValue))
+ {
+ payload = (T)payloadValue;
+ return true;
+ }
+
+ payload = default;
+ return false;
+ }
+
///
public void Dispose()
{
- foreach (var item in items.Values)
- (item as IDisposable)?.Dispose();
+ foreach (var payload in payloads.Values)
+ (payload as IDisposable)?.Dispose();
}
///
public async ValueTask DisposeAsync()
{
- foreach (var item in items.Values)
+ foreach (var payload in payloads.Values)
{
- if (item is IAsyncDisposable asyncDisposable)
+ if (payload is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync();
}
}
@@ -55,21 +89,66 @@ namespace Tapeti.Default
///
public void Store(string key, object value)
{
- items.Add(key, value);
+ StoreOrUpdate(
+ () => new KeyValuePayload(key, value),
+ payload => payload.Add(key, value));
}
///
public bool Get(string key, out T value) where T : class
{
- if (!items.TryGetValue(key, out var objectValue))
+ if (!TryGet(out var payload) ||
+ !payload.TryGetValue(key, out var objectValue))
{
- value = default(T);
+ value = null;
return false;
}
value = (T)objectValue;
return true;
}
+
+
+ // ReSharper disable once InconsistentNaming
+ public class KeyValuePayload : IMessageContextPayload, IDisposable, IAsyncDisposable
+ {
+ private readonly Dictionary items = new();
+
+
+ public KeyValuePayload(string key, object value)
+ {
+ Add(key, value);
+ }
+
+
+ public void Add(string key, object value)
+ {
+ items.Add(key, value);
+ }
+
+
+ public bool TryGetValue(string key, out object value)
+ {
+ return items.TryGetValue(key, out value);
+ }
+
+
+ public void Dispose()
+ {
+ foreach (var item in items.Values)
+ (item as IDisposable)?.Dispose();
+ }
+
+
+ public async ValueTask DisposeAsync()
+ {
+ foreach (var item in items.Values)
+ {
+ if (item is IAsyncDisposable asyncDisposable)
+ await asyncDisposable.DisposeAsync();
+ }
+ }
+ }
}
}