1
0
mirror of synced 2024-11-25 20:23:09 +01:00

Allow Message middleware to septup a nested context and call 'next' multiple times

This commit is contained in:
Menno van Lavieren 2017-07-07 10:59:12 +02:00
parent 5131e931f5
commit 06654b099a
3 changed files with 313 additions and 25 deletions

View File

@ -21,5 +21,7 @@ namespace Tapeti.Config
object Controller { get; } object Controller { get; }
IBinding Binding { get; } IBinding Binding { get; }
IMessageContext SetupNestedContext();
} }
} }

View File

@ -6,6 +6,7 @@ using RabbitMQ.Client;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Default; using Tapeti.Default;
using Tapeti.Helpers; using Tapeti.Helpers;
using System.Threading.Tasks;
namespace Tapeti.Connection namespace Tapeti.Connection
{ {
@ -56,31 +57,13 @@ namespace Tapeti.Connection
{ {
foreach (var binding in bindings) foreach (var binding in bindings)
{ {
if (!binding.Accept(context, message)) if (binding.Accept(context, message))
continue;
context.Binding = binding;
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
MiddlewareHelper.GoAsync(
binding.MessageFilterMiddleware,
async (handler, next) => await handler.Handle(context, next),
async () =>
{ {
context.Controller = dependencyResolver.Resolve(binding.Controller); InvokeUsingBinding(context, binding, message);
await MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => binding.Invoke(context, message)
);
}).Wait();
// ReSharper restore AccessToDisposedClosure
validMessageType = true; validMessageType = true;
} }
}
if (!validMessageType) if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
@ -104,6 +87,60 @@ namespace Tapeti.Connection
} }
private void InvokeUsingBinding(MessageContext context, IBinding binding, object message)
{
context.Binding = binding;
RecursiveCaller firstCaller = null;
RecursiveCaller currentCaller = null;
Action<Handler> addHandler = (Handler handle) =>
{
var caller = new RecursiveCaller(handle);
if (currentCaller == null)
firstCaller = caller;
else
currentCaller.next = caller;
currentCaller = caller;
};
if (binding.MessageFilterMiddleware != null)
{
foreach (var m in binding.MessageFilterMiddleware)
{
addHandler(m.Handle);
}
}
addHandler(async (c, next) =>
{
c.Controller = dependencyResolver.Resolve(binding.Controller);
await next();
});
foreach (var m in messageMiddleware)
{
addHandler(m.Handle);
}
if (binding.MessageMiddleware != null)
{
foreach (var m in binding.MessageMiddleware)
{
addHandler(m.Handle);
}
}
addHandler(async (c, next) =>
{
await binding.Invoke(context, message);
});
firstCaller.Call(context)
.Wait();
}
private static Exception UnwrapException(Exception exception) private static Exception UnwrapException(Exception exception)
{ {
// In async/await style code this is handled similarly. For synchronous // In async/await style code this is handled similarly. For synchronous
@ -120,4 +157,61 @@ namespace Tapeti.Connection
} }
} }
} }
public delegate Task Handler(MessageContext context, Func<Task> next);
public class RecursiveCaller: ICallFrame
{
private Handler handle;
private MessageContext context;
private MessageContext nextContext;
public RecursiveCaller next;
public RecursiveCaller(Handler handle)
{
this.handle = handle;
}
internal async Task Call(MessageContext context)
{
if (this.context != null)
throw new InvalidOperationException("Cannot simultaneously call 'next' in Middleware.");
try
{
this.context = context;
if (next != null)
context.SetCallFrame(this);
await handle(context, callNext);
}
finally
{
context = null;
}
}
private Task callNext()
{
if (next == null)
return Task.CompletedTask;
return next.Call(nextContext ?? context);
}
void ICallFrame.UseNestedContext(MessageContext context)
{
if (nextContext != null)
throw new InvalidOperationException("Previous nested context was not yet disposed.");
nextContext = context;
}
void ICallFrame.OnContextDisposed(MessageContext context)
{
if (nextContext == context)
nextContext = null;
}
}
} }

View File

@ -1,10 +1,17 @@
using System; using System;
using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Config; using Tapeti.Config;
using System.Linq;
namespace Tapeti.Default namespace Tapeti.Default
{ {
public interface ICallFrame {
void UseNestedContext(MessageContext context);
void OnContextDisposed(MessageContext context);
}
public class MessageContext : IMessageContext public class MessageContext : IMessageContext
{ {
public IDependencyResolver DependencyResolver { get; set; } public IDependencyResolver DependencyResolver { get; set; }
@ -17,13 +24,198 @@ namespace Tapeti.Default
public object Message { get; set; } public object Message { get; set; }
public IBasicProperties Properties { get; set; } public IBasicProperties Properties { get; set; }
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>(); public IDictionary<string, object> Items { get; }
private readonly MessageContext outerContext;
private ICallFrame callFrame;
public MessageContext()
{
Items = new Dictionary<string, object>();
}
public MessageContext(ICallFrame callFrame)
{
Items = new Dictionary<string, object>();
this.callFrame = callFrame;
}
private MessageContext(MessageContext outerContext)
{
DependencyResolver = outerContext.DependencyResolver;
Controller = outerContext.Controller;
Binding = outerContext.Binding;
Queue = outerContext.Queue;
RoutingKey = outerContext.RoutingKey;
Message = outerContext.Message;
Properties = outerContext.Properties;
Items = new DeferingDictionary(outerContext.Items);
this.outerContext = outerContext;
}
public void Dispose() public void Dispose()
{ {
foreach (var value in Items.Values) var items = (Items as DeferingDictionary)?.MyState ?? Items;
foreach (var value in items.Values)
(value as IDisposable)?.Dispose(); (value as IDisposable)?.Dispose();
callFrame?.OnContextDisposed(this);
}
public void SetCallFrame(ICallFrame callFrame)
{
this.callFrame = callFrame;
}
public IMessageContext SetupNestedContext()
{
if (callFrame == null)
throw new NotSupportedException("This context does not support creating nested contexts");
var nested = new MessageContext(this);
callFrame.UseNestedContext(nested);
return nested;
}
private class DeferingDictionary : IDictionary<string, object>
{
private IDictionary<string, object> myState;
private IDictionary<string, object> deferee;
public DeferingDictionary(IDictionary<string, object> deferee)
{
myState = new Dictionary<string, object>();
this.deferee = deferee;
}
public IDictionary<string, object> MyState => myState;
object IDictionary<string, object>.this[string key]
{
get
{
return myState.ContainsKey(key) ? myState[key] : deferee[key];
}
set
{
if (deferee.ContainsKey(key))
throw new InvalidOperationException("Cannot hide an item set in an outer context.");
myState[key] = value;
}
}
int ICollection<KeyValuePair<string, object>>.Count
{
get
{
return myState.Count + deferee.Count;
}
}
bool ICollection<KeyValuePair<string, object>>.IsReadOnly
{
get
{
return false;
}
}
ICollection<string> IDictionary<string, object>.Keys
{
get
{
return myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly();
}
}
ICollection<object> IDictionary<string, object>.Values
{
get
{
return myState.Values.Concat(deferee.Values).ToList().AsReadOnly();
}
}
void ICollection<KeyValuePair<string, object>>.Add(KeyValuePair<string, object> item)
{
if (deferee.ContainsKey(item.Key))
throw new InvalidOperationException("Cannot hide an item set in an outer context.");
myState.Add(item);
}
void IDictionary<string, object>.Add(string key, object value)
{
if (deferee.ContainsKey(key))
throw new InvalidOperationException("Cannot hide an item set in an outer context.");
myState.Add(key, value);
}
void ICollection<KeyValuePair<string, object>>.Clear()
{
throw new InvalidOperationException("Cannot influence the items in an outer context.");
}
bool ICollection<KeyValuePair<string, object>>.Contains(KeyValuePair<string, object> item)
{
return myState.Contains(item) || deferee.Contains(item);
}
bool IDictionary<string, object>.ContainsKey(string key)
{
return myState.ContainsKey(key) || deferee.ContainsKey(key);
}
void ICollection<KeyValuePair<string, object>>.CopyTo(KeyValuePair<string, object>[] array, int arrayIndex)
{
foreach(var item in myState.Concat(deferee))
{
array[arrayIndex++] = item;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return (IEnumerator)myState.Concat(deferee);
}
IEnumerator<KeyValuePair<string, object>> IEnumerable<KeyValuePair<string, object>>.GetEnumerator()
{
return (IEnumerator < KeyValuePair < string, object>> )myState.Concat(deferee);
}
bool ICollection<KeyValuePair<string, object>>.Remove(KeyValuePair<string, object> item)
{
if (deferee.ContainsKey(item.Key))
throw new InvalidOperationException("Cannot remove an item set in an outer context.");
return myState.Remove(item);
}
bool IDictionary<string, object>.Remove(string key)
{
if (deferee.ContainsKey(key))
throw new InvalidOperationException("Cannot remove an item set in an outer context.");
return myState.Remove(key);
}
bool IDictionary<string, object>.TryGetValue(string key, out object value)
{
return myState.TryGetValue(key, out value)
|| deferee.TryGetValue(key, out value);
}
} }
} }
} }