1
0
mirror of synced 2024-11-22 01:13:49 +00:00

Merge branch 'hotfix/2.2.1'

This commit is contained in:
Frederik 2020-06-11 17:32:14 +02:00
commit 021d9746e2
5 changed files with 22 additions and 16 deletions

View File

@ -52,16 +52,19 @@ namespace Tapeti.Flow.Default
} }
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next) public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next)
{ {
await next(); await next();
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext)) if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
return; return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
// Do not call when the controller method was filtered, if the same message has two methods
return;
if (flowContext?.FlowStateLock != null) if (flowContext?.FlowStateLock != null)
{ {
// TODO do not call when the controller method was filtered, if the same message has two methods
if (!flowContext.IsStoredOrDeleted()) if (!flowContext.IsStoredOrDeleted())
// The exception strategy can set the consume result to Success. Instead, check if the yield point // The exception strategy can set the consume result to Success. Instead, check if the yield point
// was handled. The flow provider ensures we only end up here in case of an exception. // was handled. The flow provider ensures we only end up here in case of an exception.

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param> /// <param name="context"></param>
/// <param name="consumeResult"></param> /// <param name="consumeResult"></param>
/// <param name="next">Always call to allow the next in the chain to clean up</param> /// <param name="next">Always call to allow the next in the chain to clean up</param>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next); Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next);
} }
} }

View File

@ -7,7 +7,7 @@
public interface IControllerMessageContext : IMessageContext public interface IControllerMessageContext : IMessageContext
{ {
/// <summary> /// <summary>
/// An instance of the controller referenced by the binding. /// An instance of the controller referenced by the binding. Note: is null during Cleanup.
/// </summary> /// </summary>
object Controller { get; } object Controller { get; }

View File

@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
@ -181,12 +181,18 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult) public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
using (var controllerContext = new ControllerMessageContext(context)
{
Controller = null
})
{ {
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware, bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(context, consumeResult, next), async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
() => Task.CompletedTask); () => Task.CompletedTask);
} }
}
private async Task<bool> FilterAllowed(IControllerMessageContext context) private async Task<bool> FilterAllowed(IControllerMessageContext context)

View File

@ -47,15 +47,8 @@ namespace Tapeti
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m)) .Select(m => (MethodInfo)m))
{ {
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
if (methodQueueInfo == null || !methodQueueInfo.IsValid)
throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null; var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null;
var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter) var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
{ {
Controller = controller, Controller = controller,
@ -85,6 +78,10 @@ namespace Tapeti
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
} }
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
if (methodQueueInfo == null || !methodQueueInfo.IsValid)
throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo
{ {