1
0
mirror of synced 2025-01-22 16:13:07 +01:00

Added AppSettings ConnectionParams helper

Changed BindingFilters to MessageFilterMiddleware (in preparation for SignalR interaction package)
Start of SqlConnectionFlowRepository
This commit is contained in:
Mark van Renswoude 2017-02-08 15:52:24 +01:00
parent 1f41f6bcc0
commit 6779f3a4d0
20 changed files with 252 additions and 83 deletions

View File

@ -5,9 +5,9 @@ namespace Tapeti.Flow.SQL
{
public static class ConfigExtensions
{
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config)
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, int serviceId, string schema = "dbo")
{
config.Use(new FlowSqlRepositoryBundle());
config.Use(new FlowSqlRepositoryBundle(connectionString, serviceId, schema));
return config;
}
}
@ -15,19 +15,25 @@ namespace Tapeti.Flow.SQL
internal class FlowSqlRepositoryBundle : ITapetiExtension
{
/*
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
{
((IDependencyContainer)dependencyResolver)?.RegisterDefault<IFlowRepository, >();
private readonly string connectionString;
private readonly string schema;
private readonly int serviceId;
return null;
public FlowSqlRepositoryBundle(string connectionString, int serviceId, string schema)
{
this.connectionString = connectionString;
this.serviceId = serviceId;
this.schema = schema;
}
*/
public void RegisterDefaults(IDependencyContainer container)
{
container.RegisterDefault<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema));
}
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return null;

View File

@ -0,0 +1,129 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
namespace Tapeti.Flow.SQL
{
/*
Assumes the following table layout (schema configurable):
create table dbo.Flow
(
FlowID uniqueidentifier not null,
ServiceID int not null,
CreationTime datetime2(3) not null,
Metadata nvarchar(max) null,
Flowdata nvarchar(max) null,
constraint PK_Flow primary key clustered (FlowID)
);
create table dbo.FlowContinuation
(
FlowID uniqueidentifier not null,
ContinuationID uniqueidentifier not null,
Metadata nvarchar(max) null,
constraint PK_FlowContinuation primary key clustered (FlowID, ContinuationID)
);
go;
alter table shared.FlowContinuation with check add constraint FK_FlowContinuation_Flow foreign key (FlowID) references shared.Flow (FlowID);
*/
public class SqlConnectionFlowRepository : IFlowRepository
{
private readonly string connectionString;
private readonly int serviceId;
private readonly string schema;
public SqlConnectionFlowRepository(string connectionString, int serviceId, string schema)
{
this.connectionString = connectionString;
this.serviceId = serviceId;
this.schema = schema;
}
public async Task<IQueryable<FlowStateRecord>> GetStates()
{
var result = new List<FlowStateRecord>();
using (var connection = await GetConnection())
{
var flowQuery = new SqlCommand($"select FlowID, Metadata, Flowdata from {schema}.Flow " +
"where ServiceID = @ServiceID " +
"order by FlowID", connection);
var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
var continuationQuery = new SqlCommand($"select FlowID, ContinuationID, Metadata from {schema}.FlowContinuation " +
"where ServiceID = @ServiceID " +
"order by FlowID", connection);
var continuationQueryParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
flowServiceParam.Value = serviceId;
continuationQueryParam.Value = serviceId;
var flowReader = await flowQuery.ExecuteReaderAsync();
var continuationReader = await continuationQuery.ExecuteReaderAsync();
var hasContinuation = await continuationReader.ReadAsync();
while (await flowReader.ReadAsync())
{
var flowStateRecord = new FlowStateRecord
{
FlowID = flowReader.GetGuid(0),
Metadata = flowReader.GetString(1),
Data = flowReader.GetString(2),
ContinuationMetadata = new Dictionary<Guid, string>()
};
while (hasContinuation && continuationReader.GetGuid(0) == flowStateRecord.FlowID)
{
flowStateRecord.ContinuationMetadata.Add(
continuationReader.GetGuid(1),
continuationReader.GetString(2)
);
hasContinuation = await continuationReader.ReadAsync();
}
result.Add(flowStateRecord);
}
}
return result.AsQueryable();
}
public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp)
{
throw new NotImplementedException();
}
public Task UpdateState(FlowStateRecord stateRecord)
{
throw new NotImplementedException();
}
public Task DeleteState(Guid flowID)
{
throw new NotImplementedException();
}
private async Task<SqlConnection> GetConnection()
{
var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
return connection;
}
}
}

View File

@ -43,6 +43,7 @@
<ItemGroup>
<Compile Include="ConfigExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SqlConnectionFlowRepository.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj">

View File

@ -27,7 +27,7 @@ namespace Tapeti.Flow.Default
if (continuationAttribute == null)
return;
context.Use(new FlowBindingFilter());
context.Use(new FlowMessageFilterMiddleware());
context.Use(new FlowMessageMiddleware());
}

View File

@ -5,15 +5,18 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
public class FlowBindingFilter : IBindingFilter
public class FlowMessageFilterMiddleware : IMessageFilterMiddleware
{
public async Task<bool> Accept(IMessageContext context, IBinding binding)
public async Task Handle(IMessageContext context, Func<Task> next)
{
var flowContext = await GetFlowContext(context);
if (flowContext?.ContinuationMetadata == null)
return false;
return;
return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method);
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
return;
await next();
}

View File

@ -52,7 +52,7 @@
<Compile Include="Annotations\ContinuationAttribute.cs" />
<Compile Include="Annotations\RequestAttribute.cs" />
<Compile Include="ContextItems.cs" />
<Compile Include="Default\FlowBindingFilter.cs" />
<Compile Include="Default\FlowMessageFilterMiddleware.cs" />
<Compile Include="Default\FlowBindingMiddleware.cs" />
<Compile Include="Default\FlowContext.cs" />
<Compile Include="Default\FlowMessageMiddleware.cs" />

View File

@ -17,7 +17,7 @@ namespace Tapeti.Config
IReadOnlyList<IBindingParameter> Parameters { get; }
IBindingResult Result { get; }
void Use(IBindingFilter filter);
void Use(IMessageFilterMiddleware filterMiddleware);
void Use(IMessageMiddleware middleware);
}

View File

@ -1,9 +0,0 @@
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IBindingFilter
{
Task<bool> Accept(IMessageContext context, IBinding binding);
}
}

View File

@ -31,10 +31,10 @@ namespace Tapeti.Config
Type MessageClass { get; }
string QueueName { get; }
IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; }
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
IReadOnlyList<IBindingFilter> BindingFilters { get; }
Task<bool> Accept(IMessageContext context, object message);
bool Accept(IMessageContext context, object message);
Task Invoke(IMessageContext context, object message);
}

View File

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using RabbitMQ.Client;
namespace Tapeti.Config
@ -17,13 +16,10 @@ namespace Tapeti.Config
IDictionary<string, object> Items { get; }
/// <summary>
/// Controller will be null when passed to an IBindingFilter
/// Controller will be null when passed to a IMessageFilterMiddleware
/// </summary>
object Controller { get; }
/// <summary>
/// Binding will be null when passed to an IBindingFilter
/// </summary>
IBinding Binding { get; }
}
}

View File

@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;
namespace Tapeti.Config
{
public interface IMessageFilterMiddleware
{
Task Handle(IMessageContext context, Func<Task> next);
}
}

View File

@ -53,20 +53,27 @@ namespace Tapeti.Connection
{
foreach (var binding in bindings)
{
if (!binding.Accept(context, message).Result)
if (!binding.Accept(context, message))
continue;
context.Controller = dependencyResolver.Resolve(binding.Controller);
context.Binding = binding;
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware,
binding.MessageFilterMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => binding.Invoke(context, message)
).Wait();
async () =>
{
context.Controller = dependencyResolver.Resolve(binding.Controller);
await MiddlewareHelper.GoAsync(
binding.MessageMiddleware != null
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
: messageMiddleware,
async (handler, next) => await handler.Handle(context, next),
() => binding.Invoke(context, message)
);
}).Wait();
// ReSharper restore AccessToDisposedClosure
validMessageType = true;

View File

@ -8,8 +8,8 @@ namespace Tapeti.Helpers
{
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
{
var handlerIndex = middleware.Count - 1;
if (handlerIndex == -1)
var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1)
{
lastHandler();
return;
@ -32,8 +32,8 @@ namespace Tapeti.Helpers
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler)
{
var handlerIndex = middleware.Count - 1;
if (handlerIndex == -1)
var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1)
{
await lastHandler();
return;

View File

@ -40,6 +40,7 @@
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
@ -53,7 +54,7 @@
<Compile Include="Annotations\MessageControllerAttribute.cs" />
<Compile Include="Annotations\StaticQueueAttribute.cs" />
<Compile Include="Annotations\DynamicQueueAttribute.cs" />
<Compile Include="Config\IBindingFilter.cs" />
<Compile Include="Config\IMessageFilterMiddleware.cs" />
<Compile Include="Connection\TapetiConsumer.cs" />
<Compile Include="Connection\TapetiPublisher.cs" />
<Compile Include="Connection\TapetiSubscriber.cs" />
@ -78,6 +79,7 @@
<Compile Include="Config\IConfig.cs" />
<Compile Include="MessageController.cs" />
<Compile Include="Config\IBindingMiddleware.cs" />
<Compile Include="TapetiAppSettingsConnectionParams.cs" />
<Compile Include="TapetiConnectionParams.cs" />
<Compile Include="TapetiConfig.cs" />
<Compile Include="ConsumeResponse.cs" />

View File

@ -0,0 +1,36 @@
using System;
using System.Configuration;
using System.Linq;
namespace Tapeti
{
public class TapetiAppSettingsConnectionParams : TapetiConnectionParams
{
public const string DefaultPrefix = "rabbitmq:";
public const string KeyHostname = "hostname";
public const string KeyPort = "port";
public const string KeyVirtualHost = "virtualhost";
public const string KeyUsername = "username";
public const string KeyPassword = "password";
public const string KeyPrefetchCount = "prefetchcount";
public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix)
{
var keys = ConfigurationManager.AppSettings.AllKeys;
Action<string, Action<string>> getAppSetting = (key, setValue) =>
{
if (keys.Contains(prefix + key))
setValue(ConfigurationManager.AppSettings[prefix + key]);
};
getAppSetting(KeyHostname, value => HostName = value);
getAppSetting(KeyPort, value => Port = int.Parse(value));
getAppSetting(KeyVirtualHost, value => VirtualHost = value);
getAppSetting(KeyUsername, value => Username = value);
getAppSetting(KeyPassword, value => Password = value);
getAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value));
}
}
}

View File

@ -150,7 +150,7 @@ namespace Tapeti
MessageClass = context.MessageClass,
MessageHandler = messageHandler,
MessageMiddleware = context.MessageMiddleware,
BindingFilters = context.BindingFilters
MessageFilterMiddleware = context.MessageFilterMiddleware
};
if (methodQueueInfo.Dynamic.GetValueOrDefault())
@ -268,11 +268,9 @@ namespace Tapeti
{
var existing = staticRegistrations[binding.QueueInfo.Name];
// Technically we could easily do multicasting, but it complicates exception handling and requeueing
// TODO allow multiple, if there is a filter which guarantees uniqueness
// TODO move to independant validation middleware
if (existing.Any(h => h.MessageClass == binding.MessageClass))
throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
// TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware
//if (existing.Any(h => h.MessageClass == binding.MessageClass))
// throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
existing.Add(binding);
}
@ -368,7 +366,7 @@ namespace Tapeti
public string QueueName { get; set; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
public IReadOnlyList<IBindingFilter> BindingFilters { get; set; }
public IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; set; }
private QueueInfo queueInfo;
public QueueInfo QueueInfo
@ -390,21 +388,9 @@ namespace Tapeti
}
public async Task<bool> Accept(IMessageContext context, object message)
public bool Accept(IMessageContext context, object message)
{
if (message.GetType() != MessageClass)
return false;
if (BindingFilters == null)
return true;
foreach (var filter in BindingFilters)
{
if (!await filter.Accept(context, this))
return false;
}
return true;
return message.GetType() == MessageClass;
}
@ -431,7 +417,7 @@ namespace Tapeti
internal class BindingContext : IBindingContext
{
private List<IMessageMiddleware> messageMiddleware;
private List<IBindingFilter> bindingFilters;
private List<IMessageFilterMiddleware> messageFilterMiddleware;
public Type MessageClass { get; set; }
@ -440,7 +426,7 @@ namespace Tapeti
public IBindingResult Result { get; }
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
public IReadOnlyList<IBindingFilter> BindingFilters => bindingFilters;
public IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware => messageFilterMiddleware;
public BindingContext(MethodInfo method)
@ -461,12 +447,12 @@ namespace Tapeti
}
public void Use(IBindingFilter filter)
public void Use(IMessageFilterMiddleware filterMiddleware)
{
if (bindingFilters == null)
bindingFilters = new List<IBindingFilter>();
if (messageFilterMiddleware == null)
messageFilterMiddleware = new List<IMessageFilterMiddleware>();
bindingFilters.Add(filter);
messageFilterMiddleware.Add(filterMiddleware);
}
}

View File

@ -7,8 +7,9 @@ using Tapeti.Flow.Annotations;
namespace Test
{
[MessageController]
[DynamicQueue]
public class MarcoController : MessageController
public class MarcoController
{
private readonly IPublisher publisher;
private readonly IFlowProvider flowProvider;
@ -52,7 +53,6 @@ namespace Test
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
// This should error, as MarcoMessage expects a PoloMessage as a response
return flowProvider.EndWithResponse(new PoloMessage());
}

View File

@ -17,9 +17,8 @@ namespace Test
public async Task Run()
{
await publisher.Publish(new MarcoMessage());
// await publisher.Publish(new MarcoMessage());
/*
var concurrent = new SemaphoreSlim(20);
while (true)
@ -37,14 +36,14 @@ namespace Test
}
}
await Task.Delay(1000);
await Task.Delay(200);
}
*/
/*
while (true)
{
await Task.Delay(1000);
}
}*/
}
}
}

View File

@ -2,6 +2,7 @@
using SimpleInjector;
using Tapeti;
using Tapeti.Flow;
using Tapeti.Flow.SQL;
using Tapeti.SimpleInjector;
namespace Test
@ -12,25 +13,23 @@ namespace Test
{
// TODO SQL based flow store
// TODO logging
// TODO uitzoeken of we consumers kunnen pauzeren (denk: SQL down) --> nee, EFDBContext Get Async maken en retryen? kan dat, of timeout dan Rabbit?
var container = new Container();
container.Register<MarcoEmitter>();
container.Register<Visualizer>();
container.Register<IFlowRepository>();
//container.Register<IFlowRepository>(() => new EF(serviceID));
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.WithFlow()
//.WithFlowSqlRepository("data source=localhost;initial catalog=lef;integrated security=True;multipleactiveresultsets=True", 1)
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
HostName = "localhost",
PrefetchCount = 200
}
Params = new TapetiAppSettingsConnectionParams()
})
{
Console.WriteLine("Subscribing...");

View File

@ -59,6 +59,10 @@
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj">
<Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project>
<Name>Tapeti.Flow.SQL</Name>
</ProjectReference>
<ProjectReference Include="..\Tapeti\Tapeti.csproj">
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
<Name>Tapeti</Name>