Fixed Saga loading and message handler filtering

Added Saga publisher extension
This commit is contained in:
Mark van Renswoude 2016-12-14 20:28:17 +01:00
parent 7bafd2f3c4
commit 5e24a2a336
22 changed files with 211 additions and 131 deletions

View File

@ -11,6 +11,8 @@ namespace Tapeti.Config
{
Type MessageClass { get; set; }
IReadOnlyList<IBindingParameter> Parameters { get; }
void Use(IMessageMiddleware middleware);
}

View File

@ -29,6 +29,8 @@ namespace Tapeti.Config
MethodInfo Method { get; }
Type MessageClass { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
bool Accept(object message);
Task<object> Invoke(IMessageContext context, object message);
}

View File

@ -1,11 +1,16 @@
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Tapeti.Config
{
public interface IMessageContext
{
IDependencyResolver DependencyResolver { get; }
object Controller { get; }
object Message { get; }
IBasicProperties Properties { get; }
IDictionary<string, object> Items { get; }
}
}

View File

@ -1,9 +1,10 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IMessageMiddleware
{
void Handle(IMessageContext context, Action next);
Task Handle(IMessageContext context, Func<Task> next);
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Tapeti.Config;
using Tapeti.Helpers;
@ -33,25 +34,31 @@ namespace Tapeti.Connection
if (message == null)
throw new ArgumentException("Empty message");
var handled = false;
var validMessageType = false;
foreach (var binding in bindings.Where(b => b.Accept(message)))
{
var context = new MessageContext
{
DependencyResolver = dependencyResolver,
Controller = dependencyResolver.Resolve(binding.Controller),
Message = message
Message = message,
Properties = properties
};
MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next));
MiddlewareHelper.GoAsync(binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
async () =>
{
var result = binding.Invoke(context, message).Result;
if (result != null)
await worker.Publish(result, null);
}
).Wait();
var result = binding.Invoke(context, message).Result;
if (result != null)
worker.Publish(result);
handled = true;
validMessageType = true;
}
if (!handled)
if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
worker.Respond(deliveryTag, ConsumeResponse.Ack);
@ -66,8 +73,12 @@ namespace Tapeti.Connection
protected class MessageContext : IMessageContext
{
public IDependencyResolver DependencyResolver { get; set; }
public object Controller { get; set; }
public object Message { get; set; }
public IBasicProperties Properties { get; set; }
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>();
}
}

View File

@ -1,9 +1,10 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Tapeti.Connection
{
public class TapetiPublisher : IPublisher
public class TapetiPublisher : IAdvancedPublisher
{
private readonly Func<TapetiWorker> workerFactory;
@ -16,7 +17,13 @@ namespace Tapeti.Connection
public Task Publish(object message)
{
return workerFactory().Publish(message);
return workerFactory().Publish(message, null);
}
public Task Publish(object message, IBasicProperties properties)
{
return workerFactory().Publish(message, properties);
}
}
}

View File

@ -32,16 +32,19 @@ namespace Tapeti.Connection
}
public Task Publish(object message)
public Task Publish(object message, IBasicProperties properties)
{
return taskQueue.Value.Add(async () =>
{
var properties = new BasicProperties();
var body = messageSerializer.Serialize(message, properties);
var messageProperties = properties ?? new BasicProperties();
if (messageProperties.Timestamp.UnixTime == 0)
messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
var body = messageSerializer.Serialize(message, messageProperties);
(await GetChannel())
.BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false,
properties, body);
messageProperties, body);
}).Unwrap();
}

View File

@ -1,13 +0,0 @@
using System;
using Tapeti.Config;
namespace Tapeti.Default
{
// End of the line...
public class BindingBufferStop : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
}
}
}

View File

@ -47,8 +47,8 @@ namespace Tapeti.Default
{
object typeName;
if (!properties.ContentType.Equals(ContentType))
throw new ArgumentException("content_type must be {ContentType}");
if (properties.ContentType == null || !properties.ContentType.Equals(ContentType))
throw new ArgumentException($"content_type must be {ContentType}");
if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName))
throw new ArgumentException($"{ClassTypeHeader} header not present");

View File

@ -1,15 +1,20 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Threading.Tasks;
namespace Tapeti.Helpers
{
public static class MiddlewareHelper
{
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle)
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
{
var handlerIndex = middleware.Count - 1;
if (handlerIndex == -1)
{
lastHandler();
return;
}
Action handleNext = null;
@ -18,9 +23,35 @@ namespace Tapeti.Helpers
handlerIndex--;
if (handlerIndex >= 0)
handle(middleware[handlerIndex], handleNext);
else
lastHandler();
};
handle(middleware[handlerIndex], handleNext);
}
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler)
{
var handlerIndex = middleware.Count - 1;
if (handlerIndex == -1)
{
await lastHandler();
return;
}
Func<Task> handleNext = null;
handleNext = async () =>
{
handlerIndex--;
if (handlerIndex >= 0)
await handle(middleware[handlerIndex], handleNext);
else
await lastHandler();
};
await handle(middleware[handlerIndex], handleNext);
}
}
}

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace Tapeti
{
@ -6,4 +7,10 @@ namespace Tapeti
{
Task Publish(object message);
}
public interface IAdvancedPublisher : IPublisher
{
Task Publish(object message, IBasicProperties properties);
}
}

View File

@ -6,6 +6,5 @@ namespace Tapeti.Saga
{
Task<ISaga<T>> Begin<T>(T initialState) where T : class;
Task<ISaga<T>> Continue<T>(string sagaId) where T : class;
Task<object> Continue(string sagaId);
}
}

View File

@ -1,28 +0,0 @@
using System;
using System.Linq;
using Tapeti.Config;
namespace Tapeti.Saga
{
public class SagaBindingMiddleware : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
foreach (var parameter in context.Parameters.Where(p =>
p.Info.ParameterType.IsGenericType &&
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
{
parameter.SetBinding(messageContext =>
{
object saga;
if (!messageContext.Items.TryGetValue("Saga", out saga))
return null;
return saga.GetType() == typeof(ISaga<>) ? saga : null;
});
}
next();
}
}
}

View File

@ -0,0 +1,16 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
namespace Tapeti.Saga
{
public static class SagaExtensions
{
public static Task Publish<T>(this IPublisher publisher, object message, ISaga<T> saga) where T : class
{
return ((IAdvancedPublisher)publisher).Publish(message, new BasicProperties
{
CorrelationId = saga.Id
});
}
}
}

View File

@ -1,22 +0,0 @@
using System;
using Tapeti.Config;
namespace Tapeti.Saga
{
public class SagaMessageMiddleware : IMessageMiddleware
{
private readonly IDependencyResolver dependencyResolver;
public SagaMessageMiddleware(IDependencyResolver dependencyResolver)
{
this.dependencyResolver = dependencyResolver;
}
public void Handle(IMessageContext context, Action next)
{
context.Items["Saga"] = dependencyResolver.Resolve<ISagaProvider>().Continue("");
next();
}
}
}

View File

@ -1,16 +1,70 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Saga
{
public class SagaMiddleware : IMiddlewareBundle
{
private const string SagaContextKey = "Saga";
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
{
(dependencyResolver as IDependencyInjector)?.RegisterDefault<ISagaProvider, SagaProvider>();
yield return new SagaBindingMiddleware();
yield return new SagaMessageMiddleware(dependencyResolver);
}
protected class SagaBindingMiddleware : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
var registered = false;
foreach (var parameter in context.Parameters.Where(p =>
p.Info.ParameterType.IsGenericType &&
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
{
if (!registered)
{
var sagaType = parameter.Info.ParameterType.GetGenericArguments()[0];
var middlewareType = typeof(SagaMessageMiddleware<>).MakeGenericType(sagaType);
context.Use(Activator.CreateInstance(middlewareType) as IMessageMiddleware);
registered = true;
}
parameter.SetBinding(messageContext =>
{
object saga;
return messageContext.Items.TryGetValue(SagaContextKey, out saga) ? saga : null;
});
}
next();
}
}
protected class SagaMessageMiddleware<T> : IMessageMiddleware where T : class
{
public async Task Handle(IMessageContext context, Func<Task> next)
{
if (string.IsNullOrEmpty(context.Properties.CorrelationId))
return;
var saga = await context.DependencyResolver.Resolve<ISagaProvider>().Continue<T>(context.Properties.CorrelationId);
if (saga == null)
return;
context.Items[SagaContextKey] = saga;
await next();
}
}
}
}

View File

@ -29,15 +29,6 @@ namespace Tapeti.Saga
return await Saga<T>.Create(async () => await store.Read(sagaId) as T, sagaId);
}
public async Task<object> Continue(string sagaId)
{
return new Saga<object>
{
Id = sagaId,
State = await store.Read(sagaId)
};
}
protected class Saga<T> : ISaga<T> where T : class
{

View File

@ -31,6 +31,7 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce" />
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
@ -45,11 +46,10 @@
<Compile Include="ISagaProvider.cs" />
<Compile Include="ISagaStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SagaBindingMiddleware.cs" />
<Compile Include="SagaMemoryStore.cs" />
<Compile Include="SagaMessageMiddleware.cs" />
<Compile Include="SagaMiddleware.cs" />
<Compile Include="SagaProvider.cs" />
<Compile Include="SagaExtensions.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.csproj">

View File

@ -64,7 +64,6 @@
<Compile Include="IConnection.cs" />
<Compile Include="ILogger.cs" />
<Compile Include="Config\IMessageContext.cs" />
<Compile Include="Default\BindingBufferStop.cs" />
<Compile Include="Config\IMessageMiddleware.cs" />
<Compile Include="Config\IMiddlewareBundle.cs" />
<Compile Include="Config\IConfig.cs" />

View File

@ -36,7 +36,6 @@ namespace Tapeti
this.exchange = exchange;
this.dependencyResolver = dependencyResolver;
Use(new BindingBufferStop());
Use(new DependencyResolverBinding(dependencyResolver));
Use(new MessageBinding());
}
@ -129,7 +128,8 @@ namespace Tapeti
Method = method,
QueueInfo = methodQueueInfo,
MessageClass = context.MessageClass,
MessageHandler = messageHandler
MessageHandler = messageHandler,
MessageMiddleware = context.MessageMiddleware
};
if (methodQueueInfo.Dynamic.GetValueOrDefault())
@ -159,7 +159,7 @@ namespace Tapeti
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
{
MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next));
MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next), () => {});
if (context.MessageClass == null)
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class");
@ -336,6 +336,8 @@ namespace Tapeti
public MethodInfo Method { get; set; }
public Type MessageClass { get; set; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
public QueueInfo QueueInfo { get; set; }
public MessageHandlerFunc MessageHandler { get; set; }
@ -361,14 +363,26 @@ namespace Tapeti
internal class BindingContext : IBindingContext
{
private List<IMessageMiddleware> messageMiddleware;
public Type MessageClass { get; set; }
public IReadOnlyList<IBindingParameter> Parameters { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
public BindingContext(IReadOnlyList<IBindingParameter> parameters)
{
Parameters = parameters;
}
public void Use(IMessageMiddleware middleware)
{
if (messageMiddleware == null)
messageMiddleware = new List<IMessageMiddleware>();
messageMiddleware.Add(middleware);
}
}

View File

@ -1,7 +1,7 @@
using System;
using Microsoft.SqlServer.Server;
using System.Threading.Tasks;
using Tapeti;
using Tapeti.Annotations;
using Tapeti.Saga;
namespace Test
{
@ -9,33 +9,46 @@ namespace Test
public class MarcoController : MessageController
{
private readonly IPublisher publisher;
private readonly ISagaProvider sagaProvider;
public MarcoController(IPublisher publisher/*, ISagaProvider sagaProvider*/)
public MarcoController(IPublisher publisher, ISagaProvider sagaProvider)
{
this.publisher = publisher;
this.sagaProvider = sagaProvider;
}
//[StaticQueue("test")]
public PoloMessage Marco(MarcoMessage message, Visualizer visualizer)
/*
* For simple request response patterns, the return type can also be used:
public async Task<PoloMessage> Marco(MarcoMessage message, Visualizer visualizer)
{
visualizer.VisualizeMarco();
return new PoloMessage(); ;
}
*/
// Visualizer can also be constructor injected, just proving a point here...
public async Task Marco(MarcoMessage message, Visualizer visualizer)
{
visualizer.VisualizeMarco();
/*
using (sagaProvider.Begin<MarcoState>(new MarcoState
using (var saga = await sagaProvider.Begin(new MarcoPoloSaga()))
{
...
}))
{
//publisher.Publish(new PoloColorRequest(), saga, PoloColorResponse1);
//publisher.Publish(new PoloColorRequest(), saga, callID = "tweede");
// Saga refcount = 2
// TODO provide publish extension with Saga support
await publisher.Publish(new PoloMessage(), saga);
}
*/
}
return new PoloMessage(); ;
public void Polo(PoloMessage message, Visualizer visualizer, ISaga<MarcoPoloSaga> saga)
{
if (saga.State.ReceivedPolo)
return;
saga.State.ReceivedPolo = true;
visualizer.VisualizePolo();
}
@ -61,11 +74,6 @@ namespace Test
}
}
*/
public void Polo(PoloMessage message, Visualizer visualizer)
{
visualizer.VisualizePolo();
}
}
@ -79,15 +87,8 @@ namespace Test
}
public class PoloColorRequest
public class MarcoPoloSaga
{
}
public class PoloColorResponse
{
public bool ReceivedPolo;
}
}

View File

@ -13,7 +13,7 @@ namespace Test
var container = new Container();
container.Register<MarcoEmitter>();
container.Register<Visualizer>();
container.Register<ISagaStore, SagaMemoryStore>();
container.RegisterSingleton<ISagaStore, SagaMemoryStore>();
var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container))
.Use(new SagaMiddleware())