1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Implemented #31: Include message details in exception logging (optionally)

Refactored IControllerMessageContext into context payloads to get access to it in the exception handler
This commit is contained in:
Mark van Renswoude 2021-09-02 16:16:11 +02:00
parent 5a90c1e0a5
commit be576a2409
20 changed files with 356 additions and 197 deletions

View File

@ -1,20 +0,0 @@
namespace Tapeti.Flow
{
/// <summary>
/// Key names as used in the message context store. For internal use.
/// </summary>
public static class ContextItems
{
/// <summary>
/// Key given to the FlowContext object as stored in the message context.
/// </summary>
public const string FlowContext = "Tapeti.Flow.FlowContext";
/// <summary>
/// Indicates if the current message handler is the last one to be called before a
/// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state.
/// </summary>
public const string FlowIsConverging = "Tapeti.Flow.IsConverging";
}
}

View File

@ -74,16 +74,16 @@ namespace Tapeti.Flow.Default
}
private static Task HandleYieldPoint(IControllerMessageContext context, IYieldPoint yieldPoint)
private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
private static Task HandleParallelResponse(IControllerMessageContext context)
private static Task HandleParallelResponse(IMessageContext context)
{
if (context.Get<object>(ContextItems.FlowIsConverging, out _))
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload) && flowPayload.FlowIsConverging)
return Task.CompletedTask;
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();

View File

@ -12,24 +12,31 @@ namespace Tapeti.Flow.Default
/// </summary>
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{
public async Task Filter(IControllerMessageContext context, Func<Task> next)
public async Task Filter(IMessageContext context, Func<Task> next)
{
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
var flowContext = await EnrichWithFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
return;
await next();
}
public async Task Handle(IControllerMessageContext context, Func<Task> next)
public async Task Handle(IMessageContext context, Func<Task> next)
{
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
{
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, context.Controller);
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
var flowContext = flowPayload.FlowContext;
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
@ -38,28 +45,33 @@ namespace Tapeti.Flow.Default
if (converge)
// Indicate to the FlowBindingMiddleware that the state must not to be stored
context.Store(ContextItems.FlowIsConverging, null);
flowPayload.FlowIsConverging = true;
await next();
if (converge)
await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
await CallConvergeMethod(context, controllerPayload,
flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync);
}
else
await next();
}
public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next)
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next)
{
await next();
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
if (!context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return;
var flowContext = flowPayload.FlowContext;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
// Do not call when the controller method was filtered, if the same message has two methods
return;
@ -76,10 +88,10 @@ namespace Tapeti.Flow.Default
private static async Task<FlowContext> EnrichWithFlowContext(IControllerMessageContext context)
private static async Task<FlowContext> EnrichWithFlowContext(IMessageContext context)
{
if (context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return flowContext;
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return flowPayload.FlowContext;
if (context.Properties.CorrelationId == null)
@ -100,7 +112,7 @@ namespace Tapeti.Flow.Default
if (flowState == null)
return null;
flowContext = new FlowContext
var flowContext = new FlowContext
{
HandlerContext = new FlowHandlerContext(context),
@ -112,26 +124,28 @@ namespace Tapeti.Flow.Default
};
// IDisposable items in the IMessageContext are automatically disposed
context.Store(ContextItems.FlowContext, flowContext);
context.Store(new FlowMessageContextPayload(flowContext));
return flowContext;
}
private static async Task CallConvergeMethod(IControllerMessageContext context, string methodName, bool sync)
private static async Task CallConvergeMethod(IMessageContext context, ControllerMessageContextPayload controllerPayload, string methodName, bool sync)
{
IYieldPoint yieldPoint;
var method = context.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
var method = controllerPayload.Controller.GetType().GetMethod(methodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {context.Controller.GetType().Name}: {methodName}");
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {methodName}");
if (sync)
yieldPoint = (IYieldPoint)method.Invoke(context.Controller, new object[] {});
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] {});
else
yieldPoint = await (Task<IYieldPoint>)method.Invoke(context.Controller, new object[] { });
yieldPoint = await (Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for converge method {methodName}");
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {methodName}");
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);

View File

@ -18,15 +18,18 @@ namespace Tapeti.Flow.Default
/// <summary>
/// </summary>
public FlowHandlerContext(IControllerMessageContext source)
public FlowHandlerContext(IMessageContext source)
{
if (source == null)
return;
if (!source.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
Config = source.Config;
Controller = source.Controller;
Method = source.Binding.Method;
ControllerMessageContext = source;
Controller = controllerPayload.Controller;
Method = controllerPayload.Binding.Method;
MessageContext = source;
}
@ -45,6 +48,6 @@ namespace Tapeti.Flow.Default
public MethodInfo Method { get; set; }
/// <inheritdoc />
public IControllerMessageContext ControllerMessageContext { get; set; }
public IMessageContext MessageContext { get; set; }
}
}

View File

@ -162,16 +162,16 @@ namespace Tapeti.Flow.Default
private static ReplyMetadata GetReply(IFlowHandlerContext context)
{
var requestAttribute = context.ControllerMessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
CorrelationId = context.ControllerMessageContext.Properties.CorrelationId,
ReplyTo = context.ControllerMessageContext.Properties.ReplyTo,
CorrelationId = context.MessageContext.Properties.CorrelationId,
ReplyTo = context.MessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.ControllerMessageContext.Properties.Persistent.GetValueOrDefault(true)
Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true)
};
}
@ -206,8 +206,8 @@ namespace Tapeti.Flow.Default
try
{
var messageContext = context.ControllerMessageContext;
if (messageContext == null || !messageContext.Get(ContextItems.FlowContext, out flowContext))
var messageContext = context.MessageContext;
if (messageContext == null || !messageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
flowContext = new FlowContext
{
@ -218,7 +218,9 @@ namespace Tapeti.Flow.Default
// in the messageContext as the yield point is the last to execute.
disposeFlowContext = true;
}
else
flowContext = flowPayload.FlowContext;
try
{
await executableYieldPoint.Execute(flowContext);

View File

@ -0,0 +1,33 @@
using System;
using Tapeti.Config;
using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
/// <summary>
/// Contains information about the flow for the current message. For internal use.
/// </summary>
internal class FlowMessageContextPayload : IMessageContextPayload, IDisposable
{
public FlowContext FlowContext { get; }
/// <summary>
/// Indicates if the current message handler is the last one to be called before a
/// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state.
/// </summary>
public bool FlowIsConverging { get; set; }
public FlowMessageContextPayload(FlowContext flowContext)
{
FlowContext = flowContext;
}
public void Dispose()
{
FlowContext?.Dispose();
}
}
}

View File

@ -29,9 +29,9 @@ namespace Tapeti.Flow
/// <summary>
/// Access to the controller message context if this is a continuated flow.
/// Access to the message context if this is a continuated flow.
/// Will be null when in a starting flow.
/// </summary>
IControllerMessageContext ControllerMessageContext { get; }
IMessageContext MessageContext { get; }
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using Tapeti.Config;
using ISerilogLogger = Serilog.ILogger;
@ -12,6 +13,21 @@ namespace Tapeti.Serilog
/// </summary>
public class TapetiSeriLogger: IBindingLogger
{
/// <summary>
/// Implements the Tapeti ILogger interface for Serilog output. This version
/// includes the message body and information if available when an error occurs.
/// </summary>
public class WithMessageLogging : TapetiSeriLogger
{
/// <inheritdoc />
public WithMessageLogging(ISerilogLogger seriLogger) : base(seriLogger) { }
internal override bool IncludeMessageInfo() => true;
}
private readonly ISerilogLogger seriLogger;
@ -69,20 +85,38 @@ namespace Tapeti.Serilog
/// <inheritdoc />
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
var message = new StringBuilder("Tapeti: exception in message handler");
var messageParams = new List<object>();
var contextLogger = seriLogger
.ForContext("consumeResult", consumeResult)
.ForContext("exchange", messageContext.Exchange)
.ForContext("queue", messageContext.Queue)
.ForContext("routingKey", messageContext.RoutingKey);
if (messageContext is IControllerMessageContext controllerMessageContext)
if (messageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
{
contextLogger = contextLogger
.ForContext("controller", controllerMessageContext.Binding.Controller.FullName)
.ForContext("method", controllerMessageContext.Binding.Method.Name);
.ForContext("controller", controllerPayload.Binding.Controller.FullName)
.ForContext("method", controllerPayload.Binding.Method.Name);
message.Append(" {controller}.{method}");
messageParams.Add(controllerPayload.Binding.Controller.FullName);
messageParams.Add(controllerPayload.Binding.Method.Name);
}
if (IncludeMessageInfo())
{
message.Append(" on exchange {exchange}, queue {queue}, routingKey {routingKey}, replyTo {replyTo}, correlationId {correlationId} with body {body}");
messageParams.Add(messageContext.Exchange);
messageParams.Add(messageContext.Queue);
messageParams.Add(messageContext.RoutingKey);
messageParams.Add(messageContext.Properties.ReplyTo);
messageParams.Add(messageContext.Properties.CorrelationId);
messageParams.Add(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : null);
}
contextLogger.Error(exception, "Tapeti: exception in message handler");
contextLogger.Error(exception, message.ToString(), messageParams.ToArray());
}
/// <inheritdoc />
@ -134,5 +168,7 @@ namespace Tapeti.Serilog
else
seriLogger.Information("Tapeti: obsolete queue {queue} has been unbound but not yet deleted, {messageCount} messages remaining", queueName, messageCount);
}
internal virtual bool IncludeMessageInfo() => false;
}
}

View File

@ -0,0 +1,32 @@
namespace Tapeti.Config
{
/// <inheritdoc />
/// <summary>
/// Extends the message context with information about the controller.
/// </summary>
public class ControllerMessageContextPayload : IMessageContextPayload
{
/// <summary>
/// An instance of the controller referenced by the binding. Note: can be null during Cleanup.
/// </summary>
public object Controller { get; }
/// <remarks>
/// Provides access to the binding which is currently processing the message.
/// </remarks>
public IControllerMethodBinding Binding { get; }
/// <summary>
/// Constructs the payload to enrich the message context with information about the controller.
/// </summary>
/// <param name="controller">An instance of the controller referenced by the binding</param>
/// <param name="binding">The binding which is currently processing the message</param>
public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding)
{
Controller = controller;
Binding = binding;
}
}
}

View File

@ -11,7 +11,7 @@ namespace Tapeti.Config
/// Injects a value for a controller method parameter.
/// </summary>
/// <param name="context"></param>
public delegate object ValueFactory(IControllerMessageContext context);
public delegate object ValueFactory(IMessageContext context);
/// <summary>
@ -19,7 +19,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="context"></param>
/// <param name="value"></param>
public delegate Task ResultHandler(IControllerMessageContext context, object value);
public delegate Task ResultHandler(IMessageContext context, object value);
/// <summary>

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="consumeResult"></param>
/// <param name="next">Always call to allow the next in the chain to clean up</param>
Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next);
Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next);
}
}

View File

@ -15,6 +15,6 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="next"></param>
/// <returns></returns>
Task Filter(IControllerMessageContext context, Func<Task> next);
Task Filter(IMessageContext context, Func<Task> next);
}
}

View File

@ -1,20 +0,0 @@
namespace Tapeti.Config
{
/// <inheritdoc />
/// <summary>
/// Extends the message context with information about the controller.
/// </summary>
public interface IControllerMessageContext : IMessageContext
{
/// <summary>
/// An instance of the controller referenced by the binding. Note: is null during Cleanup.
/// </summary>
object Controller { get; }
/// <remarks>
/// Provides access to the binding which is currently processing the message.
/// </remarks>
new IControllerMethodBinding Binding { get; }
}
}

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// </summary>
/// <param name="context"></param>
/// <param name="next">Call to pass the message to the next handler in the chain or call the controller method</param>
Task Handle(IControllerMessageContext context, Func<Task> next);
Task Handle(IMessageContext context, Func<Task> next);
}
}

View File

@ -1,5 +1,7 @@
using System;
// ReSharper disable UnusedMemberInSuper.Global - public API
namespace Tapeti.Config
{
/// <summary>
@ -27,6 +29,11 @@ namespace Tapeti.Config
/// </summary>
string RoutingKey { get; }
/// <summary>
/// Contains the raw body of the message.
/// </summary>
byte[] RawBody { get; }
/// <summary>
/// Contains the decoded message instance.
/// </summary>
@ -42,6 +49,36 @@ namespace Tapeti.Config
/// </remarks>
IBinding Binding { get; }
/// <summary>
/// Stores additional properties in the message context which can be passed between middleware stages.
/// </summary>
/// <remarks>
/// Only one instance of type T is stored, if Enrich was called before for this type an InvalidOperationException will be thrown.
/// </remarks>
/// <param name="payload">A class implementing IMessageContextPayload</param>
void Store<T>(T payload) where T : IMessageContextPayload;
/// <summary>
/// Stored a new payload, or updates an existing one.
/// </summary>
/// <param name="onAdd">A method returning the new payload to be stored</param>
/// <param name="onUpdate">A method called when the payload exists</param>
/// <typeparam name="T">The payload type as passed to Enrich</typeparam>
void StoreOrUpdate<T>(Func<T> onAdd, Action<T> onUpdate) where T : IMessageContextPayload;
/// <summary>
/// Returns the properties as previously stored with Enrich. Throws a KeyNotFoundException
/// if the payload is not stored in this message context.
/// </summary>
/// <typeparam name="T">The payload type as passed to Enrich</typeparam>
T Get<T>() where T : IMessageContextPayload;
/// <summary>
/// Returns true and the payload value if this message context was previously enriched with the payload T.
/// </summary>
/// <typeparam name="T">The payload type as passed to Enrich</typeparam>
bool TryGet<T>(out T payload) where T : IMessageContextPayload;
/// <summary>
/// Stores a key-value pair in the context for passing information between the various
@ -49,6 +86,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="key">A unique key. It is recommended to prefix it with the package name which hosts the middleware to prevent conflicts</param>
/// <param name="value">Will be disposed if the value implements IDisposable or IAsyncDisposable</param>
[Obsolete("For backwards compatibility only. Use Store<T> payload for typed properties instead")]
void Store(string key, object value);
/// <summary>
@ -57,6 +95,18 @@ namespace Tapeti.Config
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns>True if the value was found, False otherwise</returns>
[Obsolete("For backwards compatibility only. Use Get<T> payload overload for typed properties instead")]
bool Get<T>(string key, out T value) where T : class;
}
/// <summary>
/// Base interface for additional properties added to the message context.
/// </summary>
/// <remarks>
/// Descendants implementing IDisposable or IAsyncDisposable will be disposed along with the message context.
/// </remarks>
public interface IMessageContextPayload
{
}
}

View File

@ -57,6 +57,7 @@ namespace Tapeti.Connection
return await DispatchMessage(message, new MessageContextData
{
RawBody = body,
Exchange = exchange,
RoutingKey = routingKey,
Properties = properties
@ -70,6 +71,7 @@ namespace Tapeti.Connection
Queue = queueName,
Exchange = exchange,
RoutingKey = routingKey,
RawBody = body,
Message = message,
Properties = properties,
Binding = null
@ -112,6 +114,7 @@ namespace Tapeti.Connection
Queue = queueName,
Exchange = messageContextData.Exchange,
RoutingKey = messageContextData.RoutingKey,
RawBody = messageContextData.RawBody,
Message = message,
Properties = messageContextData.Properties,
Binding = binding
@ -174,6 +177,7 @@ namespace Tapeti.Connection
private struct MessageContextData
{
public byte[] RawBody;
public string Exchange;
public string RoutingKey;
public IMessageProperties Properties;

View File

@ -11,6 +11,19 @@ namespace Tapeti.Default
/// </summary>
public class ConsoleLogger : IBindingLogger
{
/// <summary>
/// Default ILogger implementation for console applications. This version
/// includes the message body if available when an error occurs.
/// </summary>
public class WithMessageLogging : ConsoleLogger
{
/// <inheritdoc />
public WithMessageLogging() : base() { }
internal override bool IncludeMessageBody() => true;
}
/// <inheritdoc />
public void Connect(IConnectContext connectContext)
{
@ -39,17 +52,23 @@ namespace Tapeti.Default
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
Console.WriteLine("[Tapeti] Exception while handling message");
Console.WriteLine($" Result : {consumeResult}");
Console.WriteLine($" Exchange : {messageContext.Exchange}");
Console.WriteLine($" Queue : {messageContext.Queue}");
Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}");
Console.WriteLine($" Result : {consumeResult}");
Console.WriteLine($" Exchange : {messageContext.Exchange}");
Console.WriteLine($" Queue : {messageContext.Queue}");
Console.WriteLine($" RoutingKey : {messageContext.RoutingKey}");
Console.WriteLine($" ReplyTo : {messageContext.Properties.ReplyTo}");
Console.WriteLine($" CorrelationId : {messageContext.Properties.CorrelationId}");
if (messageContext is IControllerMessageContext controllerMessageContext)
if (messageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
{
Console.WriteLine($" Controller : {controllerMessageContext.Binding.Controller.FullName}");
Console.WriteLine($" Method : {controllerMessageContext.Binding.Method.Name}");
Console.WriteLine($" Controller : {controllerPayload.Binding.Controller.FullName}");
Console.WriteLine($" Method : {controllerPayload.Binding.Method.Name}");
}
if (IncludeMessageBody())
Console.WriteLine($" Body : {(messageContext.RawBody != null ? Encoding.UTF8.GetString(messageContext.RawBody) : "<null>")}");
Console.WriteLine();
Console.WriteLine(exception);
}
@ -102,5 +121,7 @@ namespace Tapeti.Default
? $"[Tapeti] Obsolete queue was deleted: {queueName}"
: $"[Tapeti] Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining");
}
internal virtual bool IncludeMessageBody() => false;
}
}

View File

@ -1,71 +0,0 @@
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default
{
internal class ControllerMessageContext : IControllerMessageContext
{
private readonly IMessageContext decoratedContext;
/// <inheritdoc />
public object Controller { get; set; }
/// <inheritdoc />
public ITapetiConfig Config => decoratedContext.Config;
/// <inheritdoc />
public string Queue => decoratedContext.Queue;
/// <inheritdoc />
public string Exchange => decoratedContext.Exchange;
/// <inheritdoc />
public string RoutingKey => decoratedContext.RoutingKey;
/// <inheritdoc />
public object Message => decoratedContext.Message;
/// <inheritdoc />
public IMessageProperties Properties => decoratedContext.Properties;
IBinding IMessageContext.Binding => decoratedContext.Binding;
IControllerMethodBinding IControllerMessageContext.Binding => decoratedContext.Binding as IControllerMethodBinding;
public ControllerMessageContext(IMessageContext decoratedContext)
{
this.decoratedContext = decoratedContext;
}
/// <inheritdoc />
public void Dispose()
{
// Do not call decoratedContext.Dispose - by design
}
/// <inheritdoc />
public ValueTask DisposeAsync()
{
// Do not call decoratedContext.DisposeAsync - by design
return default;
}
/// <inheritdoc />
public void Store(string key, object value)
{
decoratedContext.Store(key, value);
}
/// <inheritdoc />
public bool Get<T>(string key, out T value) where T : class
{
return decoratedContext.Get(key, out value);
}
}
}

View File

@ -160,39 +160,30 @@ namespace Tapeti.Default
public async Task Invoke(IMessageContext context)
{
var controller = dependencyResolver.Resolve(bindingInfo.ControllerType);
await using var controllerContext = new ControllerMessageContext(context)
{
Controller = controller
};
context.Store(new ControllerMessageContextPayload(controller, context.Binding as IControllerMethodBinding));
if (!await FilterAllowed(controllerContext))
if (!await FilterAllowed(context))
return;
await MiddlewareHelper.GoAsync(
bindingInfo.MessageMiddleware,
async (handler, next) => await handler.Handle(controllerContext, next),
async () => await messageHandler(controllerContext));
async (handler, next) => await handler.Handle(context, next),
async () => await messageHandler(context));
}
/// <inheritdoc />
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
await using var controllerContext = new ControllerMessageContext(context)
{
Controller = null
};
await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
async (handler, next) => await handler.Cleanup(context, consumeResult, next),
() => Task.CompletedTask);
}
private async Task<bool> FilterAllowed(IControllerMessageContext context)
private async Task<bool> FilterAllowed(IMessageContext context)
{
var allowed = false;
await MiddlewareHelper.GoAsync(
@ -208,7 +199,7 @@ namespace Tapeti.Default
}
private delegate Task MessageHandlerFunc(IControllerMessageContext context);
private delegate Task MessageHandlerFunc(IMessageContext context);
private MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameterFactories, ResultHandler resultHandler)
@ -233,9 +224,10 @@ namespace Tapeti.Default
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
var result = method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
var result = method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return resultHandler(context, result);
}
catch (Exception e)
@ -250,9 +242,10 @@ namespace Tapeti.Default
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
return Task.CompletedTask;
}
catch (Exception e)
@ -268,9 +261,10 @@ namespace Tapeti.Default
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
return (Task) method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
return (Task) method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
catch (Exception e)
{
@ -285,9 +279,10 @@ namespace Tapeti.Default
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
return (Task<object>)method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray());
return (Task<object>)method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray());
}
catch (Exception e)
{
@ -302,9 +297,10 @@ namespace Tapeti.Default
{
return context =>
{
var controllerPayload = context.Get<ControllerMessageContextPayload>();
try
{
return Task.FromResult(method.Invoke(context.Controller, parameterFactories.Select(p => p(context)).ToArray()));
return Task.FromResult(method.Invoke(controllerPayload.Controller, parameterFactories.Select(p => p(context)).ToArray()));
}
catch (Exception e)
{

View File

@ -7,7 +7,7 @@ namespace Tapeti.Default
{
internal class MessageContext : IMessageContext
{
private readonly Dictionary<string, object> items = new();
private readonly Dictionary<Type, IMessageContextPayload> payloads = new();
/// <inheritdoc />
@ -22,6 +22,9 @@ namespace Tapeti.Default
/// <inheritdoc />
public string RoutingKey { get; set; }
/// <inheritdoc />
public byte[] RawBody { get; set; }
/// <inheritdoc />
public object Message { get; set; }
@ -31,21 +34,52 @@ namespace Tapeti.Default
/// <inheritdoc />
public IBinding Binding { get; set; }
public void Store<T>(T payload) where T : IMessageContextPayload
{
payloads.Add(typeof(T), payload);
}
public void StoreOrUpdate<T>(Func<T> onAdd, Action<T> onUpdate) where T : IMessageContextPayload
{
if (payloads.TryGetValue(typeof(T), out var payload))
onUpdate((T)payload);
else
payloads.Add(typeof(T), onAdd());
}
public T Get<T>() where T : IMessageContextPayload
{
return (T)payloads[typeof(T)];
}
public bool TryGet<T>(out T payload) where T : IMessageContextPayload
{
if (payloads.TryGetValue(typeof(T), out var payloadValue))
{
payload = (T)payloadValue;
return true;
}
payload = default;
return false;
}
/// <inheritdoc />
public void Dispose()
{
foreach (var item in items.Values)
(item as IDisposable)?.Dispose();
foreach (var payload in payloads.Values)
(payload as IDisposable)?.Dispose();
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
foreach (var item in items.Values)
foreach (var payload in payloads.Values)
{
if (item is IAsyncDisposable asyncDisposable)
if (payload is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync();
}
}
@ -55,21 +89,66 @@ namespace Tapeti.Default
/// <inheritdoc />
public void Store(string key, object value)
{
items.Add(key, value);
StoreOrUpdate(
() => new KeyValuePayload(key, value),
payload => payload.Add(key, value));
}
/// <inheritdoc />
public bool Get<T>(string key, out T value) where T : class
{
if (!items.TryGetValue(key, out var objectValue))
if (!TryGet<KeyValuePayload>(out var payload) ||
!payload.TryGetValue(key, out var objectValue))
{
value = default(T);
value = null;
return false;
}
value = (T)objectValue;
return true;
}
// ReSharper disable once InconsistentNaming
public class KeyValuePayload : IMessageContextPayload, IDisposable, IAsyncDisposable
{
private readonly Dictionary<string, object> items = new();
public KeyValuePayload(string key, object value)
{
Add(key, value);
}
public void Add(string key, object value)
{
items.Add(key, value);
}
public bool TryGetValue(string key, out object value)
{
return items.TryGetValue(key, out value);
}
public void Dispose()
{
foreach (var item in items.Values)
(item as IDisposable)?.Dispose();
}
public async ValueTask DisposeAsync()
{
foreach (var item in items.Values)
{
if (item is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync();
}
}
}
}
}