1
0
mirror of synced 2025-01-23 08:23:08 +01:00

First working prototype of the flow port

This commit is contained in:
Mark van Renswoude 2017-02-05 23:22:34 +01:00
parent 1379eb5a17
commit a7b1ea85e5
38 changed files with 601 additions and 323 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
bin/ bin/
obj/ obj/
packages/ packages/
*.user

View File

@ -7,7 +7,7 @@ namespace Tapeti.Config
{ {
public interface IConfig public interface IConfig
{ {
string Exchange { get; } string SubscribeExchange { get; }
IDependencyResolver DependencyResolver { get; } IDependencyResolver DependencyResolver { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IEnumerable<IQueue> Queues { get; } IEnumerable<IQueue> Queues { get; }
@ -33,9 +33,10 @@ namespace Tapeti.Config
string QueueName { get; } string QueueName { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<IBindingFilter> BindingFilters { get; }
bool Accept(object message); Task<bool> Accept(IMessageContext context, object message);
Task<object> Invoke(IMessageContext context, object message); Task Invoke(IMessageContext context, object message);
} }

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Reflection;
using RabbitMQ.Client; using RabbitMQ.Client;
namespace Tapeti.Config namespace Tapeti.Config
@ -19,5 +20,10 @@ namespace Tapeti.Config
/// Controller will be null when passed to an IBindingFilter /// Controller will be null when passed to an IBindingFilter
/// </summary> /// </summary>
object Controller { get; } object Controller { get; }
/// <summary>
/// Binding will be null when passed to an IBindingFilter
/// </summary>
IBinding Binding { get; }
} }
} }

View File

@ -36,47 +36,52 @@ namespace Tapeti.Connection
throw new ArgumentException("Empty message"); throw new ArgumentException("Empty message");
var validMessageType = false; var validMessageType = false;
foreach (var binding in bindings.Where(b => b.Accept(message)))
{
using (var context = new MessageContext using (var context = new MessageContext
{ {
DependencyResolver = dependencyResolver, DependencyResolver = dependencyResolver,
Controller = dependencyResolver.Resolve(binding.Controller),
Queue = queueName, Queue = queueName,
RoutingKey = routingKey, RoutingKey = routingKey,
Message = message, Message = message,
Properties = properties Properties = properties
}) })
{ {
foreach (var binding in bindings)
{
if (!binding.Accept(context, message).Result)
continue;
context.Controller = dependencyResolver.Resolve(binding.Controller);
context.Binding = binding;
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
MiddlewareHelper.GoAsync( MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList() ? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware, : messageMiddleware,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next),
async () => () => binding.Invoke(context, message)
{
var result = binding.Invoke(context, message).Result;
// TODO change to result handler
if (result != null)
await worker.Publish(result, null);
}
).Wait(); ).Wait();
// ReSharper restore AccessToDisposedClosure // ReSharper restore AccessToDisposedClosure
}
validMessageType = true; validMessageType = true;
} }
}
if (!validMessageType) if (!validMessageType)
throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}"); throw new ArgumentException($"Unsupported message type: {message.GetType().FullName}");
worker.Respond(deliveryTag, ConsumeResponse.Ack); worker.Respond(deliveryTag, ConsumeResponse.Ack);
} }
catch (Exception) catch (Exception e)
{ {
// TODO allow different exception handling depending on exception type
worker.Respond(deliveryTag, ConsumeResponse.Requeue); worker.Respond(deliveryTag, ConsumeResponse.Requeue);
var aggregateException = e as AggregateException;
if (aggregateException != null && aggregateException.InnerExceptions.Count == 1)
throw aggregateException.InnerExceptions[0];
throw; throw;
} }
} }
@ -87,6 +92,7 @@ namespace Tapeti.Connection
public IDependencyResolver DependencyResolver { get; set; } public IDependencyResolver DependencyResolver { get; set; }
public object Controller { get; set; } public object Controller { get; set; }
public IBinding Binding { get; set; }
public string Queue { get; set; } public string Queue { get; set; }
public string RoutingKey { get; set; } public string RoutingKey { get; set; }

View File

@ -4,7 +4,7 @@ using RabbitMQ.Client;
namespace Tapeti.Connection namespace Tapeti.Connection
{ {
public class TapetiPublisher : IAdvancedPublisher public class TapetiPublisher : IInternalPublisher
{ {
private readonly Func<TapetiWorker> workerFactory; private readonly Func<TapetiWorker> workerFactory;

View File

@ -12,7 +12,7 @@ namespace Tapeti.Connection
public class TapetiWorker public class TapetiWorker
{ {
public TapetiConnectionParams ConnectionParams { get; set; } public TapetiConnectionParams ConnectionParams { get; set; }
public string Exchange { get; set; } public string SubscribeExchange { get; set; }
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware; private readonly IReadOnlyList<IMessageMiddleware> messageMiddleware;
@ -34,7 +34,8 @@ namespace Tapeti.Connection
public Task Publish(object message, IBasicProperties properties) public Task Publish(object message, IBasicProperties properties)
{ {
return Publish(message, properties, Exchange, routingKeyStrategy.GetRoutingKey(message.GetType())); // TODO use exchange strategy!
return Publish(message, properties, SubscribeExchange, routingKeyStrategy.GetRoutingKey(message.GetType()));
} }
@ -66,7 +67,7 @@ namespace Tapeti.Connection
foreach (var binding in queue.Bindings) foreach (var binding in queue.Bindings)
{ {
var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass); var routingKey = routingKeyStrategy.GetRoutingKey(binding.MessageClass);
channel.QueueBind(dynamicQueue.QueueName, Exchange, routingKey); channel.QueueBind(dynamicQueue.QueueName, SubscribeExchange, routingKey);
(binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName); (binding as IDynamicQueueBinding)?.SetQueueName(dynamicQueue.QueueName);
} }
@ -173,6 +174,9 @@ namespace Tapeti.Connection
connection = connectionFactory.CreateConnection(); connection = connectionFactory.CreateConnection();
channelInstance = connection.CreateModel(); channelInstance = connection.CreateModel();
if (ConnectionParams.PrefetchCount > 0)
channelInstance.BasicQos(0, ConnectionParams.PrefetchCount, false);
break; break;
} }
catch (BrokerUnreachableException) catch (BrokerUnreachableException)

View File

@ -10,9 +10,9 @@ namespace Tapeti.Default
private readonly Regex namespaceRegEx; private readonly Regex namespaceRegEx;
public NamespaceMatchExchangeStrategy(string namespaceFormat = DefaultFormat) public NamespaceMatchExchangeStrategy()
{ {
namespaceRegEx = new Regex(namespaceFormat, RegexOptions.Compiled | RegexOptions.Singleline); namespaceRegEx = new Regex(DefaultFormat, RegexOptions.Compiled | RegexOptions.Singleline);
} }

View File

@ -0,0 +1,53 @@
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
using Tapeti.Config;
using Tapeti.Helpers;
namespace Tapeti.Default
{
public class PublishResultBinding : IBindingMiddleware
{
public void Handle(IBindingContext context, Action next)
{
next();
if (context.Result.HasHandler)
return;
bool isTask;
if (context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTask))
{
if (isTask)
{
context.Result.SetHandler(async (messageContext, value) =>
{
var message = await (Task<object>)value;
if (message != null)
await Reply(message, messageContext);
});
}
else
context.Result.SetHandler((messageContext, value) =>
value == null ? null : Reply(value, messageContext));
}
}
private Task Reply(object message, IMessageContext messageContext)
{
var publisher = (IInternalPublisher)messageContext.DependencyResolver.Resolve<IPublisher>();
var properties = new BasicProperties();
// Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if (messageContext.Properties.IsCorrelationIdPresent())
properties.CorrelationId = messageContext.Properties.CorrelationId;
if (messageContext.Properties.IsReplyToPresent())
return publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties);
return publisher.Publish(message, properties);
}
}
}

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Helpers namespace Tapeti.Helpers

28
Helpers/TaskTypeHelper.cs Normal file
View File

@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Helpers
{
public static class TaskTypeHelper
{
public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTask)
{
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Task<>))
{
isTask = true;
var genericArguments = type.GetGenericArguments();
return genericArguments.Length == 1 && predicate(genericArguments[0]);
}
isTask = false;
return predicate(type);
}
public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTask)
{
return IsTypeOrTaskOf(type, t => t == compareTo, out isTask);
}
}
}

View File

@ -1,5 +1,4 @@
using System; using System;
using Tapeti.Config;
namespace Tapeti namespace Tapeti
{ {
@ -13,8 +12,12 @@ namespace Tapeti
public interface IDependencyContainer : IDependencyResolver public interface IDependencyContainer : IDependencyResolver
{ {
void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService; void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService;
void RegisterPublisher(Func<IPublisher> publisher); void RegisterDefault<TService>(Func<TService> factory) where TService : class;
void RegisterConfig(IConfig config);
void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService;
void RegisterDefaultSingleton<TService>(TService instance) where TService : class;
void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class;
void RegisterController(Type type); void RegisterController(Type type);
} }
} }

View File

@ -3,13 +3,15 @@ using RabbitMQ.Client;
namespace Tapeti namespace Tapeti
{ {
// Note: Tapeti assumes every implementation of IPublisher can also be cast to an IInternalPublisher.
// The distinction is made on purpose to trigger code-smells in non-Tapeti code when casting.
public interface IPublisher public interface IPublisher
{ {
Task Publish(object message); Task Publish(object message);
} }
public interface IAdvancedPublisher : IPublisher public interface IInternalPublisher : IPublisher
{ {
Task Publish(object message, IBasicProperties properties); Task Publish(object message, IBasicProperties properties);
Task PublishDirect(object message, string queueName, IBasicProperties properties); Task PublishDirect(object message, string queueName, IBasicProperties properties);

View File

@ -7,9 +7,9 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTitle("Tapeti")] [assembly: AssemblyTitle("Tapeti")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Tapeti")] [assembly: AssemblyProduct("Tapeti")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] [assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@ -0,0 +1,11 @@
namespace Tapeti.Flow
{
public static class ConfigExtensions
{
public static TapetiConfig WithFlow(this TapetiConfig config)
{
config.Use(new FlowMiddleware());
return config;
}
}
}

View File

@ -10,14 +10,10 @@ namespace Tapeti.Flow.Default
public async Task<bool> Accept(IMessageContext context, IBinding binding) public async Task<bool> Accept(IMessageContext context, IBinding binding)
{ {
var flowContext = await GetFlowContext(context); var flowContext = await GetFlowContext(context);
if (flowContext?.FlowState == null) if (flowContext?.ContinuationMetadata == null)
return false; return false;
string continuation; return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method);
if (!flowContext.FlowState.Continuations.TryGetValue(flowContext.ContinuationID, out continuation))
return false;
return continuation == MethodSerializer.Serialize(binding.Method);
} }
@ -35,27 +31,29 @@ namespace Tapeti.Flow.Default
var flowStore = context.DependencyResolver.Resolve<IFlowStore>(); var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowStateID = await flowStore.FindFlowStateID(continuationID); var flowID = await flowStore.FindFlowID(continuationID);
if (!flowStateID.HasValue) if (!flowID.HasValue)
return null; return null;
var flowStateLock = await flowStore.LockFlowState(flowStateID.Value); var flowStateLock = await flowStore.LockFlowState(flowID.Value);
if (flowStateLock == null) if (flowStateLock == null)
return null; return null;
var flowState = await flowStateLock.GetFlowState(); var flowState = await flowStateLock.GetFlowState();
if (flowState == null)
return null;
ContinuationMetadata continuation;
var flowMetadata = flowState != null ? Newtonsoft.Json.JsonConvert.DeserializeObject<FlowMetadata>(flowState.Metadata) : null;
//var continuationMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject<ContinuationMetadata>(continuation.MetaData);
var flowContext = new FlowContext var flowContext = new FlowContext
{ {
MessageContext = context, MessageContext = context,
ContinuationID = continuationID,
FlowStateLock = flowStateLock, FlowStateLock = flowStateLock,
FlowState = flowState, FlowState = flowState,
Reply = flowMetadata?.Reply
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null
}; };
// IDisposable items in the IMessageContext are automatically disposed // IDisposable items in the IMessageContext are automatically disposed

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using Tapeti.Config; using Tapeti.Config;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -8,37 +9,13 @@ namespace Tapeti.Flow.Default
public IMessageContext MessageContext { get; set; } public IMessageContext MessageContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; } public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; } public FlowState FlowState { get; set; }
public Guid ContinuationID { get; set; } public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
public FlowReplyMetadata Reply { get; set; }
public void Dispose() public void Dispose()
{ {
MessageContext?.Dispose();
FlowStateLock?.Dispose(); FlowStateLock?.Dispose();
} }
} }
internal class FlowReplyMetadata
{
public string ReplyTo { get; set; }
public string CorrelationId { get; set; }
public string ResponseTypeName { get; set; }
}
internal class FlowMetadata
{
public string ControllerTypeName { get; set; }
public FlowReplyMetadata Reply { get; set; }
}
internal class ContinuationMetadata
{
public string MethodName { get; set; }
public string ConvergeMethodName { get; set; }
}
} }

View File

@ -1,7 +1,6 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
{ {

View File

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
@ -11,13 +12,13 @@ namespace Tapeti.Flow.Default
public class FlowProvider : IFlowProvider, IFlowHandler public class FlowProvider : IFlowProvider, IFlowHandler
{ {
private readonly IConfig config; private readonly IConfig config;
private readonly IAdvancedPublisher publisher; private readonly IInternalPublisher publisher;
public FlowProvider(IConfig config, IPublisher publisher) public FlowProvider(IConfig config, IPublisher publisher)
{ {
this.config = config; this.config = config;
this.publisher = (IAdvancedPublisher)publisher; this.publisher = (IInternalPublisher)publisher;
} }
@ -55,11 +56,11 @@ namespace Tapeti.Flow.Default
var continuationID = Guid.NewGuid(); var continuationID = Guid.NewGuid();
context.FlowState.Continuations.Add(continuationID, context.FlowState.Continuations.Add(continuationID,
Newtonsoft.Json.JsonConvert.SerializeObject(new ContinuationMetadata new ContinuationMetadata
{ {
MethodName = responseHandlerInfo.MethodName, MethodName = responseHandlerInfo.MethodName,
ConvergeMethodName = null ConvergeMethodName = null
})); });
var properties = new BasicProperties var properties = new BasicProperties
{ {
@ -73,25 +74,32 @@ namespace Tapeti.Flow.Default
private async Task SendResponse(FlowContext context, object message) private async Task SendResponse(FlowContext context, object message)
{ {
if (context.Reply == null) var reply = context.FlowState.Metadata.Reply;
throw new InvalidOperationException("No response is required"); if (reply == null)
throw new YieldPointException("No response is required");
if (message.GetType().FullName != context.Reply.ResponseTypeName) if (message.GetType().FullName != reply.ResponseTypeName)
throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}, {message.GetType().FullName} was returned instead"); throw new YieldPointException($"Flow must end with a response message of type {reply.ResponseTypeName}, {message.GetType().FullName} was returned instead");
var properties = new BasicProperties var properties = new BasicProperties();
{
CorrelationId = context.Reply.CorrelationId
};
await publisher.PublishDirect(message, context.Reply.ReplyTo, properties); // Only set the property if it's not null, otherwise a string reference exception can occur:
// http://rabbitmq.1065348.n5.nabble.com/SocketException-when-invoking-model-BasicPublish-td36330.html
if (reply.CorrelationId != null)
properties.CorrelationId = reply.CorrelationId;
// TODO disallow if replyto is not specified?
if (context.FlowState.Metadata.Reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties);
else
await publisher.Publish(message, properties);
} }
private static Task EndFlow(FlowContext context) private static Task EndFlow(FlowContext context)
{ {
if (context.Reply != null) if (context.FlowState.Metadata.Reply != null)
throw new InvalidOperationException($"Flow must end with a response message of type {context.Reply.ResponseTypeName}"); throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -107,6 +115,10 @@ namespace Tapeti.Flow.Default
if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass) if (requestAttribute?.Response != null && requestAttribute.Response != binding.MessageClass)
throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler)); throw new ArgumentException($"responseHandler must accept message of type {binding.MessageClass}", nameof(responseHandler));
var continuationAttribute = binding.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute == null)
throw new ArgumentException($"responseHandler must be marked with the Continuation attribute", nameof(responseHandler));
return new ResponseHandlerInfo return new ResponseHandlerInfo
{ {
MethodName = MethodSerializer.Serialize(responseHandler.Method), MethodName = MethodSerializer.Serialize(responseHandler.Method),
@ -115,16 +127,71 @@ namespace Tapeti.Flow.Default
} }
private static ReplyMetadata GetReply(IMessageContext context)
{
var requestAttribute = context.Message.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
return null;
return new ReplyMetadata
{
CorrelationId = context.Properties.CorrelationId,
ReplyTo = context.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName
};
}
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{ {
var flowContext = (FlowContext)context.Items[ContextItems.FlowContext];
if (flowContext == null)
return;
var delegateYieldPoint = (DelegateYieldPoint)yieldPoint; var delegateYieldPoint = (DelegateYieldPoint)yieldPoint;
await delegateYieldPoint.Execute(flowContext); var storeState = delegateYieldPoint.StoreState;
if (delegateYieldPoint.StoreState) FlowContext flowContext;
object flowContextItem;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem))
{
flowContext = new FlowContext
{
MessageContext = context
};
if (storeState)
{
// Initiate the flow
var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = await flowContext.FlowStateLock.GetFlowState();
if (flowContext.FlowState == null)
throw new InvalidOperationException("Unable to get state for new flow");
flowContext.FlowState.Metadata.Reply = GetReply(context);
}
}
else
flowContext = (FlowContext) flowContextItem;
try
{
await delegateYieldPoint.Execute(flowContext);
}
catch (YieldPointException e)
{
var controllerName = flowContext.MessageContext.Controller.GetType().FullName;
var methodName = flowContext.MessageContext.Binding.Method.Name;
throw new YieldPointException($"{e.Message} in controller {controllerName}, method {methodName}", e);
}
if (storeState)
{ {
flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller); flowContext.FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(context.Controller);
await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState); await flowContext.FlowStateLock.StoreFlowState(flowContext.FlowState);
@ -202,61 +269,5 @@ namespace Tapeti.Flow.Default
public string MethodName { get; set; } public string MethodName { get; set; }
public string ReplyToQueue { get; set; } public string ReplyToQueue { get; set; }
} }
/*
* Handle response (correlationId known)
internal async Task HandleMessage(object message, string correlationID)
{
var continuationID = Guid.Parse(correlationID);
var flowStateID = await owner.flowStore.FindFlowStateID(continuationID);
if (!flowStateID.HasValue)
return;
using (flowStateLock = await owner.flowStore.LockFlowState(flowStateID.Value))
{
flowState = await flowStateLock.GetFlowState();
continuation = flowState.Continuations[continuationID];
if (continuation != null)
await HandleContinuation(message);
}
}
private async Task HandleContinuation(object message)
{
var flowMetaData = Newtonsoft.Json.JsonConvert.DeserializeObject<FlowMetaData>(flowState.MetaData);
var continuationMetaData =
Newtonsoft.Json.JsonConvert.DeserializeObject<ContinuationMetaData>(continuation.MetaData);
reply = flowMetaData.Reply;
controllerType = owner.GetControllerType(flowMetaData.ControllerTypeName);
method = controllerType.GetMethod(continuationMetaData.MethodName);
controller = owner.container.GetInstance(controllerType);
Newtonsoft.Json.JsonConvert.PopulateObject(flowState.Data, controller);
var yieldPoint = (AbstractYieldPoint) await owner.CallFlowController(controller, method, message);
await yieldPoint.Execute(this);
if (yieldPoint.Store)
{
flowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(controller);
flowState.Continuations.Remove(continuation);
await flowStateLock.StoreFlowState(flowState);
}
else
{
await flowStateLock.DeleteFlowState();
}
}
}
*/
} }
} }

View File

@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace Tapeti.Flow.Default
{
public class FlowState
{
private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations;
public FlowMetadata Metadata
{
get { return metadata ?? (metadata = new FlowMetadata()); }
set { metadata = value; }
}
public string Data { get; set; }
public Dictionary<Guid, ContinuationMetadata> Continuations
{
get { return continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>()); }
set { continuations = value; }
}
public void Assign(FlowState value)
{
Metadata = value.Metadata.Clone();
Data = value.Data;
Continuations = value.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value.Clone());
}
public FlowState Clone()
{
var result = new FlowState();
result.Assign(this);
return result;
}
}
public class FlowMetadata
{
public ReplyMetadata Reply { get; set; }
public FlowMetadata Clone()
{
return new FlowMetadata
{
Reply = Reply?.Clone()
};
}
}
public class ReplyMetadata
{
public string ReplyTo { get; set; }
public string CorrelationId { get; set; }
public string ResponseTypeName { get; set; }
public ReplyMetadata Clone()
{
return new ReplyMetadata
{
ReplyTo = ReplyTo,
CorrelationId = CorrelationId,
ResponseTypeName = ResponseTypeName
};
}
}
public class ContinuationMetadata
{
public string MethodName { get; set; }
public string ConvergeMethodName { get; set; }
public ContinuationMetadata Clone()
{
return new ContinuationMetadata
{
MethodName = MethodName,
ConvergeMethodName = ConvergeMethodName
};
}
}
}

View File

@ -1,17 +1,19 @@
using System; using Newtonsoft.Json;
using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow namespace Tapeti.Flow.Default
{ {
public class FlowStore : IFlowStore public class FlowStore : IFlowStore
{ {
private static readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>();
private static readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly IFlowRepository repository; private readonly IFlowRepository repository;
private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
public FlowStore(IFlowRepository repository) public FlowStore(IFlowRepository repository)
@ -22,41 +24,37 @@ namespace Tapeti.Flow
public async Task Load() public async Task Load()
{ {
flowStates.Clear(); FlowStates.Clear();
continuationLookup.Clear(); ContinuationLookup.Clear();
foreach (var state in await repository.GetAllStates()) foreach (var flowStateRecord in await repository.GetStates())
{ {
flowStates.GetOrAdd(state.FlowID, new FlowState var flowState = ToFlowState(flowStateRecord);
{ FlowStates.GetOrAdd(flowStateRecord.FlowID, flowState);
Metadata = state.Metadata,
Data = state.Data,
Continuations = state.Continuations
});
foreach (var continuation in state.Continuations) foreach (var continuation in flowStateRecord.ContinuationMetadata)
continuationLookup.GetOrAdd(continuation.Key, state.FlowID); ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.FlowID);
} }
} }
public Task<Guid?> FindFlowStateID(Guid continuationID) public Task<Guid?> FindFlowID(Guid continuationID)
{ {
Guid result; Guid result;
return Task.FromResult(continuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null);
} }
public async Task<IFlowStateLock> LockFlowState(Guid flowStateID) public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{ {
var isNew = false; var isNew = false;
var flowState = flowStates.GetOrAdd(flowStateID, id => var flowState = FlowStates.GetOrAdd(flowID, id =>
{ {
isNew = true; isNew = true;
return new FlowState(); return new FlowState();
}); });
var result = new FlowStateLock(this, flowState, flowStateID, isNew); var result = new FlowStateLock(this, flowState, flowID, isNew);
await result.Lock(); await result.Lock();
return result; return result;
@ -103,7 +101,7 @@ namespace Tapeti.Flow
} }
} }
public Guid FlowStateID => flowID; public Guid FlowID => flowID;
public Task<FlowState> GetFlowState() public Task<FlowState> GetFlowState()
{ {
@ -112,12 +110,7 @@ namespace Tapeti.Flow
if (isDisposed) if (isDisposed)
throw new ObjectDisposedException("FlowStateLock"); throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(new FlowState return Task.FromResult(flowState.Clone());
{
Data = flowState.Data,
Metadata = flowState.Metadata,
Continuations = flowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value)
});
} }
} }
@ -128,37 +121,29 @@ namespace Tapeti.Flow
if (isDisposed) if (isDisposed)
throw new ObjectDisposedException("FlowStateLock"); throw new ObjectDisposedException("FlowStateLock");
foreach ( foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
var removedContinuation in
flowState.Continuations.Keys.Where(
k => !newFlowState.Continuations.ContainsKey(k)))
{ {
Guid removedValue; Guid removedValue;
owner.continuationLookup.TryRemove(removedContinuation, out removedValue); ContinuationLookup.TryRemove(removedContinuation, out removedValue);
} }
foreach ( foreach (var addedContinuation in newFlowState.Continuations.Where(c => !flowState.Continuations.ContainsKey(c.Key)))
var addedContinuation in
newFlowState.Continuations.Where(
c => !flowState.Continuations.ContainsKey(c.Key)))
{ {
owner.continuationLookup.TryAdd(addedContinuation.Key, flowID); ContinuationLookup.TryAdd(addedContinuation.Key, flowID);
} }
flowState.Metadata = newFlowState.Metadata; flowState.Assign(newFlowState);
flowState.Data = newFlowState.Data;
flowState.Continuations = newFlowState.Continuations.ToDictionary(kv => kv.Key, kv => kv.Value);
} }
if (isNew) if (isNew)
{ {
isNew = false; isNew = false;
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
await await owner.repository.CreateState(ToFlowStateRecord(flowID, flowState), now);
owner.repository.CreateState(flowID, now, flowState.Metadata, flowState.Data, flowState.Continuations);
} }
else else
{ {
await owner.repository.UpdateState(flowID, flowState.Metadata, flowState.Data, flowState.Continuations); await owner.repository.UpdateState(ToFlowStateRecord(flowID, flowState));
} }
} }
@ -172,15 +157,42 @@ namespace Tapeti.Flow
foreach (var removedContinuation in flowState.Continuations.Keys) foreach (var removedContinuation in flowState.Continuations.Keys)
{ {
Guid removedValue; Guid removedValue;
owner.continuationLookup.TryRemove(removedContinuation, out removedValue); ContinuationLookup.TryRemove(removedContinuation, out removedValue);
} }
FlowState removedFlow; FlowState removedFlow;
owner.flowStates.TryRemove(flowID, out removedFlow); FlowStates.TryRemove(flowID, out removedFlow);
} }
if (!isNew) if (!isNew)
await owner.repository.DeleteState(flowID); await owner.repository.DeleteState(flowID);
} }
}
private static FlowStateRecord ToFlowStateRecord(Guid flowID, FlowState flowState)
{
return new FlowStateRecord
{
FlowID = flowID,
Metadata = JsonConvert.SerializeObject(flowState.Metadata),
Data = flowState.Data,
ContinuationMetadata = flowState.Continuations.ToDictionary(
kv => kv.Key,
kv => JsonConvert.SerializeObject(kv.Value))
};
}
private static FlowState ToFlowState(FlowStateRecord flowStateRecord)
{
return new FlowState
{
Metadata = JsonConvert.DeserializeObject<FlowMetadata>(flowStateRecord.Metadata),
Data = flowStateRecord.Data,
Continuations = flowStateRecord.ContinuationMetadata.ToDictionary(
kv => kv.Key,
kv => JsonConvert.DeserializeObject<ContinuationMetadata>(kv.Value))
};
} }
} }
} }

View File

@ -7,17 +7,17 @@ namespace Tapeti.Flow.Default
{ {
public class NonPersistentFlowRepository : IFlowRepository public class NonPersistentFlowRepository : IFlowRepository
{ {
public Task<IQueryable<FlowStateRecord>> GetAllStates() public Task<IQueryable<FlowStateRecord>> GetStates()
{ {
return Task.FromResult(new List<FlowStateRecord>().AsQueryable()); return Task.FromResult(new List<FlowStateRecord>().AsQueryable());
} }
public Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary<Guid, string> continuations) public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task UpdateState(Guid flowID, string metadata, string data, IDictionary<Guid, string> continuations) public Task UpdateState(FlowStateRecord stateRecord)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }

View File

@ -5,6 +5,7 @@ using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Flow.Annotations; using Tapeti.Flow.Annotations;
using Tapeti.Flow.Default; using Tapeti.Flow.Default;
using Tapeti.Helpers;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
@ -13,13 +14,14 @@ namespace Tapeti.Flow
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver) public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
{ {
var container = dependencyResolver as IDependencyContainer; var container = dependencyResolver as IDependencyContainer;
// ReSharper disable once InvertIf
if (container != null) if (container != null)
{ {
container.RegisterDefault<IFlowProvider, FlowProvider>(); container.RegisterDefault<IFlowProvider, FlowProvider>();
container.RegisterDefault<IFlowHandler, FlowProvider>(); container.RegisterDefault<IFlowHandler, FlowProvider>();
// TODO singleton
container.RegisterDefault<IFlowStore, FlowStore>();
container.RegisterDefault<IFlowRepository, NonPersistentFlowRepository>(); container.RegisterDefault<IFlowRepository, NonPersistentFlowRepository>();
container.RegisterDefault<IFlowStore, FlowStore>();
} }
return new[] { new FlowBindingMiddleware() }; return new[] { new FlowBindingMiddleware() };
@ -30,33 +32,32 @@ namespace Tapeti.Flow
{ {
public void Handle(IBindingContext context, Action next) public void Handle(IBindingContext context, Action next)
{ {
HandleContinuationFilter(context); RegisterContinuationFilter(context);
HandleYieldPointResult(context); RegisterYieldPointResult(context);
next(); next();
} }
private static void HandleContinuationFilter(IBindingContext context) private static void RegisterContinuationFilter(IBindingContext context)
{ {
var continuationAttribute = context.Method.GetCustomAttribute<ContinuationAttribute>(); var continuationAttribute = context.Method.GetCustomAttribute<ContinuationAttribute>();
if (continuationAttribute != null) if (continuationAttribute == null)
{ return;
context.Use(new FlowBindingFilter()); context.Use(new FlowBindingFilter());
context.Use(new FlowMessageMiddleware()); context.Use(new FlowMessageMiddleware());
} }
}
private static void HandleYieldPointResult(IBindingContext context) private static void RegisterYieldPointResult(IBindingContext context)
{ {
if (context.Result.Info.ParameterType == typeof(IYieldPoint)) bool isTask;
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value)); if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTask))
return;
else if (context.Result.Info.ParameterType == typeof(Task<>)) if (isTask)
{ {
var genericArguments = context.Result.Info.ParameterType.GetGenericArguments();
if (genericArguments.Length == 1 && genericArguments[0] == typeof(IYieldPoint))
context.Result.SetHandler(async (messageContext, value) => context.Result.SetHandler(async (messageContext, value) =>
{ {
var yieldPoint = await (Task<IYieldPoint>)value; var yieldPoint = await (Task<IYieldPoint>)value;
@ -64,6 +65,8 @@ namespace Tapeti.Flow
await HandleYieldPoint(messageContext, yieldPoint); await HandleYieldPoint(messageContext, yieldPoint);
}); });
} }
else
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
} }

View File

@ -7,9 +7,9 @@ namespace Tapeti.Flow
{ {
public interface IFlowRepository public interface IFlowRepository
{ {
Task<IQueryable<FlowStateRecord>> GetAllStates(); Task<IQueryable<FlowStateRecord>> GetStates();
Task CreateState(Guid flowID, DateTime timestamp, string metadata, string data, IDictionary<Guid, string> continuations); Task CreateState(FlowStateRecord stateRecord, DateTime timestamp);
Task UpdateState(Guid flowID, string metadata, string data, IDictionary<Guid, string> continuations); Task UpdateState(FlowStateRecord stateRecord);
Task DeleteState(Guid flowID); Task DeleteState(Guid flowID);
} }
@ -19,6 +19,6 @@ namespace Tapeti.Flow
public Guid FlowID; public Guid FlowID;
public string Metadata; public string Metadata;
public string Data; public string Data;
public Dictionary<Guid, string> Continuations; public Dictionary<Guid, string> ContinuationMetadata;
} }
} }

View File

@ -1,28 +1,23 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.Default;
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public interface IFlowStore public interface IFlowStore
{ {
Task Load(); Task Load();
Task<Guid?> FindFlowStateID(Guid continuationID); Task<Guid?> FindFlowID(Guid continuationID);
Task<IFlowStateLock> LockFlowState(Guid flowStateID); Task<IFlowStateLock> LockFlowState(Guid flowID);
} }
public interface IFlowStateLock : IDisposable public interface IFlowStateLock : IDisposable
{ {
Guid FlowStateID { get; } Guid FlowID { get; }
Task<FlowState> GetFlowState(); Task<FlowState> GetFlowState();
Task StoreFlowState(FlowState flowState); Task StoreFlowState(FlowState flowState);
Task DeleteFlowState(); Task DeleteFlowState();
} }
public class FlowState
{
public string Metadata { get; set; }
public string Data { get; set; }
public Dictionary<Guid, string> Continuations { get; set; }
}
} }

View File

@ -1,5 +1,4 @@
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following // General Information about an assembly is controlled through the following

View File

@ -55,8 +55,10 @@
<Compile Include="Default\FlowBindingFilter.cs" /> <Compile Include="Default\FlowBindingFilter.cs" />
<Compile Include="Default\FlowContext.cs" /> <Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" /> <Compile Include="Default\FlowMessageMiddleware.cs" />
<Compile Include="Default\FlowState.cs" />
<Compile Include="Default\NonPersistentFlowRepository.cs" /> <Compile Include="Default\NonPersistentFlowRepository.cs" />
<Compile Include="Default\DelegateYieldPoint.cs" /> <Compile Include="Default\DelegateYieldPoint.cs" />
<Compile Include="ConfigExtensions.cs" />
<Compile Include="FlowHelpers\MethodSerializer.cs" /> <Compile Include="FlowHelpers\MethodSerializer.cs" />
<Compile Include="FlowMiddleware.cs" /> <Compile Include="FlowMiddleware.cs" />
<Compile Include="Default\FlowStore.cs" /> <Compile Include="Default\FlowStore.cs" />
@ -65,6 +67,7 @@
<Compile Include="IFlowStore.cs" /> <Compile Include="IFlowStore.cs" />
<Compile Include="IFlowProvider.cs" /> <Compile Include="IFlowProvider.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="YieldPointException.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Tapeti.csproj"> <ProjectReference Include="..\Tapeti.csproj">

View File

@ -0,0 +1,10 @@
using System;
namespace Tapeti.Flow
{
public class YieldPointException : Exception
{
public YieldPointException(string message) : base(message) { }
public YieldPointException(string message, Exception innerException) : base(message, innerException) { }
}
}

View File

@ -1,7 +1,6 @@
using System; using System;
using System.Linq; using System.Linq;
using SimpleInjector; using SimpleInjector;
using Tapeti.Config;
namespace Tapeti.SimpleInjector namespace Tapeti.SimpleInjector
{ {
@ -27,29 +26,44 @@ namespace Tapeti.SimpleInjector
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{ {
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates if (CanRegisterDefault<TService>())
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService)))
container.Register<TService, TImplementation>(); container.Register<TService, TImplementation>();
} }
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
public void RegisterConfig(IConfig config)
{ {
container.RegisterSingleton(config); if (CanRegisterDefault<TService>())
container.Register(factory);
} }
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
public void RegisterPublisher(Func<IPublisher> publisher)
{ {
// ReSharper disable once SimplifyLinqExpression - still not a fan of negative predicates if (CanRegisterDefault<TService>())
if (!container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(IPublisher))) container.RegisterSingleton<TService, TImplementation>();
container.Register(publisher);
} }
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton(instance);
}
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
if (CanRegisterDefault<TService>())
container.RegisterSingleton(factory);
}
public void RegisterController(Type type) public void RegisterController(Type type)
{ {
container.Register(type); container.Register(type);
} }
private bool CanRegisterDefault<TService>() where TService : class
{
// ReSharper disable once SimplifyLinqExpression - not a fan of negative predicates
return !container.GetCurrentRegistrations().Any(ip => ip.ServiceType == typeof(TService));
}
} }
} }

View File

@ -61,10 +61,12 @@
<Compile Include="Default\ConsoleLogger.cs" /> <Compile Include="Default\ConsoleLogger.cs" />
<Compile Include="Default\DevNullLogger.cs" /> <Compile Include="Default\DevNullLogger.cs" />
<Compile Include="Default\JsonMessageSerializer.cs" /> <Compile Include="Default\JsonMessageSerializer.cs" />
<Compile Include="Default\PublishResultBinding.cs" />
<Compile Include="Default\NamespaceMatchExchangeStrategy.cs" /> <Compile Include="Default\NamespaceMatchExchangeStrategy.cs" />
<Compile Include="Default\TypeNameRoutingKeyStrategy.cs" /> <Compile Include="Default\TypeNameRoutingKeyStrategy.cs" />
<Compile Include="Helpers\ConsoleHelper.cs" /> <Compile Include="Helpers\ConsoleHelper.cs" />
<Compile Include="Helpers\MiddlewareHelper.cs" /> <Compile Include="Helpers\MiddlewareHelper.cs" />
<Compile Include="Helpers\TaskTypeHelper.cs" />
<Compile Include="IConnection.cs" /> <Compile Include="IConnection.cs" />
<Compile Include="IExchangeStrategy.cs" /> <Compile Include="IExchangeStrategy.cs" />
<Compile Include="ILogger.cs" /> <Compile Include="ILogger.cs" />

View File

@ -1,5 +0,0 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/Housekeeping/Bookmarks/NumberedBookmarks/=Bookmark9/@KeyIndexDefined">True</s:Boolean>
<s:String x:Key="/Default/Housekeeping/Bookmarks/NumberedBookmarks/=Bookmark9/Coords/@EntryValue">(Doc Ln 29 Col 51)</s:String>
<s:String x:Key="/Default/Housekeeping/Bookmarks/NumberedBookmarks/=Bookmark9/FileId/@EntryValue">F84AD920-D5A1-455D-AED5-2542B3A47B85/d:Default/f:FlowProvider.cs</s:String>
<s:String x:Key="/Default/Housekeeping/Bookmarks/NumberedBookmarks/=Bookmark9/Owner/@EntryValue">NumberedBookmarkManager</s:String></wpf:ResourceDictionary>

View File

@ -16,7 +16,7 @@ namespace Tapeti
public TopologyConfigurationException(string message) : base(message) { } public TopologyConfigurationException(string message) : base(message) { }
} }
public delegate Task<object> MessageHandlerFunc(IMessageContext context, object message); public delegate Task MessageHandlerFunc(IMessageContext context, object message);
public class TapetiConfig public class TapetiConfig
@ -27,17 +27,18 @@ namespace Tapeti
private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>(); private readonly List<IBindingMiddleware> bindingMiddleware = new List<IBindingMiddleware>();
private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>(); private readonly List<IMessageMiddleware> messageMiddleware = new List<IMessageMiddleware>();
private readonly string exchange; private readonly string subscribeExchange;
private readonly IDependencyResolver dependencyResolver; private readonly IDependencyResolver dependencyResolver;
public TapetiConfig(string exchange, IDependencyResolver dependencyResolver) public TapetiConfig(string subscribeExchange, IDependencyResolver dependencyResolver)
{ {
this.exchange = exchange; this.subscribeExchange = subscribeExchange;
this.dependencyResolver = dependencyResolver; this.dependencyResolver = dependencyResolver;
Use(new DependencyResolverBinding()); Use(new DependencyResolverBinding());
Use(new MessageBinding()); Use(new MessageBinding());
Use(new PublishResultBinding());
} }
@ -62,8 +63,8 @@ namespace Tapeti
queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl)));
var config = new Config(exchange, dependencyResolver, messageMiddleware, queues); var config = new Config(subscribeExchange, dependencyResolver, messageMiddleware, queues);
(dependencyResolver as IDependencyContainer)?.RegisterConfig(config); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton<IConfig>(config);
return config; return config;
} }
@ -139,7 +140,8 @@ namespace Tapeti
QueueInfo = methodQueueInfo, QueueInfo = methodQueueInfo,
MessageClass = context.MessageClass, MessageClass = context.MessageClass,
MessageHandler = messageHandler, MessageHandler = messageHandler,
MessageMiddleware = context.MessageMiddleware MessageMiddleware = context.MessageMiddleware,
BindingFilters = context.BindingFilters
}; };
if (methodQueueInfo.Dynamic.GetValueOrDefault()) if (methodQueueInfo.Dynamic.GetValueOrDefault())
@ -177,61 +179,59 @@ namespace Tapeti
var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList(); var invalidBindings = context.Parameters.Where(p => !p.HasBinding).ToList();
// ReSharper disable once InvertIf - doesn't make the flow clearer imo // ReSharper disable once InvertIf
if (invalidBindings.Count > 0) if (invalidBindings.Count > 0)
{ {
var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name)); var parameterNames = string.Join(", ", invalidBindings.Select(p => p.Info.Name));
throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}"); throw new TopologyConfigurationException($"Method {method.Name} in controller {method.DeclaringType?.Name} has unknown parameters: {parameterNames}");
} }
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding())); var resultHandler = ((IBindingResultAccess) context.Result).GetHandler();
return WrapMethod(method, context.Parameters.Select(p => ((IBindingParameterAccess)p).GetBinding()), resultHandler);
} }
protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{ {
if (resultHandler != null)
return WrapResultHandlerMethod(method, parameters, resultHandler);
if (method.ReturnType == typeof(void)) if (method.ReturnType == typeof(void))
return WrapNullMethod(method, parameters); return WrapNullMethod(method, parameters);
if (method.ReturnType == typeof(Task)) if (method.ReturnType == typeof(Task))
return WrapTaskMethod(method, parameters); return WrapTaskMethod(method, parameters);
if (method.ReturnType == typeof(Task<>)) if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
{
var genericArguments = method.GetGenericArguments();
if (genericArguments.Length != 1)
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have exactly one generic argument to Task<>");
if (!genericArguments[0].IsClass)
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} must have an object generic argument to Task<>");
return WrapGenericTaskMethod(method, parameters); return WrapGenericTaskMethod(method, parameters);
}
if (method.ReturnType.IsClass)
return WrapObjectMethod(method, parameters); return WrapObjectMethod(method, parameters);
throw new ArgumentException($"Method {method.Name} in controller {method.DeclaringType?.Name} has an invalid return type");
} }
protected MessageHandlerFunc WrapResultHandlerMethod(MethodInfo method, IEnumerable<ValueFactory> parameters, ResultHandler resultHandler)
{
return (context, message) =>
{
var result = method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return resultHandler(context, result);
};
}
protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapNullMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{ {
return (context, message) => return (context, message) =>
{ {
method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray()); method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return Task.FromResult<object>(null); return Task.CompletedTask;
}; };
} }
protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters) protected MessageHandlerFunc WrapTaskMethod(MethodInfo method, IEnumerable<ValueFactory> parameters)
{ {
return async (context, message) => return (context, message) => (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
{
await (Task)method.Invoke(context.Controller, parameters.Select(p => p(context)).ToArray());
return Task.FromResult<object>(null);
};
} }
@ -310,7 +310,7 @@ namespace Tapeti
protected class Config : IConfig protected class Config : IConfig
{ {
public string Exchange { get; } public string SubscribeExchange { get; }
public IDependencyResolver DependencyResolver { get; } public IDependencyResolver DependencyResolver { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; } public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
public IEnumerable<IQueue> Queues { get; } public IEnumerable<IQueue> Queues { get; }
@ -318,9 +318,9 @@ namespace Tapeti
private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup; private readonly Dictionary<MethodInfo, IBinding> bindingMethodLookup;
public Config(string exchange, IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues) public Config(string subscribeExchange, IDependencyResolver dependencyResolver, IReadOnlyList<IMessageMiddleware> messageMiddleware, IEnumerable<IQueue> queues)
{ {
Exchange = exchange; SubscribeExchange = subscribeExchange;
DependencyResolver = dependencyResolver; DependencyResolver = dependencyResolver;
MessageMiddleware = messageMiddleware; MessageMiddleware = messageMiddleware;
Queues = queues.ToList(); Queues = queues.ToList();
@ -361,6 +361,7 @@ namespace Tapeti
public string QueueName { get; set; } public string QueueName { get; set; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; } public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
public IReadOnlyList<IBindingFilter> BindingFilters { get; set; }
private QueueInfo queueInfo; private QueueInfo queueInfo;
public QueueInfo QueueInfo public QueueInfo QueueInfo
@ -382,13 +383,25 @@ namespace Tapeti
} }
public bool Accept(object message) public async Task<bool> Accept(IMessageContext context, object message)
{ {
return message.GetType() == MessageClass; if (message.GetType() != MessageClass)
return false;
if (BindingFilters == null)
return true;
foreach (var filter in BindingFilters)
{
if (!await filter.Accept(context, this))
return false;
}
return true;
} }
public Task<object> Invoke(IMessageContext context, object message) public Task Invoke(IMessageContext context, object message)
{ {
return MessageHandler(context, message); return MessageHandler(context, message);
} }

View File

@ -16,12 +16,12 @@ namespace Tapeti
public TapetiConnection(IConfig config) public TapetiConnection(IConfig config)
{ {
this.config = config; this.config = config;
(config.DependencyResolver as IDependencyContainer)?.RegisterPublisher(GetPublisher); (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher);
worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware) worker = new Lazy<TapetiWorker>(() => new TapetiWorker(config.DependencyResolver, config.MessageMiddleware)
{ {
ConnectionParams = Params ?? new TapetiConnectionParams(), ConnectionParams = Params ?? new TapetiConnectionParams(),
Exchange = config.Exchange SubscribeExchange = config.SubscribeExchange
}); });
} }

View File

@ -10,6 +10,13 @@ namespace Tapeti
public string Username { get; set; } = "guest"; public string Username { get; set; } = "guest";
public string Password { get; set; } = "guest"; public string Password { get; set; } = "guest";
/// <summary>
/// The amount of message to prefetch. See http://www.rabbitmq.com/consumer-prefetch.html for more information.
///
/// If set to 0, no limit will be applied.
/// </summary>
public ushort PrefetchCount { get; set; } = 50;
public TapetiConnectionParams() public TapetiConnectionParams()
{ {

View File

@ -30,18 +30,13 @@ namespace Test
* The Visualizer could've been injected through the constructor, which is * The Visualizer could've been injected through the constructor, which is
* the recommended way. Just testing the injection middleware here. * the recommended way. Just testing the injection middleware here.
*/ */
public async Task Marco(MarcoMessage message, Visualizer myVisualizer) public async Task<IYieldPoint> Marco(MarcoMessage message, Visualizer myVisualizer)
{ {
Console.WriteLine(">> Marco (yielding with request)");
await myVisualizer.VisualizeMarco(); await myVisualizer.VisualizeMarco();
await publisher.Publish(new PoloMessage());
}
return flowProvider.YieldWithRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
public IYieldPoint Polo(PoloMessage message)
{
StateTestGuid = Guid.NewGuid();
return flowProvider.YieldWithRequest<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(
new PoloConfirmationRequestMessage() new PoloConfirmationRequestMessage()
{ {
StoredInState = StateTestGuid StoredInState = StateTestGuid
@ -50,10 +45,15 @@ namespace Test
} }
public async Task<IYieldPoint> HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message) [Continuation]
public IYieldPoint HandlePoloConfirmationResponse(PoloConfirmationResponseMessage message)
{ {
await visualizer.VisualizePolo(message.ShouldMatchState.Equals(StateTestGuid)); Console.WriteLine(">> HandlePoloConfirmationResponse (ending flow)");
return flowProvider.End();
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
// This should error, as MarcoMessage expects a PoloMessage as a response
return flowProvider.EndWithResponse(new PoloMessage());
} }
@ -63,16 +63,28 @@ namespace Test
* use the replyTo header of the request if provided. * use the replyTo header of the request if provided.
*/ */
// TODO validation middleware to ensure a request message returns the specified response (already done for IYieldPoint methods)
public PoloConfirmationResponseMessage PoloConfirmation(PoloConfirmationRequestMessage message) public PoloConfirmationResponseMessage PoloConfirmation(PoloConfirmationRequestMessage message)
{ {
Console.WriteLine(">> PoloConfirmation (returning confirmation)");
return new PoloConfirmationResponseMessage return new PoloConfirmationResponseMessage
{ {
ShouldMatchState = message.StoredInState ShouldMatchState = message.StoredInState
}; };
} }
public void Polo(PoloMessage message)
{
Console.WriteLine(">> Polo");
StateTestGuid = Guid.NewGuid();
}
} }
[Request(Response = typeof(PoloMessage))]
public class MarcoMessage public class MarcoMessage
{ {
} }

View File

@ -17,10 +17,14 @@ namespace Test
public async Task Run() public async Task Run()
{ {
await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20); var concurrent = new SemaphoreSlim(20);
//for (var x = 0; x < 5000; x++)
while (true) while (true)
{
for (var x = 0; x < 200; x++)
{ {
await concurrent.WaitAsync(); await concurrent.WaitAsync();
try try
@ -32,6 +36,15 @@ namespace Test
concurrent.Release(); concurrent.Release();
} }
} }
await Task.Delay(1000);
}
*/
while (true)
{
await Task.Delay(1000);
}
} }
} }
} }

View File

@ -13,14 +13,20 @@ namespace Test
var container = new Container(); var container = new Container();
container.Register<MarcoEmitter>(); container.Register<MarcoEmitter>();
container.Register<Visualizer>(); container.Register<Visualizer>();
//container.RegisterSingleton<ISagaStore, SagaMemoryStore>();
var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container)) var config = new TapetiConfig("test", new SimpleInjectorDependencyResolver(container))
.Use(new FlowMiddleware()) .WithFlow()
.RegisterAllControllers() .RegisterAllControllers()
.Build(); .Build();
using (var connection = new TapetiConnection(config)) using (var connection = new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
HostName = "localhost",
PrefetchCount = 200
}
})
{ {
Console.WriteLine("Subscribing..."); Console.WriteLine("Subscribing...");
connection.Subscribe().Wait(); connection.Subscribe().Wait();

View File

@ -7,9 +7,9 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTitle("Test")] [assembly: AssemblyTitle("Test")]
[assembly: AssemblyDescription("")] [assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Hewlett-Packard Company")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Test")] [assembly: AssemblyProduct("Test")]
[assembly: AssemblyCopyright("Copyright © Hewlett-Packard Company 2016")] [assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@ -11,9 +11,9 @@ namespace Test
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task VisualizePolo(bool matches) public Task VisualizePolo()
{ {
Console.WriteLine(matches ? "Polo!" : "Oops! Mismatch!"); Console.WriteLine("Polo!");
return Task.CompletedTask; return Task.CompletedTask;
} }
} }