diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index 686ef9e..c5e660d 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -5,9 +5,9 @@ namespace Tapeti.Flow.SQL { public static class ConfigExtensions { - public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config) + public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, int serviceId, string schema = "dbo") { - config.Use(new FlowSqlRepositoryBundle()); + config.Use(new FlowSqlRepositoryBundle(connectionString, serviceId, schema)); return config; } } @@ -15,19 +15,25 @@ namespace Tapeti.Flow.SQL internal class FlowSqlRepositoryBundle : ITapetiExtension { - /* - public IEnumerable GetContents(IDependencyResolver dependencyResolver) - { - ((IDependencyContainer)dependencyResolver)?.RegisterDefault(); + private readonly string connectionString; + private readonly string schema; + private readonly int serviceId; - return null; + + public FlowSqlRepositoryBundle(string connectionString, int serviceId, string schema) + { + this.connectionString = connectionString; + this.serviceId = serviceId; + this.schema = schema; } - */ + public void RegisterDefaults(IDependencyContainer container) { + container.RegisterDefault(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema)); } + public IEnumerable GetMiddleware(IDependencyResolver dependencyResolver) { return null; diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs new file mode 100644 index 0000000..15c351a --- /dev/null +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.SqlClient; +using System.Linq; +using System.Threading.Tasks; + +namespace Tapeti.Flow.SQL +{ + /* + Assumes the following table layout (schema configurable): + + + create table dbo.Flow + ( + FlowID uniqueidentifier not null, + ServiceID int not null, + CreationTime datetime2(3) not null, + Metadata nvarchar(max) null, + Flowdata nvarchar(max) null, + + constraint PK_Flow primary key clustered (FlowID) + ); + + create table dbo.FlowContinuation + ( + FlowID uniqueidentifier not null, + ContinuationID uniqueidentifier not null, + Metadata nvarchar(max) null, + + constraint PK_FlowContinuation primary key clustered (FlowID, ContinuationID) + ); + go; + + alter table shared.FlowContinuation with check add constraint FK_FlowContinuation_Flow foreign key (FlowID) references shared.Flow (FlowID); + */ + public class SqlConnectionFlowRepository : IFlowRepository + { + private readonly string connectionString; + private readonly int serviceId; + private readonly string schema; + + + public SqlConnectionFlowRepository(string connectionString, int serviceId, string schema) + { + this.connectionString = connectionString; + this.serviceId = serviceId; + this.schema = schema; + } + + + public async Task> GetStates() + { + var result = new List(); + + using (var connection = await GetConnection()) + { + var flowQuery = new SqlCommand($"select FlowID, Metadata, Flowdata from {schema}.Flow " + + "where ServiceID = @ServiceID " + + "order by FlowID", connection); + var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); + + var continuationQuery = new SqlCommand($"select FlowID, ContinuationID, Metadata from {schema}.FlowContinuation " + + "where ServiceID = @ServiceID " + + "order by FlowID", connection); + var continuationQueryParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int); + + + flowServiceParam.Value = serviceId; + continuationQueryParam.Value = serviceId; + + + var flowReader = await flowQuery.ExecuteReaderAsync(); + var continuationReader = await continuationQuery.ExecuteReaderAsync(); + var hasContinuation = await continuationReader.ReadAsync(); + + while (await flowReader.ReadAsync()) + { + var flowStateRecord = new FlowStateRecord + { + FlowID = flowReader.GetGuid(0), + Metadata = flowReader.GetString(1), + Data = flowReader.GetString(2), + ContinuationMetadata = new Dictionary() + }; + + while (hasContinuation && continuationReader.GetGuid(0) == flowStateRecord.FlowID) + { + flowStateRecord.ContinuationMetadata.Add( + continuationReader.GetGuid(1), + continuationReader.GetString(2) + ); + + hasContinuation = await continuationReader.ReadAsync(); + } + + result.Add(flowStateRecord); + } + } + + return result.AsQueryable(); + } + + + public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp) + { + throw new NotImplementedException(); + } + + public Task UpdateState(FlowStateRecord stateRecord) + { + throw new NotImplementedException(); + } + + public Task DeleteState(Guid flowID) + { + throw new NotImplementedException(); + } + + + private async Task GetConnection() + { + var connection = new SqlConnection(connectionString); + await connection.OpenAsync(); + + return connection; + } + } +} diff --git a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj index 151e970..1549148 100644 --- a/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj +++ b/Tapeti.Flow.SQL/Tapeti.Flow.SQL.csproj @@ -43,6 +43,7 @@ + diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 6b91e95..bb11bda 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -27,7 +27,7 @@ namespace Tapeti.Flow.Default if (continuationAttribute == null) return; - context.Use(new FlowBindingFilter()); + context.Use(new FlowMessageFilterMiddleware()); context.Use(new FlowMessageMiddleware()); } diff --git a/Tapeti.Flow/Default/FlowBindingFilter.cs b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs similarity index 84% rename from Tapeti.Flow/Default/FlowBindingFilter.cs rename to Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs index 727e773..1d327ae 100644 --- a/Tapeti.Flow/Default/FlowBindingFilter.cs +++ b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs @@ -5,15 +5,18 @@ using Tapeti.Flow.FlowHelpers; namespace Tapeti.Flow.Default { - public class FlowBindingFilter : IBindingFilter + public class FlowMessageFilterMiddleware : IMessageFilterMiddleware { - public async Task Accept(IMessageContext context, IBinding binding) + public async Task Handle(IMessageContext context, Func next) { var flowContext = await GetFlowContext(context); if (flowContext?.ContinuationMetadata == null) - return false; + return; - return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method); + if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method)) + return; + + await next(); } diff --git a/Tapeti.Flow/Tapeti.Flow.csproj b/Tapeti.Flow/Tapeti.Flow.csproj index c8ca119..d2aab1d 100644 --- a/Tapeti.Flow/Tapeti.Flow.csproj +++ b/Tapeti.Flow/Tapeti.Flow.csproj @@ -52,7 +52,7 @@ - + diff --git a/Tapeti/Config/IBindingContext.cs b/Tapeti/Config/IBindingContext.cs index d45d42e..704ad21 100644 --- a/Tapeti/Config/IBindingContext.cs +++ b/Tapeti/Config/IBindingContext.cs @@ -17,7 +17,7 @@ namespace Tapeti.Config IReadOnlyList Parameters { get; } IBindingResult Result { get; } - void Use(IBindingFilter filter); + void Use(IMessageFilterMiddleware filterMiddleware); void Use(IMessageMiddleware middleware); } diff --git a/Tapeti/Config/IBindingFilter.cs b/Tapeti/Config/IBindingFilter.cs deleted file mode 100644 index 1e2259a..0000000 --- a/Tapeti/Config/IBindingFilter.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; - -namespace Tapeti.Config -{ - public interface IBindingFilter - { - Task Accept(IMessageContext context, IBinding binding); - } -} diff --git a/Tapeti/Config/IConfig.cs b/Tapeti/Config/IConfig.cs index ef68921..e9b2ba9 100644 --- a/Tapeti/Config/IConfig.cs +++ b/Tapeti/Config/IConfig.cs @@ -31,10 +31,10 @@ namespace Tapeti.Config Type MessageClass { get; } string QueueName { get; } + IReadOnlyList MessageFilterMiddleware { get; } IReadOnlyList MessageMiddleware { get; } - IReadOnlyList BindingFilters { get; } - Task Accept(IMessageContext context, object message); + bool Accept(IMessageContext context, object message); Task Invoke(IMessageContext context, object message); } diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index bdda93b..50c3b30 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Reflection; using RabbitMQ.Client; namespace Tapeti.Config @@ -17,13 +16,10 @@ namespace Tapeti.Config IDictionary Items { get; } /// - /// Controller will be null when passed to an IBindingFilter + /// Controller will be null when passed to a IMessageFilterMiddleware /// object Controller { get; } - /// - /// Binding will be null when passed to an IBindingFilter - /// IBinding Binding { get; } } } diff --git a/Tapeti/Config/IMessageFilterMiddleware.cs b/Tapeti/Config/IMessageFilterMiddleware.cs new file mode 100644 index 0000000..497909c --- /dev/null +++ b/Tapeti/Config/IMessageFilterMiddleware.cs @@ -0,0 +1,10 @@ +using System; +using System.Threading.Tasks; + +namespace Tapeti.Config +{ + public interface IMessageFilterMiddleware + { + Task Handle(IMessageContext context, Func next); + } +} diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 5375034..b3b3ed6 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -53,20 +53,27 @@ namespace Tapeti.Connection { foreach (var binding in bindings) { - if (!binding.Accept(context, message).Result) + if (!binding.Accept(context, message)) continue; - context.Controller = dependencyResolver.Resolve(binding.Controller); context.Binding = binding; // ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas MiddlewareHelper.GoAsync( - binding.MessageMiddleware != null - ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() - : messageMiddleware, + binding.MessageFilterMiddleware, async (handler, next) => await handler.Handle(context, next), - () => binding.Invoke(context, message) - ).Wait(); + async () => + { + context.Controller = dependencyResolver.Resolve(binding.Controller); + + await MiddlewareHelper.GoAsync( + binding.MessageMiddleware != null + ? messageMiddleware.Concat(binding.MessageMiddleware).ToList() + : messageMiddleware, + async (handler, next) => await handler.Handle(context, next), + () => binding.Invoke(context, message) + ); + }).Wait(); // ReSharper restore AccessToDisposedClosure validMessageType = true; diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs index edac856..f70e6bb 100644 --- a/Tapeti/Helpers/MiddlewareHelper.cs +++ b/Tapeti/Helpers/MiddlewareHelper.cs @@ -8,8 +8,8 @@ namespace Tapeti.Helpers { public static void Go(IReadOnlyList middleware, Action handle, Action lastHandler) { - var handlerIndex = middleware.Count - 1; - if (handlerIndex == -1) + var handlerIndex = middleware?.Count - 1 ?? -1; + if (middleware == null || handlerIndex == -1) { lastHandler(); return; @@ -32,8 +32,8 @@ namespace Tapeti.Helpers public static async Task GoAsync(IReadOnlyList middleware, Func, Task> handle, Func lastHandler) { - var handlerIndex = middleware.Count - 1; - if (handlerIndex == -1) + var handlerIndex = middleware?.Count - 1 ?? -1; + if (middleware == null || handlerIndex == -1) { await lastHandler(); return; diff --git a/Tapeti/Tapeti.csproj b/Tapeti/Tapeti.csproj index 7fbad18..223fdba 100644 --- a/Tapeti/Tapeti.csproj +++ b/Tapeti/Tapeti.csproj @@ -40,6 +40,7 @@ True + @@ -53,7 +54,7 @@ - + @@ -78,6 +79,7 @@ + diff --git a/Tapeti/TapetiAppSettingsConnectionParams.cs b/Tapeti/TapetiAppSettingsConnectionParams.cs new file mode 100644 index 0000000..b9b69d0 --- /dev/null +++ b/Tapeti/TapetiAppSettingsConnectionParams.cs @@ -0,0 +1,36 @@ +using System; +using System.Configuration; +using System.Linq; + +namespace Tapeti +{ + public class TapetiAppSettingsConnectionParams : TapetiConnectionParams + { + public const string DefaultPrefix = "rabbitmq:"; + public const string KeyHostname = "hostname"; + public const string KeyPort = "port"; + public const string KeyVirtualHost = "virtualhost"; + public const string KeyUsername = "username"; + public const string KeyPassword = "password"; + public const string KeyPrefetchCount = "prefetchcount"; + + + public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix) + { + var keys = ConfigurationManager.AppSettings.AllKeys; + Action> getAppSetting = (key, setValue) => + { + if (keys.Contains(prefix + key)) + setValue(ConfigurationManager.AppSettings[prefix + key]); + }; + + + getAppSetting(KeyHostname, value => HostName = value); + getAppSetting(KeyPort, value => Port = int.Parse(value)); + getAppSetting(KeyVirtualHost, value => VirtualHost = value); + getAppSetting(KeyUsername, value => Username = value); + getAppSetting(KeyPassword, value => Password = value); + getAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value)); + } + } +} diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 1d6d81c..bdc3da7 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -150,7 +150,7 @@ namespace Tapeti MessageClass = context.MessageClass, MessageHandler = messageHandler, MessageMiddleware = context.MessageMiddleware, - BindingFilters = context.BindingFilters + MessageFilterMiddleware = context.MessageFilterMiddleware }; if (methodQueueInfo.Dynamic.GetValueOrDefault()) @@ -268,11 +268,9 @@ namespace Tapeti { var existing = staticRegistrations[binding.QueueInfo.Name]; - // Technically we could easily do multicasting, but it complicates exception handling and requeueing - // TODO allow multiple, if there is a filter which guarantees uniqueness - // TODO move to independant validation middleware - if (existing.Any(h => h.MessageClass == binding.MessageClass)) - throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}"); + // TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware + //if (existing.Any(h => h.MessageClass == binding.MessageClass)) + // throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}"); existing.Add(binding); } @@ -368,7 +366,7 @@ namespace Tapeti public string QueueName { get; set; } public IReadOnlyList MessageMiddleware { get; set; } - public IReadOnlyList BindingFilters { get; set; } + public IReadOnlyList MessageFilterMiddleware { get; set; } private QueueInfo queueInfo; public QueueInfo QueueInfo @@ -390,21 +388,9 @@ namespace Tapeti } - public async Task Accept(IMessageContext context, object message) + public bool Accept(IMessageContext context, object message) { - if (message.GetType() != MessageClass) - return false; - - if (BindingFilters == null) - return true; - - foreach (var filter in BindingFilters) - { - if (!await filter.Accept(context, this)) - return false; - } - - return true; + return message.GetType() == MessageClass; } @@ -431,7 +417,7 @@ namespace Tapeti internal class BindingContext : IBindingContext { private List messageMiddleware; - private List bindingFilters; + private List messageFilterMiddleware; public Type MessageClass { get; set; } @@ -440,7 +426,7 @@ namespace Tapeti public IBindingResult Result { get; } public IReadOnlyList MessageMiddleware => messageMiddleware; - public IReadOnlyList BindingFilters => bindingFilters; + public IReadOnlyList MessageFilterMiddleware => messageFilterMiddleware; public BindingContext(MethodInfo method) @@ -461,12 +447,12 @@ namespace Tapeti } - public void Use(IBindingFilter filter) + public void Use(IMessageFilterMiddleware filterMiddleware) { - if (bindingFilters == null) - bindingFilters = new List(); + if (messageFilterMiddleware == null) + messageFilterMiddleware = new List(); - bindingFilters.Add(filter); + messageFilterMiddleware.Add(filterMiddleware); } } diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index e9744c0..1f1f9a8 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -7,8 +7,9 @@ using Tapeti.Flow.Annotations; namespace Test { + [MessageController] [DynamicQueue] - public class MarcoController : MessageController + public class MarcoController { private readonly IPublisher publisher; private readonly IFlowProvider flowProvider; @@ -52,7 +53,6 @@ namespace Test Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!"); - // This should error, as MarcoMessage expects a PoloMessage as a response return flowProvider.EndWithResponse(new PoloMessage()); } diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs index 593b15f..79c0911 100644 --- a/Test/MarcoEmitter.cs +++ b/Test/MarcoEmitter.cs @@ -17,9 +17,8 @@ namespace Test public async Task Run() { - await publisher.Publish(new MarcoMessage()); +// await publisher.Publish(new MarcoMessage()); - /* var concurrent = new SemaphoreSlim(20); while (true) @@ -37,14 +36,14 @@ namespace Test } } - await Task.Delay(1000); + await Task.Delay(200); } - */ + /* while (true) { await Task.Delay(1000); - } + }*/ } } } diff --git a/Test/Program.cs b/Test/Program.cs index 9f20a3c..f4244d2 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -2,6 +2,7 @@ using SimpleInjector; using Tapeti; using Tapeti.Flow; +using Tapeti.Flow.SQL; using Tapeti.SimpleInjector; namespace Test @@ -12,25 +13,23 @@ namespace Test { // TODO SQL based flow store // TODO logging + // TODO uitzoeken of we consumers kunnen pauzeren (denk: SQL down) --> nee, EFDBContext Get Async maken en retryen? kan dat, of timeout dan Rabbit? var container = new Container(); container.Register(); container.Register(); - container.Register(); + //container.Register(() => new EF(serviceID)); var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) .WithFlow() + //.WithFlowSqlRepository("data source=localhost;initial catalog=lef;integrated security=True;multipleactiveresultsets=True", 1) .RegisterAllControllers() .Build(); using (var connection = new TapetiConnection(config) { - Params = new TapetiConnectionParams - { - HostName = "localhost", - PrefetchCount = 200 - } + Params = new TapetiAppSettingsConnectionParams() }) { Console.WriteLine("Subscribing..."); diff --git a/Test/Test.csproj b/Test/Test.csproj index 719fffe..28c333a 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -59,6 +59,10 @@ + + {6de7b122-eb6a-46b8-aeaf-f84dde18f9c7} + Tapeti.Flow.SQL + {8ab4fd33-4aaa-465c-8579-9db3f3b23813} Tapeti