diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 6f6a887..c635f88 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -148,7 +148,10 @@ namespace Tapeti.Flow.Default private static ValueTask HandleParallelResponse(IMessageContext context) { - if (context.TryGet(out var flowPayload) && flowPayload.FlowIsConverging) + if (!context.TryGet(out var flowPayload)) + return default; + + if (flowPayload.FlowIsConverging) return default; var flowHandler = context.Config.DependencyResolver.Resolve(); diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 62e3176..4d14bf9 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -72,7 +72,7 @@ namespace Tapeti.Flow.Default internal async Task PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { if (!context.HasFlowStateAndLock) { @@ -101,7 +101,7 @@ namespace Tapeti.Flow.Default internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, - string convergeMethodName = null, bool convergeMethodTaskSync = false) + string? convergeMethodName = null, bool convergeMethodTaskSync = false) { var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); await context.Store(responseHandlerInfo.IsDurableQueue); diff --git a/Tapeti.Serilog/Default/DiagnosticContext.cs b/Tapeti.Serilog/Default/DiagnosticContext.cs index 0f7f291..3f7905a 100644 --- a/Tapeti.Serilog/Default/DiagnosticContext.cs +++ b/Tapeti.Serilog/Default/DiagnosticContext.cs @@ -1,4 +1,6 @@ using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; using Serilog.Core; using Serilog.Events; @@ -10,16 +12,20 @@ namespace Tapeti.Serilog.Default public class DiagnosticContext : IDiagnosticContext { private readonly global::Serilog.ILogger logger; + private readonly Stopwatch stopwatch; private readonly List properties = new(); + private int resetCount; /// /// Creates a new instance of a DiagnosticContext /// /// The Serilog ILogger which will be enriched - public DiagnosticContext(global::Serilog.ILogger logger) + /// The Stopwatch instance that monitors the run time of the message handler + public DiagnosticContext(global::Serilog.ILogger logger, Stopwatch stopwatch) { this.logger = logger; + this.stopwatch = stopwatch; } @@ -31,6 +37,17 @@ namespace Tapeti.Serilog.Default } + /// + public void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset") + { + var newResetCount = Interlocked.Increment(ref resetCount); + if (addToContext) + Set(propertyNamePrefix + newResetCount, stopwatch.ElapsedMilliseconds); + + stopwatch.Restart(); + } + + /// /// Returns a Serilog ILogger which is enriched with the properties set by this DiagnosticContext /// diff --git a/Tapeti.Serilog/IDiagnosticContext.cs b/Tapeti.Serilog/IDiagnosticContext.cs index 544c395..cc0833f 100644 --- a/Tapeti.Serilog/IDiagnosticContext.cs +++ b/Tapeti.Serilog/IDiagnosticContext.cs @@ -5,8 +5,7 @@ /// MessageHandlerLogging middleware. /// /// - /// This is a one-to-one copy of the IDiagnosticContext in Serilog.Extensions.Hosting which - /// saves a reference to that package while allowing similar usage within Tapeti message handlers. + /// Similar to IDiagnosticContext in Serilog.Extensions.Hosting but slightly extended. /// public interface IDiagnosticContext { @@ -19,5 +18,14 @@ /// If true, the value will be serialized as structured /// data if possible; if false, the object will be recorded as a scalar or simple array. void Set(string propertyName, object value, bool destructureObjects = false); + + /// + /// Resets the timer which is used to monitor how long a message handler takes to complete. + /// Useful for example when a message handler is throttled by a rate limiter in the message + /// handler method and you want to measure only the time taken after it is allowed to start. + /// + /// If true, the time taken until this reset is added to this diagnostic context as an incrementally named property for logging purposes. The value will be the time in milliseconds. + /// The prefix for the property name(s) when addToContext is true. The number of times ResetStopwatch is called will be appended (stopwatchReset1, stopwatchReset2, etc). + void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset"); } } diff --git a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs index 1043039..04905de 100644 --- a/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs +++ b/Tapeti.Serilog/Middleware/MessageHandlerLoggingMessageMiddleware.cs @@ -32,11 +32,11 @@ namespace Tapeti.Serilog.Middleware public async ValueTask Handle(IMessageContext context, Func next) { var logger = context.Config.DependencyResolver.Resolve(); - - var diagnosticContext = new DiagnosticContext(logger); + + var stopwatch = new Stopwatch(); + var diagnosticContext = new DiagnosticContext(logger, stopwatch); context.Store(new DiagnosticContextPayload(diagnosticContext)); - var stopwatch = new Stopwatch(); stopwatch.Start(); await next();