Added AppSettings ConnectionParams helper
Changed BindingFilters to MessageFilterMiddleware (in preparation for SignalR interaction package) Start of SqlConnectionFlowRepository
This commit is contained in:
parent
e06a631700
commit
1a0d9b2570
@ -5,9 +5,9 @@ namespace Tapeti.Flow.SQL
|
|||||||
{
|
{
|
||||||
public static class ConfigExtensions
|
public static class ConfigExtensions
|
||||||
{
|
{
|
||||||
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config)
|
public static TapetiConfig WithFlowSqlRepository(this TapetiConfig config, string connectionString, int serviceId, string schema = "dbo")
|
||||||
{
|
{
|
||||||
config.Use(new FlowSqlRepositoryBundle());
|
config.Use(new FlowSqlRepositoryBundle(connectionString, serviceId, schema));
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -15,19 +15,25 @@ namespace Tapeti.Flow.SQL
|
|||||||
|
|
||||||
internal class FlowSqlRepositoryBundle : ITapetiExtension
|
internal class FlowSqlRepositoryBundle : ITapetiExtension
|
||||||
{
|
{
|
||||||
/*
|
private readonly string connectionString;
|
||||||
public IEnumerable<object> GetContents(IDependencyResolver dependencyResolver)
|
private readonly string schema;
|
||||||
{
|
private readonly int serviceId;
|
||||||
((IDependencyContainer)dependencyResolver)?.RegisterDefault<IFlowRepository, >();
|
|
||||||
|
|
||||||
return null;
|
|
||||||
|
public FlowSqlRepositoryBundle(string connectionString, int serviceId, string schema)
|
||||||
|
{
|
||||||
|
this.connectionString = connectionString;
|
||||||
|
this.serviceId = serviceId;
|
||||||
|
this.schema = schema;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
public void RegisterDefaults(IDependencyContainer container)
|
public void RegisterDefaults(IDependencyContainer container)
|
||||||
{
|
{
|
||||||
|
container.RegisterDefault<IFlowRepository>(() => new SqlConnectionFlowRepository(connectionString, serviceId, schema));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
|
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
|
||||||
{
|
{
|
||||||
return null;
|
return null;
|
||||||
|
129
Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
Normal file
129
Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Data;
|
||||||
|
using System.Data.SqlClient;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Flow.SQL
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Assumes the following table layout (schema configurable):
|
||||||
|
|
||||||
|
|
||||||
|
create table dbo.Flow
|
||||||
|
(
|
||||||
|
FlowID uniqueidentifier not null,
|
||||||
|
ServiceID int not null,
|
||||||
|
CreationTime datetime2(3) not null,
|
||||||
|
Metadata nvarchar(max) null,
|
||||||
|
Flowdata nvarchar(max) null,
|
||||||
|
|
||||||
|
constraint PK_Flow primary key clustered (FlowID)
|
||||||
|
);
|
||||||
|
|
||||||
|
create table dbo.FlowContinuation
|
||||||
|
(
|
||||||
|
FlowID uniqueidentifier not null,
|
||||||
|
ContinuationID uniqueidentifier not null,
|
||||||
|
Metadata nvarchar(max) null,
|
||||||
|
|
||||||
|
constraint PK_FlowContinuation primary key clustered (FlowID, ContinuationID)
|
||||||
|
);
|
||||||
|
go;
|
||||||
|
|
||||||
|
alter table shared.FlowContinuation with check add constraint FK_FlowContinuation_Flow foreign key (FlowID) references shared.Flow (FlowID);
|
||||||
|
*/
|
||||||
|
public class SqlConnectionFlowRepository : IFlowRepository
|
||||||
|
{
|
||||||
|
private readonly string connectionString;
|
||||||
|
private readonly int serviceId;
|
||||||
|
private readonly string schema;
|
||||||
|
|
||||||
|
|
||||||
|
public SqlConnectionFlowRepository(string connectionString, int serviceId, string schema)
|
||||||
|
{
|
||||||
|
this.connectionString = connectionString;
|
||||||
|
this.serviceId = serviceId;
|
||||||
|
this.schema = schema;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public async Task<IQueryable<FlowStateRecord>> GetStates()
|
||||||
|
{
|
||||||
|
var result = new List<FlowStateRecord>();
|
||||||
|
|
||||||
|
using (var connection = await GetConnection())
|
||||||
|
{
|
||||||
|
var flowQuery = new SqlCommand($"select FlowID, Metadata, Flowdata from {schema}.Flow " +
|
||||||
|
"where ServiceID = @ServiceID " +
|
||||||
|
"order by FlowID", connection);
|
||||||
|
var flowServiceParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
|
||||||
|
|
||||||
|
var continuationQuery = new SqlCommand($"select FlowID, ContinuationID, Metadata from {schema}.FlowContinuation " +
|
||||||
|
"where ServiceID = @ServiceID " +
|
||||||
|
"order by FlowID", connection);
|
||||||
|
var continuationQueryParam = flowQuery.Parameters.Add("@ServiceID", SqlDbType.Int);
|
||||||
|
|
||||||
|
|
||||||
|
flowServiceParam.Value = serviceId;
|
||||||
|
continuationQueryParam.Value = serviceId;
|
||||||
|
|
||||||
|
|
||||||
|
var flowReader = await flowQuery.ExecuteReaderAsync();
|
||||||
|
var continuationReader = await continuationQuery.ExecuteReaderAsync();
|
||||||
|
var hasContinuation = await continuationReader.ReadAsync();
|
||||||
|
|
||||||
|
while (await flowReader.ReadAsync())
|
||||||
|
{
|
||||||
|
var flowStateRecord = new FlowStateRecord
|
||||||
|
{
|
||||||
|
FlowID = flowReader.GetGuid(0),
|
||||||
|
Metadata = flowReader.GetString(1),
|
||||||
|
Data = flowReader.GetString(2),
|
||||||
|
ContinuationMetadata = new Dictionary<Guid, string>()
|
||||||
|
};
|
||||||
|
|
||||||
|
while (hasContinuation && continuationReader.GetGuid(0) == flowStateRecord.FlowID)
|
||||||
|
{
|
||||||
|
flowStateRecord.ContinuationMetadata.Add(
|
||||||
|
continuationReader.GetGuid(1),
|
||||||
|
continuationReader.GetString(2)
|
||||||
|
);
|
||||||
|
|
||||||
|
hasContinuation = await continuationReader.ReadAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Add(flowStateRecord);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.AsQueryable();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Task CreateState(FlowStateRecord stateRecord, DateTime timestamp)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task UpdateState(FlowStateRecord stateRecord)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task DeleteState(Guid flowID)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private async Task<SqlConnection> GetConnection()
|
||||||
|
{
|
||||||
|
var connection = new SqlConnection(connectionString);
|
||||||
|
await connection.OpenAsync();
|
||||||
|
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -43,6 +43,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Compile Include="ConfigExtensions.cs" />
|
<Compile Include="ConfigExtensions.cs" />
|
||||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||||
|
<Compile Include="SqlConnectionFlowRepository.cs" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj">
|
<ProjectReference Include="..\Tapeti.Flow\Tapeti.Flow.csproj">
|
||||||
|
@ -27,7 +27,7 @@ namespace Tapeti.Flow.Default
|
|||||||
if (continuationAttribute == null)
|
if (continuationAttribute == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
context.Use(new FlowBindingFilter());
|
context.Use(new FlowMessageFilterMiddleware());
|
||||||
context.Use(new FlowMessageMiddleware());
|
context.Use(new FlowMessageMiddleware());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,15 +5,18 @@ using Tapeti.Flow.FlowHelpers;
|
|||||||
|
|
||||||
namespace Tapeti.Flow.Default
|
namespace Tapeti.Flow.Default
|
||||||
{
|
{
|
||||||
public class FlowBindingFilter : IBindingFilter
|
public class FlowMessageFilterMiddleware : IMessageFilterMiddleware
|
||||||
{
|
{
|
||||||
public async Task<bool> Accept(IMessageContext context, IBinding binding)
|
public async Task Handle(IMessageContext context, Func<Task> next)
|
||||||
{
|
{
|
||||||
var flowContext = await GetFlowContext(context);
|
var flowContext = await GetFlowContext(context);
|
||||||
if (flowContext?.ContinuationMetadata == null)
|
if (flowContext?.ContinuationMetadata == null)
|
||||||
return false;
|
return;
|
||||||
|
|
||||||
return flowContext.ContinuationMetadata.MethodName == MethodSerializer.Serialize(binding.Method);
|
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(context.Binding.Method))
|
||||||
|
return;
|
||||||
|
|
||||||
|
await next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -52,7 +52,7 @@
|
|||||||
<Compile Include="Annotations\ContinuationAttribute.cs" />
|
<Compile Include="Annotations\ContinuationAttribute.cs" />
|
||||||
<Compile Include="Annotations\RequestAttribute.cs" />
|
<Compile Include="Annotations\RequestAttribute.cs" />
|
||||||
<Compile Include="ContextItems.cs" />
|
<Compile Include="ContextItems.cs" />
|
||||||
<Compile Include="Default\FlowBindingFilter.cs" />
|
<Compile Include="Default\FlowMessageFilterMiddleware.cs" />
|
||||||
<Compile Include="Default\FlowBindingMiddleware.cs" />
|
<Compile Include="Default\FlowBindingMiddleware.cs" />
|
||||||
<Compile Include="Default\FlowContext.cs" />
|
<Compile Include="Default\FlowContext.cs" />
|
||||||
<Compile Include="Default\FlowMessageMiddleware.cs" />
|
<Compile Include="Default\FlowMessageMiddleware.cs" />
|
||||||
|
@ -17,7 +17,7 @@ namespace Tapeti.Config
|
|||||||
IReadOnlyList<IBindingParameter> Parameters { get; }
|
IReadOnlyList<IBindingParameter> Parameters { get; }
|
||||||
IBindingResult Result { get; }
|
IBindingResult Result { get; }
|
||||||
|
|
||||||
void Use(IBindingFilter filter);
|
void Use(IMessageFilterMiddleware filterMiddleware);
|
||||||
void Use(IMessageMiddleware middleware);
|
void Use(IMessageMiddleware middleware);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace Tapeti.Config
|
|
||||||
{
|
|
||||||
public interface IBindingFilter
|
|
||||||
{
|
|
||||||
Task<bool> Accept(IMessageContext context, IBinding binding);
|
|
||||||
}
|
|
||||||
}
|
|
@ -31,10 +31,10 @@ namespace Tapeti.Config
|
|||||||
Type MessageClass { get; }
|
Type MessageClass { get; }
|
||||||
string QueueName { get; }
|
string QueueName { get; }
|
||||||
|
|
||||||
|
IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; }
|
||||||
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; }
|
||||||
IReadOnlyList<IBindingFilter> BindingFilters { get; }
|
|
||||||
|
|
||||||
Task<bool> Accept(IMessageContext context, object message);
|
bool Accept(IMessageContext context, object message);
|
||||||
Task Invoke(IMessageContext context, object message);
|
Task Invoke(IMessageContext context, object message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Reflection;
|
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
|
||||||
namespace Tapeti.Config
|
namespace Tapeti.Config
|
||||||
@ -17,13 +16,10 @@ namespace Tapeti.Config
|
|||||||
IDictionary<string, object> Items { get; }
|
IDictionary<string, object> Items { get; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Controller will be null when passed to an IBindingFilter
|
/// Controller will be null when passed to a IMessageFilterMiddleware
|
||||||
/// </summary>
|
/// </summary>
|
||||||
object Controller { get; }
|
object Controller { get; }
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Binding will be null when passed to an IBindingFilter
|
|
||||||
/// </summary>
|
|
||||||
IBinding Binding { get; }
|
IBinding Binding { get; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
10
Tapeti/Config/IMessageFilterMiddleware.cs
Normal file
10
Tapeti/Config/IMessageFilterMiddleware.cs
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Tapeti.Config
|
||||||
|
{
|
||||||
|
public interface IMessageFilterMiddleware
|
||||||
|
{
|
||||||
|
Task Handle(IMessageContext context, Func<Task> next);
|
||||||
|
}
|
||||||
|
}
|
@ -53,20 +53,27 @@ namespace Tapeti.Connection
|
|||||||
{
|
{
|
||||||
foreach (var binding in bindings)
|
foreach (var binding in bindings)
|
||||||
{
|
{
|
||||||
if (!binding.Accept(context, message).Result)
|
if (!binding.Accept(context, message))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
context.Controller = dependencyResolver.Resolve(binding.Controller);
|
|
||||||
context.Binding = binding;
|
context.Binding = binding;
|
||||||
|
|
||||||
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
|
// ReSharper disable AccessToDisposedClosure - MiddlewareHelper will not keep a reference to the lambdas
|
||||||
MiddlewareHelper.GoAsync(
|
MiddlewareHelper.GoAsync(
|
||||||
binding.MessageMiddleware != null
|
binding.MessageFilterMiddleware,
|
||||||
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
|
|
||||||
: messageMiddleware,
|
|
||||||
async (handler, next) => await handler.Handle(context, next),
|
async (handler, next) => await handler.Handle(context, next),
|
||||||
() => binding.Invoke(context, message)
|
async () =>
|
||||||
).Wait();
|
{
|
||||||
|
context.Controller = dependencyResolver.Resolve(binding.Controller);
|
||||||
|
|
||||||
|
await MiddlewareHelper.GoAsync(
|
||||||
|
binding.MessageMiddleware != null
|
||||||
|
? messageMiddleware.Concat(binding.MessageMiddleware).ToList()
|
||||||
|
: messageMiddleware,
|
||||||
|
async (handler, next) => await handler.Handle(context, next),
|
||||||
|
() => binding.Invoke(context, message)
|
||||||
|
);
|
||||||
|
}).Wait();
|
||||||
// ReSharper restore AccessToDisposedClosure
|
// ReSharper restore AccessToDisposedClosure
|
||||||
|
|
||||||
validMessageType = true;
|
validMessageType = true;
|
||||||
|
@ -8,8 +8,8 @@ namespace Tapeti.Helpers
|
|||||||
{
|
{
|
||||||
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
|
public static void Go<T>(IReadOnlyList<T> middleware, Action<T, Action> handle, Action lastHandler)
|
||||||
{
|
{
|
||||||
var handlerIndex = middleware.Count - 1;
|
var handlerIndex = middleware?.Count - 1 ?? -1;
|
||||||
if (handlerIndex == -1)
|
if (middleware == null || handlerIndex == -1)
|
||||||
{
|
{
|
||||||
lastHandler();
|
lastHandler();
|
||||||
return;
|
return;
|
||||||
@ -32,8 +32,8 @@ namespace Tapeti.Helpers
|
|||||||
|
|
||||||
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler)
|
public static async Task GoAsync<T>(IReadOnlyList<T> middleware, Func<T, Func<Task>, Task> handle, Func<Task> lastHandler)
|
||||||
{
|
{
|
||||||
var handlerIndex = middleware.Count - 1;
|
var handlerIndex = middleware?.Count - 1 ?? -1;
|
||||||
if (handlerIndex == -1)
|
if (middleware == null || handlerIndex == -1)
|
||||||
{
|
{
|
||||||
await lastHandler();
|
await lastHandler();
|
||||||
return;
|
return;
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
<Private>True</Private>
|
<Private>True</Private>
|
||||||
</Reference>
|
</Reference>
|
||||||
<Reference Include="System" />
|
<Reference Include="System" />
|
||||||
|
<Reference Include="System.Configuration" />
|
||||||
<Reference Include="System.Core" />
|
<Reference Include="System.Core" />
|
||||||
<Reference Include="System.Xml.Linq" />
|
<Reference Include="System.Xml.Linq" />
|
||||||
<Reference Include="System.Data.DataSetExtensions" />
|
<Reference Include="System.Data.DataSetExtensions" />
|
||||||
@ -53,7 +54,7 @@
|
|||||||
<Compile Include="Annotations\MessageControllerAttribute.cs" />
|
<Compile Include="Annotations\MessageControllerAttribute.cs" />
|
||||||
<Compile Include="Annotations\StaticQueueAttribute.cs" />
|
<Compile Include="Annotations\StaticQueueAttribute.cs" />
|
||||||
<Compile Include="Annotations\DynamicQueueAttribute.cs" />
|
<Compile Include="Annotations\DynamicQueueAttribute.cs" />
|
||||||
<Compile Include="Config\IBindingFilter.cs" />
|
<Compile Include="Config\IMessageFilterMiddleware.cs" />
|
||||||
<Compile Include="Connection\TapetiConsumer.cs" />
|
<Compile Include="Connection\TapetiConsumer.cs" />
|
||||||
<Compile Include="Connection\TapetiPublisher.cs" />
|
<Compile Include="Connection\TapetiPublisher.cs" />
|
||||||
<Compile Include="Connection\TapetiSubscriber.cs" />
|
<Compile Include="Connection\TapetiSubscriber.cs" />
|
||||||
@ -78,6 +79,7 @@
|
|||||||
<Compile Include="Config\IConfig.cs" />
|
<Compile Include="Config\IConfig.cs" />
|
||||||
<Compile Include="MessageController.cs" />
|
<Compile Include="MessageController.cs" />
|
||||||
<Compile Include="Config\IBindingMiddleware.cs" />
|
<Compile Include="Config\IBindingMiddleware.cs" />
|
||||||
|
<Compile Include="TapetiAppSettingsConnectionParams.cs" />
|
||||||
<Compile Include="TapetiConnectionParams.cs" />
|
<Compile Include="TapetiConnectionParams.cs" />
|
||||||
<Compile Include="TapetiConfig.cs" />
|
<Compile Include="TapetiConfig.cs" />
|
||||||
<Compile Include="ConsumeResponse.cs" />
|
<Compile Include="ConsumeResponse.cs" />
|
||||||
|
36
Tapeti/TapetiAppSettingsConnectionParams.cs
Normal file
36
Tapeti/TapetiAppSettingsConnectionParams.cs
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
using System;
|
||||||
|
using System.Configuration;
|
||||||
|
using System.Linq;
|
||||||
|
|
||||||
|
namespace Tapeti
|
||||||
|
{
|
||||||
|
public class TapetiAppSettingsConnectionParams : TapetiConnectionParams
|
||||||
|
{
|
||||||
|
public const string DefaultPrefix = "rabbitmq:";
|
||||||
|
public const string KeyHostname = "hostname";
|
||||||
|
public const string KeyPort = "port";
|
||||||
|
public const string KeyVirtualHost = "virtualhost";
|
||||||
|
public const string KeyUsername = "username";
|
||||||
|
public const string KeyPassword = "password";
|
||||||
|
public const string KeyPrefetchCount = "prefetchcount";
|
||||||
|
|
||||||
|
|
||||||
|
public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix)
|
||||||
|
{
|
||||||
|
var keys = ConfigurationManager.AppSettings.AllKeys;
|
||||||
|
Action<string, Action<string>> getAppSetting = (key, setValue) =>
|
||||||
|
{
|
||||||
|
if (keys.Contains(prefix + key))
|
||||||
|
setValue(ConfigurationManager.AppSettings[prefix + key]);
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
getAppSetting(KeyHostname, value => HostName = value);
|
||||||
|
getAppSetting(KeyPort, value => Port = int.Parse(value));
|
||||||
|
getAppSetting(KeyVirtualHost, value => VirtualHost = value);
|
||||||
|
getAppSetting(KeyUsername, value => Username = value);
|
||||||
|
getAppSetting(KeyPassword, value => Password = value);
|
||||||
|
getAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -150,7 +150,7 @@ namespace Tapeti
|
|||||||
MessageClass = context.MessageClass,
|
MessageClass = context.MessageClass,
|
||||||
MessageHandler = messageHandler,
|
MessageHandler = messageHandler,
|
||||||
MessageMiddleware = context.MessageMiddleware,
|
MessageMiddleware = context.MessageMiddleware,
|
||||||
BindingFilters = context.BindingFilters
|
MessageFilterMiddleware = context.MessageFilterMiddleware
|
||||||
};
|
};
|
||||||
|
|
||||||
if (methodQueueInfo.Dynamic.GetValueOrDefault())
|
if (methodQueueInfo.Dynamic.GetValueOrDefault())
|
||||||
@ -268,11 +268,9 @@ namespace Tapeti
|
|||||||
{
|
{
|
||||||
var existing = staticRegistrations[binding.QueueInfo.Name];
|
var existing = staticRegistrations[binding.QueueInfo.Name];
|
||||||
|
|
||||||
// Technically we could easily do multicasting, but it complicates exception handling and requeueing
|
// TODO allow multiple only if there is a filter which guarantees uniqueness? and/or move to independant validation middleware
|
||||||
// TODO allow multiple, if there is a filter which guarantees uniqueness
|
//if (existing.Any(h => h.MessageClass == binding.MessageClass))
|
||||||
// TODO move to independant validation middleware
|
// throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
|
||||||
if (existing.Any(h => h.MessageClass == binding.MessageClass))
|
|
||||||
throw new TopologyConfigurationException($"Multiple handlers for message class {binding.MessageClass.Name} in queue {binding.QueueInfo.Name}");
|
|
||||||
|
|
||||||
existing.Add(binding);
|
existing.Add(binding);
|
||||||
}
|
}
|
||||||
@ -368,7 +366,7 @@ namespace Tapeti
|
|||||||
public string QueueName { get; set; }
|
public string QueueName { get; set; }
|
||||||
|
|
||||||
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
|
public IReadOnlyList<IMessageMiddleware> MessageMiddleware { get; set; }
|
||||||
public IReadOnlyList<IBindingFilter> BindingFilters { get; set; }
|
public IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware { get; set; }
|
||||||
|
|
||||||
private QueueInfo queueInfo;
|
private QueueInfo queueInfo;
|
||||||
public QueueInfo QueueInfo
|
public QueueInfo QueueInfo
|
||||||
@ -390,21 +388,9 @@ namespace Tapeti
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public async Task<bool> Accept(IMessageContext context, object message)
|
public bool Accept(IMessageContext context, object message)
|
||||||
{
|
{
|
||||||
if (message.GetType() != MessageClass)
|
return message.GetType() == MessageClass;
|
||||||
return false;
|
|
||||||
|
|
||||||
if (BindingFilters == null)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
foreach (var filter in BindingFilters)
|
|
||||||
{
|
|
||||||
if (!await filter.Accept(context, this))
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -431,7 +417,7 @@ namespace Tapeti
|
|||||||
internal class BindingContext : IBindingContext
|
internal class BindingContext : IBindingContext
|
||||||
{
|
{
|
||||||
private List<IMessageMiddleware> messageMiddleware;
|
private List<IMessageMiddleware> messageMiddleware;
|
||||||
private List<IBindingFilter> bindingFilters;
|
private List<IMessageFilterMiddleware> messageFilterMiddleware;
|
||||||
|
|
||||||
public Type MessageClass { get; set; }
|
public Type MessageClass { get; set; }
|
||||||
|
|
||||||
@ -440,7 +426,7 @@ namespace Tapeti
|
|||||||
public IBindingResult Result { get; }
|
public IBindingResult Result { get; }
|
||||||
|
|
||||||
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
|
public IReadOnlyList<IMessageMiddleware> MessageMiddleware => messageMiddleware;
|
||||||
public IReadOnlyList<IBindingFilter> BindingFilters => bindingFilters;
|
public IReadOnlyList<IMessageFilterMiddleware> MessageFilterMiddleware => messageFilterMiddleware;
|
||||||
|
|
||||||
|
|
||||||
public BindingContext(MethodInfo method)
|
public BindingContext(MethodInfo method)
|
||||||
@ -461,12 +447,12 @@ namespace Tapeti
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void Use(IBindingFilter filter)
|
public void Use(IMessageFilterMiddleware filterMiddleware)
|
||||||
{
|
{
|
||||||
if (bindingFilters == null)
|
if (messageFilterMiddleware == null)
|
||||||
bindingFilters = new List<IBindingFilter>();
|
messageFilterMiddleware = new List<IMessageFilterMiddleware>();
|
||||||
|
|
||||||
bindingFilters.Add(filter);
|
messageFilterMiddleware.Add(filterMiddleware);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,8 +7,9 @@ using Tapeti.Flow.Annotations;
|
|||||||
|
|
||||||
namespace Test
|
namespace Test
|
||||||
{
|
{
|
||||||
|
[MessageController]
|
||||||
[DynamicQueue]
|
[DynamicQueue]
|
||||||
public class MarcoController : MessageController
|
public class MarcoController
|
||||||
{
|
{
|
||||||
private readonly IPublisher publisher;
|
private readonly IPublisher publisher;
|
||||||
private readonly IFlowProvider flowProvider;
|
private readonly IFlowProvider flowProvider;
|
||||||
@ -52,7 +53,6 @@ namespace Test
|
|||||||
|
|
||||||
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
|
Console.WriteLine(message.ShouldMatchState.Equals(StateTestGuid) ? "Confirmed!" : "Oops! Mismatch!");
|
||||||
|
|
||||||
// This should error, as MarcoMessage expects a PoloMessage as a response
|
|
||||||
return flowProvider.EndWithResponse(new PoloMessage());
|
return flowProvider.EndWithResponse(new PoloMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,9 +17,8 @@ namespace Test
|
|||||||
|
|
||||||
public async Task Run()
|
public async Task Run()
|
||||||
{
|
{
|
||||||
await publisher.Publish(new MarcoMessage());
|
// await publisher.Publish(new MarcoMessage());
|
||||||
|
|
||||||
/*
|
|
||||||
var concurrent = new SemaphoreSlim(20);
|
var concurrent = new SemaphoreSlim(20);
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
@ -37,14 +36,14 @@ namespace Test
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await Task.Delay(1000);
|
await Task.Delay(200);
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
|
/*
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
await Task.Delay(1000);
|
await Task.Delay(1000);
|
||||||
}
|
}*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
using SimpleInjector;
|
using SimpleInjector;
|
||||||
using Tapeti;
|
using Tapeti;
|
||||||
using Tapeti.Flow;
|
using Tapeti.Flow;
|
||||||
|
using Tapeti.Flow.SQL;
|
||||||
using Tapeti.SimpleInjector;
|
using Tapeti.SimpleInjector;
|
||||||
|
|
||||||
namespace Test
|
namespace Test
|
||||||
@ -12,25 +13,23 @@ namespace Test
|
|||||||
{
|
{
|
||||||
// TODO SQL based flow store
|
// TODO SQL based flow store
|
||||||
// TODO logging
|
// TODO logging
|
||||||
|
// TODO uitzoeken of we consumers kunnen pauzeren (denk: SQL down) --> nee, EFDBContext Get Async maken en retryen? kan dat, of timeout dan Rabbit?
|
||||||
|
|
||||||
var container = new Container();
|
var container = new Container();
|
||||||
container.Register<MarcoEmitter>();
|
container.Register<MarcoEmitter>();
|
||||||
container.Register<Visualizer>();
|
container.Register<Visualizer>();
|
||||||
|
|
||||||
container.Register<IFlowRepository>();
|
//container.Register<IFlowRepository>(() => new EF(serviceID));
|
||||||
|
|
||||||
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
|
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
|
||||||
.WithFlow()
|
.WithFlow()
|
||||||
|
//.WithFlowSqlRepository("data source=localhost;initial catalog=lef;integrated security=True;multipleactiveresultsets=True", 1)
|
||||||
.RegisterAllControllers()
|
.RegisterAllControllers()
|
||||||
.Build();
|
.Build();
|
||||||
|
|
||||||
using (var connection = new TapetiConnection(config)
|
using (var connection = new TapetiConnection(config)
|
||||||
{
|
{
|
||||||
Params = new TapetiConnectionParams
|
Params = new TapetiAppSettingsConnectionParams()
|
||||||
{
|
|
||||||
HostName = "localhost",
|
|
||||||
PrefetchCount = 200
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
{
|
{
|
||||||
Console.WriteLine("Subscribing...");
|
Console.WriteLine("Subscribing...");
|
||||||
|
@ -59,6 +59,10 @@
|
|||||||
<None Include="packages.config" />
|
<None Include="packages.config" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Tapeti.Flow.SQL\Tapeti.Flow.SQL.csproj">
|
||||||
|
<Project>{6de7b122-eb6a-46b8-aeaf-f84dde18f9c7}</Project>
|
||||||
|
<Name>Tapeti.Flow.SQL</Name>
|
||||||
|
</ProjectReference>
|
||||||
<ProjectReference Include="..\Tapeti\Tapeti.csproj">
|
<ProjectReference Include="..\Tapeti\Tapeti.csproj">
|
||||||
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
|
<Project>{8ab4fd33-4aaa-465c-8579-9db3f3b23813}</Project>
|
||||||
<Name>Tapeti</Name>
|
<Name>Tapeti</Name>
|
||||||
|
Loading…
Reference in New Issue
Block a user