1
0
mirror of synced 2024-11-24 11:43:12 +01:00

Merge branch 'develop' into release/1.0

This commit is contained in:
Mark van Renswoude 2018-12-19 20:53:25 +01:00
commit 31acbcb8e0
56 changed files with 198 additions and 348 deletions

View File

@ -1,15 +0,0 @@
using System;
namespace Tapeti.Annotations
{
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class ExchangeAttribute : Attribute
{
public string Name { get; set; }
public ExchangeAttribute(string name)
{
Name = name;
}
}
}

View File

@ -1,6 +1,6 @@
using System; using System;
namespace Tapeti.Flow.Annotations namespace Tapeti.Annotations
{ {
[AttributeUsage(AttributeTargets.Class)] [AttributeUsage(AttributeTargets.Class)]
public class RequestAttribute : Attribute public class RequestAttribute : Attribute

View File

@ -1,6 +1,8 @@
using System.Collections.Generic; using System.Collections.Generic;
using Tapeti.Config; using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Flow.SQL namespace Tapeti.Flow.SQL
{ {
public static class ConfigExtensions public static class ConfigExtensions

View File

@ -69,7 +69,7 @@ namespace Tapeti.Flow.SQL
public Task CreateState<T>(Guid flowID, T state, DateTime timestamp) public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
{ {
var stateJason = JsonConvert.SerializeObject(state); //var stateJson = JsonConvert.SerializeObject(state);
throw new NotImplementedException(); throw new NotImplementedException();
} }

View File

@ -1,6 +1,7 @@
using System; using System;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Annotations;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Flow.Annotations; using Tapeti.Flow.Annotations;
using Tapeti.Helpers; using Tapeti.Helpers;
@ -59,8 +60,7 @@ namespace Tapeti.Flow.Default
private static void RegisterYieldPointResult(IBindingContext context) private static void RegisterYieldPointResult(IBindingContext context)
{ {
bool isTaskOf; if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf))
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTaskOf))
return; return;
if (isTaskOf) if (isTaskOf)
@ -87,7 +87,10 @@ namespace Tapeti.Flow.Default
private static Task HandleParallelResponse(IMessageContext context) private static Task HandleParallelResponse(IMessageContext context)
{ {
var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask)); return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext =>
{
await flowContext.Store();
}));
} }
@ -97,8 +100,7 @@ namespace Tapeti.Flow.Default
if (request?.Response == null) if (request?.Response == null)
return; return;
bool isTaskOf; if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _))
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out isTaskOf))
throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
} }
} }

View File

@ -1,8 +1,4 @@
using System; using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default
@ -11,8 +7,7 @@ namespace Tapeti.Flow.Default
{ {
public async Task Handle(IMessageContext context, HandlingResult handlingResult) public async Task Handle(IMessageContext context, HandlingResult handlingResult)
{ {
object flowContextObj; if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextObj))
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj))
return; return;
var flowContext = (FlowContext)flowContextObj; var flowContext = (FlowContext)flowContextObj;

View File

@ -28,8 +28,7 @@ namespace Tapeti.Flow.Default
if (context.Properties.CorrelationId == null) if (context.Properties.CorrelationId == null)
return null; return null;
Guid continuationID; if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
if (!Guid.TryParse(context.Properties.CorrelationId, out continuationID))
return null; return null;
var flowStore = context.DependencyResolver.Resolve<IFlowStore>(); var flowStore = context.DependencyResolver.Resolve<IFlowStore>();
@ -44,8 +43,6 @@ namespace Tapeti.Flow.Default
if (flowState == null) if (flowState == null)
return null; return null;
ContinuationMetadata continuation;
var flowContext = new FlowContext var flowContext = new FlowContext
{ {
MessageContext = context, MessageContext = context,
@ -54,7 +51,7 @@ namespace Tapeti.Flow.Default
FlowState = flowState, FlowState = flowState,
ContinuationID = continuationID, ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
}; };
// IDisposable items in the IMessageContext are automatically disposed // IDisposable items in the IMessageContext are automatically disposed

View File

@ -25,9 +25,6 @@ namespace Tapeti.Flow.Default
await CallConvergeMethod(context, await CallConvergeMethod(context,
flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodName,
flowContext.ContinuationMetadata.ConvergeMethodSync); flowContext.ContinuationMetadata.ConvergeMethodSync);
else if (flowContext.FlowState.Continuations.Count > 0)
// This is a parallel flow waiting for other continuations, always store the state
await flowContext.Store();
} }
else else
await next(); await next();

View File

@ -1,9 +1,11 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
using Tapeti.Annotations;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Flow.Annotations; using Tapeti.Flow.Annotations;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;
@ -54,6 +56,8 @@ namespace Tapeti.Flow.Default
private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
Debug.Assert(context.FlowState != null, "context.FlowState != null");
if (context.FlowState == null) if (context.FlowState == null)
{ {
await CreateNewFlowState(context); await CreateNewFlowState(context);
@ -114,7 +118,7 @@ namespace Tapeti.Flow.Default
{ {
await context.Delete(); await context.Delete();
if (context.FlowState != null && context.FlowState.Metadata.Reply != null) if (context.FlowState?.Metadata.Reply != null)
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
} }
@ -179,15 +183,12 @@ namespace Tapeti.Flow.Default
public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) public async Task Execute(IMessageContext context, IYieldPoint yieldPoint)
{ {
var executableYieldPoint = yieldPoint as DelegateYieldPoint; if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
if (executableYieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}");
FlowContext flowContext; FlowContext flowContext;
object flowContextItem;
if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem)) if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextItem))
{ {
flowContext = new FlowContext flowContext = new FlowContext
{ {
@ -202,7 +203,6 @@ namespace Tapeti.Flow.Default
try try
{ {
if (executableYieldPoint != null)
await executableYieldPoint.Execute(flowContext); await executableYieldPoint.Execute(flowContext);
} }
catch (YieldPointException e) catch (YieldPointException e)

View File

@ -12,16 +12,16 @@ namespace Tapeti.Flow.Default
public FlowMetadata Metadata public FlowMetadata Metadata
{ {
get { return metadata ?? (metadata = new FlowMetadata()); } get => metadata ?? (metadata = new FlowMetadata());
set { metadata = value; } set => metadata = value;
} }
public string Data { get; set; } public string Data { get; set; }
public Dictionary<Guid, ContinuationMetadata> Continuations public Dictionary<Guid, ContinuationMetadata> Continuations
{ {
get { return continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>()); } get => continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>());
set { continuations = value; } set => continuations = value;
} }

View File

@ -1,9 +1,7 @@
using Newtonsoft.Json; using System;
using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;
@ -11,13 +9,13 @@ namespace Tapeti.Flow.Default
{ {
public class FlowStore : IFlowStore public class FlowStore : IFlowStore
{ {
private readonly ConcurrentDictionary<Guid, FlowState> FlowStates = new ConcurrentDictionary<Guid, FlowState>(); private readonly ConcurrentDictionary<Guid, FlowState> flowStates = new ConcurrentDictionary<Guid, FlowState>();
private readonly ConcurrentDictionary<Guid, Guid> ContinuationLookup = new ConcurrentDictionary<Guid, Guid>(); private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> Locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default); private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
private readonly IFlowRepository repository; private readonly IFlowRepository repository;
private volatile bool InUse = false; private volatile bool inUse;
public FlowStore(IFlowRepository repository) public FlowStore(IFlowRepository repository)
{ {
@ -27,36 +25,35 @@ namespace Tapeti.Flow.Default
public async Task Load() public async Task Load()
{ {
if (InUse) if (inUse)
throw new InvalidOperationException("Can only load the saved state once."); throw new InvalidOperationException("Can only load the saved state once.");
InUse = true; inUse = true;
FlowStates.Clear(); flowStates.Clear();
ContinuationLookup.Clear(); continuationLookup.Clear();
foreach (var flowStateRecord in await repository.GetStates<FlowState>()) foreach (var flowStateRecord in await repository.GetStates<FlowState>())
{ {
FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); flowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value);
foreach (var continuation in flowStateRecord.Value.Continuations) foreach (var continuation in flowStateRecord.Value.Continuations)
ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key);
} }
} }
public Task<Guid?> FindFlowID(Guid continuationID) public Task<Guid?> FindFlowID(Guid continuationID)
{ {
Guid result; return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null);
return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null);
} }
public async Task<IFlowStateLock> LockFlowState(Guid flowID) public async Task<IFlowStateLock> LockFlowState(Guid flowID)
{ {
InUse = true; inUse = true;
var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID)); var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID));
return flowStatelock; return flowStatelock;
} }
@ -74,7 +71,7 @@ namespace Tapeti.Flow.Default
this.flowID = flowID; this.flowID = flowID;
this.flowLock = flowLock; this.flowLock = flowLock;
owner.FlowStates.TryGetValue(flowID, out flowState); owner.flowStates.TryGetValue(flowID, out flowState);
} }
public void Dispose() public void Dispose()
@ -106,20 +103,17 @@ namespace Tapeti.Flow.Default
if (flowState != null) if (flowState != null)
{ {
foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
{ owner.continuationLookup.TryRemove(removedContinuation, out _);
Guid removedValue;
owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
}
} }
foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key))) foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key)))
{ {
owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID); owner.continuationLookup.TryAdd(addedContinuation.Key, flowID);
} }
var isNew = flowState == null; var isNew = flowState == null;
flowState = newFlowState; flowState = newFlowState;
owner.FlowStates[flowID] = newFlowState; owner.flowStates[flowID] = newFlowState;
// Storing the flowstate in the underlying repository // Storing the flowstate in the underlying repository
if (isNew) if (isNew)
@ -141,13 +135,9 @@ namespace Tapeti.Flow.Default
if (flowState != null) if (flowState != null)
{ {
foreach (var removedContinuation in flowState.Continuations.Keys) foreach (var removedContinuation in flowState.Continuations.Keys)
{ owner.continuationLookup.TryRemove(removedContinuation, out _);
Guid removedValue;
owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue);
}
FlowState removedFlow; owner.flowStates.TryRemove(flowID, out _);
owner.FlowStates.TryRemove(flowID, out removedFlow);
if (flowState != null) if (flowState != null)
{ {

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow.Default namespace Tapeti.Flow.Default

View File

@ -1,7 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow.FlowHelpers namespace Tapeti.Flow.FlowHelpers
@ -17,14 +15,14 @@ namespace Tapeti.Flow.FlowHelpers
public Task<IDisposable> GetLock(T key) public Task<IDisposable> GetLock(T key)
{ {
// ReSharper disable once InconsistentlySynchronizedField - by design
LockItem nextLi = new LockItem(locks, key); LockItem nextLi = new LockItem(locks, key);
try try
{ {
bool continueImmediately = false; bool continueImmediately = false;
lock (locks) lock (locks)
{ {
LockItem li; if (!locks.TryGetValue(key, out var li))
if (!locks.TryGetValue(key, out li))
{ {
locks.Add(key, nextLi); locks.Add(key, nextLi);
continueImmediately = true; continueImmediately = true;
@ -80,8 +78,7 @@ namespace Tapeti.Flow.FlowHelpers
{ {
lock (locks) lock (locks)
{ {
LockItem li; if (!locks.TryGetValue(key, out var li))
if (!locks.TryGetValue(key, out li))
return; return;
if (li != this) if (li != this)

View File

@ -6,7 +6,7 @@ namespace Tapeti.Flow
{ {
public class FlowMiddleware : ITapetiExtension public class FlowMiddleware : ITapetiExtension
{ {
private IFlowRepository flowRepository; private readonly IFlowRepository flowRepository;
public FlowMiddleware(IFlowRepository flowRepository) public FlowMiddleware(IFlowRepository flowRepository)
{ {
@ -18,7 +18,7 @@ namespace Tapeti.Flow
container.RegisterDefault<IFlowProvider, FlowProvider>(); container.RegisterDefault<IFlowProvider, FlowProvider>();
container.RegisterDefault<IFlowStarter, FlowStarter>(); container.RegisterDefault<IFlowStarter, FlowStarter>();
container.RegisterDefault<IFlowHandler, FlowProvider>(); container.RegisterDefault<IFlowHandler, FlowProvider>();
container.RegisterDefaultSingleton<IFlowRepository>(() => flowRepository ?? new NonPersistentFlowRepository()); container.RegisterDefaultSingleton(() => flowRepository ?? new NonPersistentFlowRepository());
container.RegisterDefaultSingleton<IFlowStore, FlowStore>(); container.RegisterDefaultSingleton<IFlowStore, FlowStore>();
} }

View File

@ -3,6 +3,8 @@ using System.Linq.Expressions;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public interface IFlowProvider public interface IFlowProvider

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Flow namespace Tapeti.Flow
@ -12,13 +11,4 @@ namespace Tapeti.Flow
Task UpdateState<T>(Guid flowID, T state); Task UpdateState<T>(Guid flowID, T state);
Task DeleteState(Guid flowID); Task DeleteState(Guid flowID);
} }
public class FlowStateRecord
{
public Guid FlowID;
public string Metadata;
public string Data;
public Dictionary<Guid, string> ContinuationMetadata;
}
} }

View File

@ -1,8 +1,9 @@
using System; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Flow.Default; using Tapeti.Flow.Default;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Flow namespace Tapeti.Flow
{ {
public interface IFlowStore public interface IFlowStore

View File

@ -5,6 +5,5 @@ namespace Tapeti.Flow
public class YieldPointException : Exception public class YieldPointException : Exception
{ {
public YieldPointException(string message) : base(message) { } public YieldPointException(string message) : base(message) { }
public YieldPointException(string message, Exception innerException) : base(message, innerException) { }
} }
} }

View File

@ -1,6 +1,8 @@
using System; using System;
using ISeriLogger = Serilog.ILogger; using ISeriLogger = Serilog.ILogger;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Serilog namespace Tapeti.Serilog
{ {
public class TapetiSeriLogger: ILogger public class TapetiSeriLogger: ILogger

View File

@ -1,8 +1,7 @@
using Tapeti; using Tapeti.Helpers;
using Tapeti.Helpers;
using Xunit; using Xunit;
namespace Tapet.Tests namespace Tapeti.Tests
{ {
// ReSharper disable InconsistentNaming // ReSharper disable InconsistentNaming
public class ConnectionStringParserTest public class ConnectionStringParserTest
@ -119,9 +118,7 @@ namespace Tapet.Tests
[Fact] [Fact]
public void OnlySemicolons() public void OnlySemicolons()
{ {
AssertConnectionString(";;;", new TapetiConnectionParams AssertConnectionString(";;;", new TapetiConnectionParams());
{
});
} }
[Fact] [Fact]
@ -184,7 +181,8 @@ namespace Tapet.Tests
}); });
} }
private void AssertConnectionString(string connectionstring, TapetiConnectionParams expected) // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
private static void AssertConnectionString(string connectionstring, TapetiConnectionParams expected)
{ {
var parsed = ConnectionStringParser.Parse(connectionstring); var parsed = ConnectionStringParser.Parse(connectionstring);

View File

@ -2,7 +2,7 @@
using Tapeti.Default; using Tapeti.Default;
using Xunit; using Xunit;
namespace Tapet.Tests namespace Tapeti.Tests
{ {
// ReSharper disable InconsistentNaming // ReSharper disable InconsistentNaming
public class TypeNameRoutingKeyStrategyTests public class TypeNameRoutingKeyStrategyTests

4
Tapeti.sln.DotSettings Normal file
View File

@ -0,0 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String></wpf:ResourceDictionary>

View File

@ -1,8 +1,4 @@
using System; using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config namespace Tapeti.Config
{ {

View File

@ -1,8 +1,6 @@
using System; using System;
using System.Collections.Generic;
using System.Linq; // ReSharper disable UnusedMember.Global
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Config namespace Tapeti.Config
{ {

View File

@ -2,6 +2,8 @@
using System.Collections.Generic; using System.Collections.Generic;
using RabbitMQ.Client; using RabbitMQ.Client;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Config namespace Tapeti.Config
{ {
public interface IMessageContext : IDisposable public interface IMessageContext : IDisposable

View File

@ -1,5 +1,7 @@
using RabbitMQ.Client; using RabbitMQ.Client;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Config namespace Tapeti.Config
{ {
public interface IPublishContext public interface IPublishContext

View File

@ -1,10 +1,4 @@
using System; namespace Tapeti.Connection
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Connection
{ {
public interface IConnectionEventListener public interface IConnectionEventListener
{ {

View File

@ -5,7 +5,6 @@ using System.Runtime.ExceptionServices;
using RabbitMQ.Client; using RabbitMQ.Client;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Default; using Tapeti.Default;
using Tapeti.Helpers;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Tapeti.Connection namespace Tapeti.Connection
@ -42,7 +41,6 @@ namespace Tapeti.Connection
{ {
Task.Run(async () => Task.Run(async () =>
{ {
ExceptionDispatchInfo exception = null;
MessageContext context = null; MessageContext context = null;
HandlingResult handlingResult = null; HandlingResult handlingResult = null;
try try
@ -67,7 +65,7 @@ namespace Tapeti.Connection
} }
catch (Exception eDispatch) catch (Exception eDispatch)
{ {
exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch));
logger.HandlerException(eDispatch); logger.HandlerException(eDispatch);
try try
{ {
@ -119,10 +117,7 @@ namespace Tapeti.Connection
} }
try try
{ {
if (context != null) context?.Dispose();
{
context.Dispose();
}
} }
catch (Exception eDispose) catch (Exception eDispose)
{ {
@ -178,25 +173,25 @@ namespace Tapeti.Connection
RecursiveCaller firstCaller = null; RecursiveCaller firstCaller = null;
RecursiveCaller currentCaller = null; RecursiveCaller currentCaller = null;
Action<Handler> addHandler = (Handler handle) => void AddHandler(Handler handle)
{ {
var caller = new RecursiveCaller(handle); var caller = new RecursiveCaller(handle);
if (currentCaller == null) if (currentCaller == null)
firstCaller = caller; firstCaller = caller;
else else
currentCaller.next = caller; currentCaller.Next = caller;
currentCaller = caller; currentCaller = caller;
}; }
if (binding.MessageFilterMiddleware != null) if (binding.MessageFilterMiddleware != null)
{ {
foreach (var m in binding.MessageFilterMiddleware) foreach (var m in binding.MessageFilterMiddleware)
{ {
addHandler(m.Handle); AddHandler(m.Handle);
} }
} }
addHandler(async (c, next) => AddHandler(async (c, next) =>
{ {
c.Controller = dependencyResolver.Resolve(binding.Controller); c.Controller = dependencyResolver.Resolve(binding.Controller);
await next(); await next();
@ -204,18 +199,18 @@ namespace Tapeti.Connection
foreach (var m in messageMiddleware) foreach (var m in messageMiddleware)
{ {
addHandler(m.Handle); AddHandler(m.Handle);
} }
if (binding.MessageMiddleware != null) if (binding.MessageMiddleware != null)
{ {
foreach (var m in binding.MessageMiddleware) foreach (var m in binding.MessageMiddleware)
{ {
addHandler(m.Handle); AddHandler(m.Handle);
} }
} }
addHandler(async (c, next) => AddHandler(async (c, next) =>
{ {
await binding.Invoke(c, message); await binding.Invoke(c, message);
}); });
@ -244,10 +239,11 @@ namespace Tapeti.Connection
public class RecursiveCaller public class RecursiveCaller
{ {
private Handler handle; private readonly Handler handle;
private MessageContext currentContext; private MessageContext currentContext;
private MessageContext nextContext; private MessageContext nextContext;
public RecursiveCaller next;
public RecursiveCaller Next;
public RecursiveCaller(Handler handle) public RecursiveCaller(Handler handle)
{ {
@ -263,9 +259,9 @@ namespace Tapeti.Connection
{ {
currentContext = context; currentContext = context;
context.UseNestedContext = next == null ? (Action<MessageContext>)null : UseNestedContext; context.UseNestedContext = Next == null ? (Action<MessageContext>)null : UseNestedContext;
await handle(context, callNext); await handle(context, CallNext);
} }
finally finally
{ {
@ -273,18 +269,18 @@ namespace Tapeti.Connection
} }
} }
private async Task callNext() private async Task CallNext()
{ {
if (next == null) if (Next == null)
return; return;
if (nextContext != null) if (nextContext != null)
{ {
await next.Call(nextContext); await Next.Call(nextContext);
}else }else
{ {
try try
{ {
await next.Call(currentContext); await Next.Call(currentContext);
} }
finally finally
{ {

View File

@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client; using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Exceptions;

View File

@ -6,17 +6,17 @@ namespace Tapeti.Default
{ {
public void Connect(TapetiConnectionParams connectionParams) public void Connect(TapetiConnectionParams connectionParams)
{ {
throw new System.NotImplementedException(); throw new NotImplementedException();
} }
public void ConnectFailed(TapetiConnectionParams connectionParams) public void ConnectFailed(TapetiConnectionParams connectionParams)
{ {
throw new System.NotImplementedException(); throw new NotImplementedException();
} }
public void ConnectSuccess(TapetiConnectionParams connectionParams) public void ConnectSuccess(TapetiConnectionParams connectionParams)
{ {
throw new System.NotImplementedException(); throw new NotImplementedException();
} }
public void HandlerException(Exception e) public void HandlerException(Exception e)

View File

@ -1,8 +1,4 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
namespace Tapeti.Default namespace Tapeti.Default
@ -22,19 +18,8 @@ namespace Tapeti.Default
private HandlingResultBuilder handlingResult; private HandlingResultBuilder handlingResult;
public HandlingResultBuilder HandlingResult public HandlingResultBuilder HandlingResult
{ {
get get => handlingResult ?? (handlingResult = new HandlingResultBuilder());
{ set => handlingResult = value;
if (handlingResult == null)
{
handlingResult = new HandlingResultBuilder();
}
return handlingResult;
}
set
{
handlingResult = value;
}
} }
} }
} }

View File

@ -45,12 +45,10 @@ namespace Tapeti.Default
public object Deserialize(byte[] body, IBasicProperties properties) public object Deserialize(byte[] body, IBasicProperties properties)
{ {
object typeName;
if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) if (properties.ContentType == null || !properties.ContentType.Equals(ContentType))
throw new ArgumentException($"content_type must be {ContentType}"); throw new ArgumentException($"content_type must be {ContentType}");
if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName)) if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out var typeName))
throw new ArgumentException($"{ClassTypeHeader} header not present"); throw new ArgumentException($"{ClassTypeHeader} header not present");
var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName); var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName);

View File

@ -21,7 +21,6 @@ namespace Tapeti.Default
public IDictionary<string, object> Items { get; } public IDictionary<string, object> Items { get; }
private readonly MessageContext outerContext;
internal Action<MessageContext> UseNestedContext; internal Action<MessageContext> UseNestedContext;
internal Action<MessageContext> OnContextDisposed; internal Action<MessageContext> OnContextDisposed;
@ -43,8 +42,6 @@ namespace Tapeti.Default
Properties = outerContext.Properties; Properties = outerContext.Properties;
Items = new DeferingDictionary(outerContext.Items); Items = new DeferingDictionary(outerContext.Items);
this.outerContext = outerContext;
} }
public void Dispose() public void Dispose()
@ -71,8 +68,8 @@ namespace Tapeti.Default
private class DeferingDictionary : IDictionary<string, object> private class DeferingDictionary : IDictionary<string, object>
{ {
private IDictionary<string, object> myState; private readonly IDictionary<string, object> myState;
private IDictionary<string, object> deferee; private readonly IDictionary<string, object> deferee;
public DeferingDictionary(IDictionary<string, object> deferee) public DeferingDictionary(IDictionary<string, object> deferee)
{ {
@ -84,10 +81,7 @@ namespace Tapeti.Default
object IDictionary<string, object>.this[string key] object IDictionary<string, object>.this[string key]
{ {
get get => myState.ContainsKey(key) ? myState[key] : deferee[key];
{
return myState.ContainsKey(key) ? myState[key] : deferee[key];
}
set set
{ {
@ -98,37 +92,10 @@ namespace Tapeti.Default
} }
} }
int ICollection<KeyValuePair<string, object>>.Count int ICollection<KeyValuePair<string, object>>.Count => myState.Count + deferee.Count;
{ bool ICollection<KeyValuePair<string, object>>.IsReadOnly => false;
get ICollection<string> IDictionary<string, object>.Keys => myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly();
{ ICollection<object> IDictionary<string, object>.Values => myState.Values.Concat(deferee.Values).ToList().AsReadOnly();
return myState.Count + deferee.Count;
}
}
bool ICollection<KeyValuePair<string, object>>.IsReadOnly
{
get
{
return false;
}
}
ICollection<string> IDictionary<string, object>.Keys
{
get
{
return myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly();
}
}
ICollection<object> IDictionary<string, object>.Values
{
get
{
return myState.Values.Concat(deferee.Values).ToList().AsReadOnly();
}
}
void ICollection<KeyValuePair<string, object>>.Add(KeyValuePair<string, object> item) void ICollection<KeyValuePair<string, object>>.Add(KeyValuePair<string, object> item)
{ {

View File

@ -1,9 +1,4 @@
using System; using Tapeti.Config;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti.Config;
namespace Tapeti.Default namespace Tapeti.Default
{ {

View File

@ -1,5 +1,5 @@
using System; using System;
using System.Linq.Expressions; using System.Diagnostics;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using RabbitMQ.Client.Framing; using RabbitMQ.Client.Framing;
@ -18,15 +18,14 @@ namespace Tapeti.Default
return; return;
bool isTaskOf; if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType))
Type actualType;
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTaskOf, out actualType))
return; return;
if (isTaskOf) if (isTaskOf)
{ {
var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static).MakeGenericMethod(actualType); var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType);
Debug.Assert(handler != null, nameof(handler) + " != null");
context.Result.SetHandler(async (messageContext, value) => context.Result.SetHandler(async (messageContext, value) =>
{ {
@ -54,13 +53,5 @@ namespace Tapeti.Default
return publisher.Publish(message, properties); return publisher.Publish(message, properties);
} }
private static async Task PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
{
var message = await (Task<T>)value;
if (message != null)
await Reply(message, messageContext);
}
} }
} }

View File

@ -1,5 +1,6 @@
using System; using Tapeti.Config;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Default namespace Tapeti.Default
{ {

View File

@ -2,7 +2,6 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
namespace Tapeti.Default namespace Tapeti.Default

View File

@ -1,8 +1,4 @@
using System; // ReSharper disable UnusedMember.Global
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti namespace Tapeti
{ {
@ -34,26 +30,14 @@ namespace Tapeti
private HandlingResult data = Default; private HandlingResult data = Default;
public ConsumeResponse ConsumeResponse { public ConsumeResponse ConsumeResponse {
get get => data.ConsumeResponse;
{ set => GetWritableData().ConsumeResponse = value;
return data.ConsumeResponse;
}
set
{
GetWritableData().ConsumeResponse = value;
}
} }
public MessageAction MessageAction public MessageAction MessageAction
{ {
get get => data.MessageAction;
{ set => GetWritableData().MessageAction = value;
return data.MessageAction;
}
set
{
GetWritableData().MessageAction = value;
}
} }
public HandlingResult ToHandlingResult() public HandlingResult ToHandlingResult()

View File

@ -1,18 +1,14 @@
using System; using System.Text;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti.Helpers namespace Tapeti.Helpers
{ {
public class ConnectionStringParser public class ConnectionStringParser
{ {
readonly TapetiConnectionParams result = new TapetiConnectionParams(); private readonly TapetiConnectionParams result = new TapetiConnectionParams();
readonly string connectionstring; private readonly string connectionstring;
int pos = -1; private int pos = -1;
char current = '\0'; private char current = '\0';
public static TapetiConnectionParams Parse(string connectionstring) public static TapetiConnectionParams Parse(string connectionstring)
{ {
@ -33,16 +29,12 @@ namespace Tapeti.Helpers
{ {
var key = ParseKey(); var key = ParseKey();
if (current == '=') if (current != '=')
{ return;
var value = ParseValue(); var value = ParseValue();
SetValue(key, value); SetValue(key, value);
} }
else
{
EnableKey(key);
}
}
private string ParseKey() private string ParseKey()
{ {
@ -112,11 +104,6 @@ namespace Tapeti.Helpers
return false; return false;
} }
private void EnableKey(string key)
{
}
private void SetValue(string key, string value) private void SetValue(string key, string value)
{ {
switch (key.ToLowerInvariant()) { switch (key.ToLowerInvariant()) {

View File

@ -15,18 +15,16 @@ namespace Tapeti.Helpers
return; return;
} }
Action handleNext = null; void HandleNext()
handleNext = () =>
{ {
handlerIndex--; handlerIndex--;
if (handlerIndex >= 0) if (handlerIndex >= 0)
handle(middleware[handlerIndex], handleNext); handle(middleware[handlerIndex], HandleNext);
else else
lastHandler(); lastHandler();
}; }
handle(middleware[handlerIndex], handleNext); handle(middleware[handlerIndex], HandleNext);
} }
@ -39,18 +37,16 @@ namespace Tapeti.Helpers
return; return;
} }
Func<Task> handleNext = null; async Task HandleNext()
handleNext = async () =>
{ {
handlerIndex--; handlerIndex--;
if (handlerIndex >= 0) if (handlerIndex >= 0)
await handle(middleware[handlerIndex], handleNext); await handle(middleware[handlerIndex], HandleNext);
else else
await lastHandler(); await lastHandler();
}; }
await handle(middleware[handlerIndex], handleNext); await handle(middleware[handlerIndex], HandleNext);
} }
} }
} }

View File

@ -34,8 +34,7 @@ namespace Tapeti.Helpers
public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTaskOf) public static bool IsTypeOrTaskOf(this Type type, Func<Type, bool> predicate, out bool isTaskOf)
{ {
Type actualType; return IsTypeOrTaskOf(type, predicate, out isTaskOf, out _);
return IsTypeOrTaskOf(type, predicate, out isTaskOf, out actualType);
} }
public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf) public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf)

View File

@ -1,6 +1,8 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
// ReSharper disable UnusedMember.Global
namespace Tapeti namespace Tapeti
{ {
public interface IConnection : IDisposable public interface IConnection : IDisposable

View File

@ -1,5 +1,4 @@
using System; using Tapeti.Config;
using Tapeti.Config;
namespace Tapeti namespace Tapeti
{ {

View File

@ -1,5 +1,7 @@
using System; using System;
// ReSharper disable UnusedMember.Global
namespace Tapeti namespace Tapeti
{ {
// This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog)

View File

@ -1,5 +1,7 @@
using Tapeti.Annotations; using Tapeti.Annotations;
// ReSharper disable UnusedMember.Global
namespace Tapeti namespace Tapeti
{ {
/// <summary> /// <summary>

View File

@ -1,8 +1,4 @@
using System; // ReSharper disable UnusedMember.Global
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tapeti namespace Tapeti
{ {

View File

@ -18,19 +18,19 @@ namespace Tapeti
public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix) public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix)
{ {
var keys = ConfigurationManager.AppSettings.AllKeys; var keys = ConfigurationManager.AppSettings.AllKeys;
Action<string, Action<string>> getAppSetting = (key, setValue) =>
void GetAppSetting(string key, Action<string> setValue)
{ {
if (keys.Contains(prefix + key)) if (keys.Contains(prefix + key)) setValue(ConfigurationManager.AppSettings[prefix + key]);
setValue(ConfigurationManager.AppSettings[prefix + key]); }
};
getAppSetting(KeyHostname, value => HostName = value); GetAppSetting(KeyHostname, value => HostName = value);
getAppSetting(KeyPort, value => Port = int.Parse(value)); GetAppSetting(KeyPort, value => Port = int.Parse(value));
getAppSetting(KeyVirtualHost, value => VirtualHost = value); GetAppSetting(KeyVirtualHost, value => VirtualHost = value);
getAppSetting(KeyUsername, value => Username = value); GetAppSetting(KeyUsername, value => Username = value);
getAppSetting(KeyPassword, value => Password = value); GetAppSetting(KeyPassword, value => Password = value);
getAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value)); GetAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value));
} }
} }
} }

View File

@ -127,8 +127,7 @@ namespace Tapeti
public TapetiConfig Use(ITapetiExtension extension) public TapetiConfig Use(ITapetiExtension extension)
{ {
var container = dependencyResolver as IDependencyContainer; if (dependencyResolver is IDependencyContainer container)
if (container != null)
extension.RegisterDefaults(container); extension.RegisterDefaults(container);
var middlewareBundle = extension.GetMiddleware(dependencyResolver); var middlewareBundle = extension.GetMiddleware(dependencyResolver);
@ -139,14 +138,14 @@ namespace Tapeti
foreach (var middleware in middlewareBundle) foreach (var middleware in middlewareBundle)
{ {
// ReSharper disable once CanBeReplacedWithTryCastAndCheckForNull // ReSharper disable once CanBeReplacedWithTryCastAndCheckForNull
if (middleware is IBindingMiddleware) if (middleware is IBindingMiddleware bindingExtension)
Use((IBindingMiddleware)middleware); Use(bindingExtension);
else if (middleware is IMessageMiddleware) else if (middleware is IMessageMiddleware messageExtension)
Use((IMessageMiddleware)middleware); Use(messageExtension);
else if (middleware is ICleanupMiddleware) else if (middleware is ICleanupMiddleware cleanupExtension)
Use((ICleanupMiddleware)middleware); Use(cleanupExtension);
else if (middleware is IPublishMiddleware) else if (middleware is IPublishMiddleware publishExtension)
Use((IPublishMiddleware)middleware); Use(publishExtension);
else else
throw new ArgumentException($"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}"); throw new ArgumentException($"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}");
} }
@ -181,7 +180,7 @@ namespace Tapeti
(dependencyResolver as IDependencyContainer)?.RegisterController(controller); (dependencyResolver as IDependencyContainer)?.RegisterController(controller);
foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance)
.Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && !(m as MethodInfo).IsSpecialName) .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false)
.Select(m => (MethodInfo)m)) .Select(m => (MethodInfo)m))
{ {
var context = new BindingContext(method); var context = new BindingContext(method);
@ -423,8 +422,7 @@ namespace Tapeti
public IBinding GetBinding(Delegate method) public IBinding GetBinding(Delegate method)
{ {
IBinding binding; return bindingMethodLookup.TryGetValue(method.Method, out var binding) ? binding : null;
return bindingMethodLookup.TryGetValue(method.Method, out binding) ? binding : null;
} }
} }
@ -465,7 +463,7 @@ namespace Tapeti
private QueueInfo queueInfo; private QueueInfo queueInfo;
public QueueInfo QueueInfo public QueueInfo QueueInfo
{ {
get { return queueInfo; } get => queueInfo;
set set
{ {
QueueName = (value?.Dynamic).GetValueOrDefault() ? value?.Name : null; QueueName = (value?.Dynamic).GetValueOrDefault() ? value?.Name : null;

View File

@ -4,6 +4,8 @@ using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Connection; using Tapeti.Connection;
// ReSharper disable UnusedMember.Global
namespace Tapeti namespace Tapeti
{ {
public class TapetiConnection : IDisposable public class TapetiConnection : IDisposable

View File

@ -1,5 +1,7 @@
using System; using System;
// ReSharper disable UnusedMember.Global
namespace Tapeti namespace Tapeti
{ {
public class TapetiConnectionParams public class TapetiConnectionParams

View File

@ -3,6 +3,8 @@ using Tapeti.Annotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.Flow.Annotations; using Tapeti.Flow.Annotations;
// ReSharper disable UnusedMember.Global
namespace Test namespace Test
{ {
[MessageController] [MessageController]

View File

@ -1,31 +1,32 @@
using System; using System;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti;
using Tapeti.Annotations; using Tapeti.Annotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.Flow.Annotations; using Tapeti.Flow.Annotations;
// ReSharper disable UnusedMember.Global
namespace Test namespace Test
{ {
[MessageController] [MessageController]
[DynamicQueue] [DynamicQueue]
public class MarcoController public class MarcoController
{ {
private readonly IPublisher publisher; //private readonly IPublisher publisher;
private readonly IFlowProvider flowProvider; private readonly IFlowProvider flowProvider;
private readonly Visualizer visualizer; //private readonly Visualizer visualizer;
// Public properties are automatically stored and retrieved while in a flow // Public properties are automatically stored and retrieved while in a flow
public Guid StateTestGuid { get; set; } public Guid StateTestGuid { get; set; }
public int Phase; public int Phase;
public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer) public MarcoController(/*IPublisher publisher, */IFlowProvider flowProvider/*, Visualizer visualizer*/)
{ {
this.publisher = publisher; //this.publisher = publisher;
this.flowProvider = flowProvider; this.flowProvider = flowProvider;
this.visualizer = visualizer; //this.visualizer = visualizer;
} }
@ -68,16 +69,13 @@ namespace Test
} }
/** [Start]
* The Visualizer could've been injected through the constructor, which is public IYieldPoint TestParallelRequest()
* the recommended way. Just testing the injection middleware here.
*/
public async Task<IYieldPoint> Marco(MarcoMessage message, Visualizer myVisualizer)
{ {
Console.WriteLine(">> Marco (yielding with request)"); Console.WriteLine(">> Marco (yielding with request)");
await myVisualizer.VisualizeMarco();
StateTestGuid = Guid.NewGuid(); StateTestGuid = Guid.NewGuid();
Console.WriteLine($"Starting parallel request with StateTestGuid {StateTestGuid}");
return flowProvider.YieldWithParallelRequest() return flowProvider.YieldWithParallelRequest()
.AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage .AddRequestSync<PoloConfirmationRequestMessage, PoloConfirmationResponseMessage>(new PoloConfirmationRequestMessage
@ -113,7 +111,7 @@ namespace Test
private IYieldPoint ContinuePoloConfirmation() private IYieldPoint ContinuePoloConfirmation()
{ {
Console.WriteLine("> ConvergePoloConfirmation (ending flow)"); Console.WriteLine("> ConvergePoloConfirmation (ending flow)");
return flowProvider.EndWithResponse(new PoloMessage()); return flowProvider.End();
} }

View File

@ -1,18 +1,17 @@
using System.Threading; using System.Threading.Tasks;
using System.Threading.Tasks;
using Tapeti;
namespace Test namespace Test
{ {
public class MarcoEmitter public class MarcoEmitter
{ {
private readonly IPublisher publisher; //private readonly IPublisher publisher;
public MarcoEmitter(IPublisher publisher) /*public MarcoEmitter(IPublisher publisher)
{ {
this.publisher = publisher; this.publisher = publisher;
} }
*/
public async Task Run() public async Task Run()
@ -45,6 +44,7 @@ namespace Test
{ {
await Task.Delay(1000); await Task.Delay(1000);
} }
// ReSharper disable once FunctionNeverReturns
} }
} }
} }

View File

@ -1,10 +1,8 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tapeti; using Tapeti;
// ReSharper disable UnusedMember.Global
namespace Test namespace Test
{ {
public class MyLogger : ILogger public class MyLogger : ILogger

View File

@ -1,11 +1,8 @@
using System; using System;
using System.Threading.Tasks;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.DataAnnotations; using Tapeti.DataAnnotations;
using Tapeti.Flow; using Tapeti.Flow;
using Tapeti.Flow.SQL;
using Tapeti.Helpers;
using Tapeti.SimpleInjector; using Tapeti.SimpleInjector;
using System.Threading; using System.Threading;
@ -61,7 +58,8 @@ namespace Test
connection.GetPublisher().Publish(new FlowEndController.PingMessage()); connection.GetPublisher().Publish(new FlowEndController.PingMessage());
container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true); //container.GetInstance<IFlowStarter>().Start<MarcoController, bool>(c => c.StartFlow, true);
container.GetInstance<IFlowStarter>().Start<MarcoController>(c => c.TestParallelRequest);
Thread.Sleep(1000); Thread.Sleep(1000);

View File

@ -1,6 +1,8 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
// ReSharper disable UnusedMember.Global
namespace Test namespace Test
{ {
public class Visualizer public class Visualizer