Merge branch 'hotfix/2.2.1' into develop
This commit is contained in:
commit
432bf489c8
@ -52,16 +52,19 @@ namespace Tapeti.Flow.Default
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next)
|
public async Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next)
|
||||||
{
|
{
|
||||||
await next();
|
await next();
|
||||||
|
|
||||||
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
|
if (!context.Get(ContextItems.FlowContext, out FlowContext flowContext))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
|
||||||
|
// Do not call when the controller method was filtered, if the same message has two methods
|
||||||
|
return;
|
||||||
|
|
||||||
if (flowContext?.FlowStateLock != null)
|
if (flowContext?.FlowStateLock != null)
|
||||||
{
|
{
|
||||||
// TODO do not call when the controller method was filtered, if the same message has two methods
|
|
||||||
if (!flowContext.IsStoredOrDeleted())
|
if (!flowContext.IsStoredOrDeleted())
|
||||||
// The exception strategy can set the consume result to Success. Instead, check if the yield point
|
// The exception strategy can set the consume result to Success. Instead, check if the yield point
|
||||||
// was handled. The flow provider ensures we only end up here in case of an exception.
|
// was handled. The flow provider ensures we only end up here in case of an exception.
|
||||||
|
@ -14,6 +14,6 @@ namespace Tapeti.Config
|
|||||||
/// <param name="context"></param>
|
/// <param name="context"></param>
|
||||||
/// <param name="consumeResult"></param>
|
/// <param name="consumeResult"></param>
|
||||||
/// <param name="next">Always call to allow the next in the chain to clean up</param>
|
/// <param name="next">Always call to allow the next in the chain to clean up</param>
|
||||||
Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next);
|
Task Cleanup(IControllerMessageContext context, ConsumeResult consumeResult, Func<Task> next);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
public interface IControllerMessageContext : IMessageContext
|
public interface IControllerMessageContext : IMessageContext
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// An instance of the controller referenced by the binding.
|
/// An instance of the controller referenced by the binding. Note: is null during Cleanup.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
object Controller { get; }
|
object Controller { get; }
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
@ -182,10 +182,16 @@ namespace Tapeti.Default
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
|
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
|
||||||
{
|
{
|
||||||
await MiddlewareHelper.GoAsync(
|
using (var controllerContext = new ControllerMessageContext(context)
|
||||||
bindingInfo.CleanupMiddleware,
|
{
|
||||||
async (handler, next) => await handler.Cleanup(context, consumeResult, next),
|
Controller = null
|
||||||
() => Task.CompletedTask);
|
})
|
||||||
|
{
|
||||||
|
await MiddlewareHelper.GoAsync(
|
||||||
|
bindingInfo.CleanupMiddleware,
|
||||||
|
async (handler, next) => await handler.Cleanup(controllerContext, consumeResult, next),
|
||||||
|
() => Task.CompletedTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,15 +47,8 @@ namespace Tapeti
|
|||||||
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
|
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
|
||||||
.Select(m => (MethodInfo)m))
|
.Select(m => (MethodInfo)m))
|
||||||
{
|
{
|
||||||
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
|
|
||||||
if (methodQueueInfo == null || !methodQueueInfo.IsValid)
|
|
||||||
throw new TopologyConfigurationException(
|
|
||||||
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
|
|
||||||
|
|
||||||
|
|
||||||
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null;
|
var methodIsObsolete = controllerIsObsolete || method.GetCustomAttribute<ObsoleteAttribute>() != null;
|
||||||
|
|
||||||
|
|
||||||
var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
|
var context = new ControllerBindingContext(method.GetParameters(), method.ReturnParameter)
|
||||||
{
|
{
|
||||||
Controller = controller,
|
Controller = controller,
|
||||||
@ -85,6 +78,10 @@ namespace Tapeti
|
|||||||
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
|
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
|
||||||
|
if (methodQueueInfo == null || !methodQueueInfo.IsValid)
|
||||||
|
throw new TopologyConfigurationException(
|
||||||
|
$"Method {method.Name} or controller {controller.Name} requires a queue attribute");
|
||||||
|
|
||||||
builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo
|
builder.RegisterBinding(new ControllerMethodBinding(builderAccess.DependencyResolver, new ControllerMethodBinding.BindingInfo
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user