Fixed Saga loading and message handler filtering
Added Saga publisher extension
This commit is contained in:
parent
2806e2d304
commit
14ee717985
@ -11,6 +11,8 @@ namespace Tapeti.Config
|
||||
{
|
||||
Type MessageClass { get; set; }
|
||||
IReadOnlyList<IBindingParameter> Parameters { get; }
|
||||
|
||||
void Use(IMessageMiddleware middleware);
|
||||
}
|
||||
|
||||
|
||||
|
@ -29,6 +29,8 @@ namespace Tapeti.Config
|
||||
MethodInfo Method { get; }
|
||||
Type MessageClass { get; }
|
||||
|
||||
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
||||
|
||||
bool Accept(object message);
|
||||
Task<object> Invoke(IMessageContext context, object message);
|
||||
}
|
||||
|
@ -1,11 +1,16 @@
|
||||
using System.Collections.Generic;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace Tapeti.Config
|
||||
{
|
||||
public interface IMessageContext
|
||||
{
|
||||
IDependencyResolver DependencyResolver { get; }
|
||||
|
||||
object Controller { get; }
|
||||
object Message { get; }
|
||||
IBasicProperties Properties { get; }
|
||||
|
||||
IDictionary<string, object> Items { get; }
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,10 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Config
|
||||
{
|
||||
public interface IMessageMiddleware
|
||||
{
|
||||
void Handle(IMessageContext context, Action next);
|
||||
Task Handle(IMessageContext context, Func<Task> next);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
using Tapeti.Config;
|
||||
using Tapeti.Helpers;
|
||||
@ -33,25 +34,31 @@ namespace Tapeti.Connection
|
||||
if (message == null)
|
||||
throw new ArgumentException("Empty message");
|
||||
|
||||
var handled = false;
|
||||
var validMessageType = false;
|
||||
foreach (var binding in bindings.Where(b => b.Accept(message)))
|
||||
{
|
||||
var context = new MessageContext
|
||||
{
|
||||
DependencyResolver = dependencyResolver,
|
||||
Controller = dependencyResolver.Resolve(binding.Controller),
|
||||
Message = message
|
||||
Message = message,
|
||||
Properties = properties
|
||||
};
|
||||
|
||||
MiddlewareHelper.Go(messageMiddleware, (handler, next) => handler.Handle(context, next));
|
||||
|
||||
MiddlewareHelper.GoAsync(binding.MessageMiddleware != null ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() : messageMiddleware,
|
||||
async (handler, next) => await handler.Handle(context, next),
|
||||
async () =>
|
||||
{
|
||||
var result = binding.Invoke(context, message).Result;
|
||||
if (result != null)
|
||||
worker.Publish(result);
|
||||
await worker.Publish(result, null);
|
||||
}
|
||||
).Wait();
|
||||
|
||||
handled = true;
|
||||
validMessageType = true;
|
||||
}
|
||||
|
||||
if (!handled)
|
||||
if (!validMessageType)
|
||||
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
|
||||
|
||||
worker.Respond(deliveryTag, ConsumeResponse.Ack);
|
||||
@ -66,8 +73,12 @@ namespace Tapeti.Connection
|
||||
|
||||
protected class MessageContext : IMessageContext
|
||||
{
|
||||
public IDependencyResolver DependencyResolver { get; set; }
|
||||
|
||||
public object Controller { get; set; }
|
||||
public object Message { get; set; }
|
||||
public IBasicProperties Properties { get; set; }
|
||||
|
||||
public IDictionary<string, object> Items { get; } = new Dictionary<string, object>();
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,10 @@
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace Tapeti.Connection
|
||||
{
|
||||
public class TapetiPublisher : IPublisher
|
||||
public class TapetiPublisher : IAdvancedPublisher
|
||||
{
|
||||
private readonly Func<TapetiWorker> workerFactory;
|
||||
|
||||
@ -16,7 +17,13 @@ namespace Tapeti.Connection
|
||||
|
||||
public Task Publish(object message)
|
||||
{
|
||||
return workerFactory().Publish(message);
|
||||
return workerFactory().Publish(message, null);
|
||||
}
|
||||
|
||||
|
||||
public Task Publish(object message, IBasicProperties properties)
|
||||
{
|
||||
return workerFactory().Publish(message, properties);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -32,16 +32,19 @@ namespace Tapeti.Connection
|
||||
}
|
||||
|
||||
|
||||
public Task Publish(object message)
|
||||
public Task Publish(object message, IBasicProperties properties)
|
||||
{
|
||||
return taskQueue.Value.Add(async () =>
|
||||
{
|
||||
var properties = new BasicProperties();
|
||||
var body = messageSerializer.Serialize(message, properties);
|
||||
var messageProperties = properties ?? new BasicProperties();
|
||||
if (messageProperties.Timestamp.UnixTime == 0)
|
||||
messageProperties.Timestamp = new AmqpTimestamp(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds());
|
||||
|
||||
var body = messageSerializer.Serialize(message, messageProperties);
|
||||
|
||||
(await GetChannel())
|
||||
.BasicPublish(Exchange, routingKeyStrategy.GetRoutingKey(message.GetType()), false,
|
||||
properties, body);
|
||||
messageProperties, body);
|
||||
}).Unwrap();
|
||||
}
|
||||
|
||||
|
@ -1,13 +0,0 @@
|
||||
using System;
|
||||
using Tapeti.Config;
|
||||
|
||||
namespace Tapeti.Default
|
||||
{
|
||||
// End of the line...
|
||||
public class BindingBufferStop : IBindingMiddleware
|
||||
{
|
||||
public void Handle(IBindingContext context, Action next)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
@ -47,8 +47,8 @@ namespace Tapeti.Default
|
||||
{
|
||||
object typeName;
|
||||
|
||||
if (!properties.ContentType.Equals(ContentType))
|
||||
throw new ArgumentException("content_type must be {ContentType}");
|
||||
if (properties.ContentType == null || !properties.ContentType.Equals(ContentType))
|
||||
throw new ArgumentException($"content_type must be {ContentType}");
|
||||
|
||||
if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName))
|
||||
throw new ArgumentException($"{ClassTypeHeader} header not present");
|
||||
|
@ -1,15 +1,20 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.Eventing.Reader;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Tapeti.Helpers
|
||||
{
|
||||
public static class MiddlewareHelper
|
||||
{
|
||||
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle)
|
||||
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
|
||||
{
|
||||
var handlerIndex = middleware.Count - 1;
|
||||
if (handlerIndex == -1)
|
||||
{
|
||||
lastHandler();
|
||||
return;
|
||||
}
|
||||
|
||||
Action handleNext = null;
|
||||
|
||||
@ -18,9 +23,35 @@ namespace Tapeti.Helpers
|
||||
handlerIndex--;
|
||||
if (handlerIndex >= 0)
|
||||
handle(middleware[handlerIndex], handleNext);
|
||||
else
|
||||
lastHandler();
|
||||
};
|
||||
|
||||
handle(middleware[handlerIndex], handleNext);
|
||||
}
|
||||
|
||||
|
||||
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler)
|
||||
{
|
||||
var handlerIndex = middleware.Count - 1;
|
||||
if (handlerIndex == -1)
|
||||
{
|
||||
await lastHandler();
|
||||
return;
|
||||
}
|
||||
|
||||
Func<Task> handleNext = null;
|
||||
|
||||
handleNext = async () =>
|
||||
{
|
||||
handlerIndex--;
|
||||
if (handlerIndex >= 0)
|
||||
await handle(middleware[handlerIndex], handleNext);
|
||||
else
|
||||
await lastHandler();
|
||||
};
|
||||
|
||||
await handle(middleware[handlerIndex], handleNext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
namespace Tapeti
|
||||
{
|
||||
@ -6,4 +7,10 @@ namespace Tapeti
|
||||
{
|
||||
Task Publish(object message);
|
||||
}
|
||||
|
||||
|
||||
public interface IAdvancedPublisher : IPublisher
|
||||
{
|
||||
Task Publish(object message, IBasicProperties properties);
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,5 @@ namespace Tapeti.Saga
|
||||
{
|
||||
Task<ISaga<T>> Begin<T>(T initialState) where T : class;
|
||||
Task<ISaga<T>> Continue<T>(string sagaId) where T : class;
|
||||
Task<object> Continue(string sagaId);
|
||||
}
|
||||
}
|
||||
|
@ -1,28 +0,0 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using Tapeti.Config;
|
||||
|
||||
namespace Tapeti.Saga
|
||||
{
|
||||
public class SagaBindingMiddleware : IBindingMiddleware
|
||||
{
|
||||
public void Handle(IBindingContext context, Action next)
|
||||
{
|
||||
foreach (var parameter in context.Parameters.Where(p =>
|
||||
p.Info.ParameterType.IsGenericType &&
|
||||
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
|
||||
{
|
||||
parameter.SetBinding(messageContext =>
|
||||
{
|
||||
object saga;
|
||||
if (!messageContext.Items.TryGetValue("Saga", out saga))
|
||||
return null;
|
||||
|
||||
return saga.GetType() == typeof(ISaga<>) ? saga : null;
|
||||
});
|
||||
}
|
||||
|
||||
next();
|
||||
}
|
||||
}
|
||||
}
|
16
Tapeti.Saga/SagaExtensions.cs
Normal file
16
Tapeti.Saga/SagaExtensions.cs
Normal file
@ -0,0 +1,16 @@
|
||||
using System.Threading.Tasks;
|
||||
using RabbitMQ.Client.Framing;
|
||||
|
||||
namespace Tapeti.Saga
|
||||
{
|
||||
public static class SagaExtensions
|
||||
{
|
||||
public static Task Publish<T>(this IPublisher publisher, object message, ISaga<T> saga) where T : class
|
||||
{
|
||||
return ((IAdvancedPublisher)publisher).Publish(message, new BasicProperties
|
||||
{
|
||||
CorrelationId = saga.Id
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
using System;
|
||||
using Tapeti.Config;
|
||||
|
||||
namespace Tapeti.Saga
|
||||
{
|
||||
public class SagaMessageMiddleware : IMessageMiddleware
|
||||
{
|
||||
private readonly IDependencyResolver dependencyResolver;
|
||||
|
||||
|
||||
public SagaMessageMiddleware(IDependencyResolver dependencyResolver)
|
||||
{
|
||||
this.dependencyResolver = dependencyResolver;
|
||||
}
|
||||
|
||||
public void Handle(IMessageContext context, Action next)
|
||||
{
|
||||
context.Items["Saga"] = dependencyResolver.Resolve<ISagaProvider>().Continue("");
|
||||
next();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,16 +1,70 @@
|
||||
using System.Collections.Generic;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using Tapeti.Config;
|
||||
|
||||
namespace Tapeti.Saga
|
||||
{
|
||||
public class SagaMiddleware : IMiddlewareBundle
|
||||
{
|
||||
private const string SagaContextKey = "Saga";
|
||||
|
||||
|
||||
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
|
||||
{
|
||||
(dependencyResolver as IDependencyInjector)?.RegisterDefault<ISagaProvider, SagaProvider>();
|
||||
|
||||
yield return new SagaBindingMiddleware();
|
||||
yield return new SagaMessageMiddleware(dependencyResolver);
|
||||
}
|
||||
|
||||
|
||||
protected class SagaBindingMiddleware : IBindingMiddleware
|
||||
{
|
||||
public void Handle(IBindingContext context, Action next)
|
||||
{
|
||||
var registered = false;
|
||||
|
||||
foreach (var parameter in context.Parameters.Where(p =>
|
||||
p.Info.ParameterType.IsGenericType &&
|
||||
p.Info.ParameterType.GetGenericTypeDefinition() == typeof(ISaga<>)))
|
||||
{
|
||||
if (!registered)
|
||||
{
|
||||
var sagaType = parameter.Info.ParameterType.GetGenericArguments()[0];
|
||||
var middlewareType = typeof(SagaMessageMiddleware<>).MakeGenericType(sagaType);
|
||||
|
||||
context.Use(Activator.CreateInstance(middlewareType) as IMessageMiddleware);
|
||||
|
||||
registered = true;
|
||||
}
|
||||
|
||||
parameter.SetBinding(messageContext =>
|
||||
{
|
||||
object saga;
|
||||
return messageContext.Items.TryGetValue(SagaContextKey, out saga) ? saga : null;
|
||||
});
|
||||
}
|
||||
|
||||
next();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected class SagaMessageMiddleware<T> : IMessageMiddleware where T : class
|
||||
{
|
||||
public async Task Handle(IMessageContext context, Func<Task> next)
|
||||
{
|
||||
if (string.IsNullOrEmpty(context.Properties.CorrelationId))
|
||||
return;
|
||||
|
||||
var saga = await context.DependencyResolver.Resolve<ISagaProvider>().Continue<T>(context.Properties.CorrelationId);
|
||||
if (saga == null)
|
||||
return;
|
||||
|
||||
context.Items[SagaContextKey] = saga;
|
||||
await next();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,15 +29,6 @@ namespace Tapeti.Saga
|
||||
return await Saga<T>.Create(async () => await store.Read(sagaId) as T, sagaId);
|
||||
}
|
||||
|
||||
public async Task<object> Continue(string sagaId)
|
||||
{
|
||||
return new Saga<object>
|
||||
{
|
||||
Id = sagaId,
|
||||
State = await store.Read(sagaId)
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
protected class Saga<T> : ISaga<T> where T : class
|
||||
{
|
||||
|
@ -31,6 +31,7 @@
|
||||
<WarningLevel>4</WarningLevel>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<Reference Include="RabbitMQ.Client, Version=4.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce" />
|
||||
<Reference Include="System" />
|
||||
<Reference Include="System.Core" />
|
||||
<Reference Include="System.Xml.Linq" />
|
||||
@ -45,11 +46,10 @@
|
||||
<Compile Include="ISagaProvider.cs" />
|
||||
<Compile Include="ISagaStore.cs" />
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="SagaBindingMiddleware.cs" />
|
||||
<Compile Include="SagaMemoryStore.cs" />
|
||||
<Compile Include="SagaMessageMiddleware.cs" />
|
||||
<Compile Include="SagaMiddleware.cs" />
|
||||
<Compile Include="SagaProvider.cs" />
|
||||
<Compile Include="SagaExtensions.cs" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Tapeti.csproj">
|
||||
|
@ -64,7 +64,6 @@
|
||||
<Compile Include="IConnection.cs" />
|
||||
<Compile Include="ILogger.cs" />
|
||||
<Compile Include="Config\IMessageContext.cs" />
|
||||
<Compile Include="Default\BindingBufferStop.cs" />
|
||||
<Compile Include="Config\IMessageMiddleware.cs" />
|
||||
<Compile Include="Config\IMiddlewareBundle.cs" />
|
||||
<Compile Include="Config\IConfig.cs" />
|
||||
|
@ -36,7 +36,6 @@ namespace Tapeti
|
||||
this.exchange = exchange;
|
||||
this.dependencyResolver = dependencyResolver;
|
||||
|
||||
Use(new BindingBufferStop());
|
||||
Use(new DependencyResolverBinding(dependencyResolver));
|
||||
Use(new MessageBinding());
|
||||
}
|
||||
@ -129,7 +128,8 @@ namespace Tapeti
|
||||
Method = method,
|
||||
QueueInfo = methodQueueInfo,
|
||||
MessageClass = context.MessageClass,
|
||||
MessageHandler = messageHandler
|
||||
MessageHandler = messageHandler,
|
||||
MessageMiddleware = context.MessageMiddleware
|
||||
};
|
||||
|
||||
if (methodQueueInfo.Dynamic.GetValueOrDefault())
|
||||
@ -159,7 +159,7 @@ namespace Tapeti
|
||||
|
||||
protected MessageHandlerFunc GetMessageHandler(IBindingContext context, MethodInfo method)
|
||||
{
|
||||
MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next));
|
||||
MiddlewareHelper.Go(bindingMiddleware, (handler, next) => handler.Handle(context, next), () => {});
|
||||
|
||||
if (context.MessageClass == null)
|
||||
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} does not resolve to a message class");
|
||||
@ -336,6 +336,8 @@ namespace Tapeti
|
||||
public MethodInfo Method { get; set; }
|
||||
public Type MessageClass { get; set; }
|
||||
|
||||
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
|
||||
|
||||
public QueueInfo QueueInfo { get; set; }
|
||||
public MessageHandlerFunc MessageHandler { get; set; }
|
||||
|
||||
@ -361,14 +363,26 @@ namespace Tapeti
|
||||
|
||||
internal class BindingContext : IBindingContext
|
||||
{
|
||||
private List<IMessageMiddleware> messageMiddleware;
|
||||
|
||||
public Type MessageClass { get; set; }
|
||||
public IReadOnlyList<IBindingParameter> Parameters { get; }
|
||||
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
|
||||
|
||||
|
||||
public BindingContext(IReadOnlyList<IBindingParameter> parameters)
|
||||
{
|
||||
Parameters = parameters;
|
||||
}
|
||||
|
||||
|
||||
public void Use(IMessageMiddleware middleware)
|
||||
{
|
||||
if (messageMiddleware == null)
|
||||
messageMiddleware = new List<IMessageMiddleware>();
|
||||
|
||||
messageMiddleware.Add(middleware);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
using System;
|
||||
using Microsoft.SqlServer.Server;
|
||||
using System.Threading.Tasks;
|
||||
using Tapeti;
|
||||
using Tapeti.Annotations;
|
||||
using Tapeti.Saga;
|
||||
|
||||
namespace Test
|
||||
{
|
||||
@ -9,33 +9,46 @@ namespace Test
|
||||
public class MarcoController : MessageController
|
||||
{
|
||||
private readonly IPublisher publisher;
|
||||
private readonly ISagaProvider sagaProvider;
|
||||
|
||||
|
||||
public MarcoController(IPublisher publisher/*, ISagaProvider sagaProvider*/)
|
||||
public MarcoController(IPublisher publisher, ISagaProvider sagaProvider)
|
||||
{
|
||||
this.publisher = publisher;
|
||||
this.sagaProvider = sagaProvider;
|
||||
}
|
||||
|
||||
|
||||
//[StaticQueue("test")]
|
||||
public PoloMessage Marco(MarcoMessage message, Visualizer visualizer)
|
||||
/*
|
||||
* For simple request response patterns, the return type can also be used:
|
||||
|
||||
public async Task<PoloMessage> Marco(MarcoMessage message, Visualizer visualizer)
|
||||
{
|
||||
visualizer.VisualizeMarco();
|
||||
|
||||
/*
|
||||
using (sagaProvider.Begin<MarcoState>(new MarcoState
|
||||
{
|
||||
...
|
||||
}))
|
||||
{
|
||||
//publisher.Publish(new PoloColorRequest(), saga, PoloColorResponse1);
|
||||
//publisher.Publish(new PoloColorRequest(), saga, callID = "tweede");
|
||||
|
||||
// Saga refcount = 2
|
||||
return new PoloMessage(); ;
|
||||
}
|
||||
*/
|
||||
|
||||
return new PoloMessage(); ;
|
||||
// Visualizer can also be constructor injected, just proving a point here...
|
||||
public async Task Marco(MarcoMessage message, Visualizer visualizer)
|
||||
{
|
||||
visualizer.VisualizeMarco();
|
||||
|
||||
using (var saga = await sagaProvider.Begin(new MarcoPoloSaga()))
|
||||
{
|
||||
// TODO provide publish extension with Saga support
|
||||
await publisher.Publish(new PoloMessage(), saga);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void Polo(PoloMessage message, Visualizer visualizer, ISaga<MarcoPoloSaga> saga)
|
||||
{
|
||||
if (saga.State.ReceivedPolo)
|
||||
return;
|
||||
|
||||
saga.State.ReceivedPolo = true;
|
||||
visualizer.VisualizePolo();
|
||||
}
|
||||
|
||||
|
||||
@ -61,11 +74,6 @@ namespace Test
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
public void Polo(PoloMessage message, Visualizer visualizer)
|
||||
{
|
||||
visualizer.VisualizePolo();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -79,15 +87,8 @@ namespace Test
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public class PoloColorRequest
|
||||
public class MarcoPoloSaga
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public class PoloColorResponse
|
||||
{
|
||||
|
||||
public bool ReceivedPolo;
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ namespace Test
|
||||
var container = new Container();
|
||||
container.Register<MarcoEmitter>();
|
||||
container.Register<Visualizer>();
|
||||
container.Register<ISagaStore, SagaMemoryStore>();
|
||||
container.RegisterSingleton<ISagaStore, SagaMemoryStore>();
|
||||
|
||||
var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container))
|
||||
.Use(new SagaMiddleware())
|
||||
|
Loading…
Reference in New Issue
Block a user