1
0
mirror of synced 2025-01-22 08:03:07 +01:00

Merge branch 'release/3.0'

This commit is contained in:
Mark van Renswoude 2023-01-24 10:09:58 +01:00
commit 67031b0c09
156 changed files with 2582 additions and 1256 deletions

View File

@ -2,16 +2,16 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_01_PublishSubscribe</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="6.2.0" />
<PackageReference Include="Castle.Windsor" Version="5.1.1" />
<PackageReference Include="Ninject" Version="3.3.4" />
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="Unity" Version="5.11.10" />
<PackageReference Include="Autofac" Version="6.5.0" />
<PackageReference Include="Castle.Windsor" Version="5.1.2" />
<PackageReference Include="Ninject" Version="3.3.6" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>
@ -20,7 +20,6 @@
<ProjectReference Include="..\..\Tapeti.DataAnnotations\Tapeti.DataAnnotations.csproj" />
<ProjectReference Include="..\..\Tapeti.Ninject\Tapeti.Ninject.csproj" />
<ProjectReference Include="..\..\Tapeti.SimpleInjector\Tapeti.SimpleInjector.csproj" />
<ProjectReference Include="..\..\Tapeti.UnityContainer\Tapeti.UnityContainer.csproj" />
<ProjectReference Include="..\..\Tapeti\Tapeti.csproj" />
<ProjectReference Include="..\ExampleLib\ExampleLib.csproj" />
<ProjectReference Include="..\Messaging.TapetiExample\Messaging.TapetiExample.csproj" />

View File

@ -13,8 +13,6 @@ using Tapeti.DataAnnotations;
using Tapeti.Default;
using Tapeti.Ninject;
using Tapeti.SimpleInjector;
using Tapeti.UnityContainer;
using Unity;
using Container = SimpleInjector.Container;
// ReSharper disable UnusedMember.Global
@ -23,14 +21,13 @@ namespace _01_PublishSubscribe
{
public class Program
{
public static void Main(string[] args)
public static void Main()
{
var dependencyResolver = GetSimpleInjectorDependencyResolver();
// or use your IoC container of choice:
//var dependencyResolver = GetAutofacDependencyResolver();
//var dependencyResolver = GetCastleWindsorDependencyResolver();
//var dependencyResolver = GetUnityDependencyResolver();
//var dependencyResolver = GetNinjectDependencyResolver();
// This helper is used because this example is not run as a service. You do not
@ -47,7 +44,7 @@ namespace _01_PublishSubscribe
.RegisterAllControllers()
.Build();
using (var connection = new TapetiConnection(config)
await using var connection = new TapetiConnection(config)
{
// Params is optional if you want to use the defaults, but we'll set it
// explicitly for this example
@ -63,28 +60,27 @@ namespace _01_PublishSubscribe
{ "example", "01 - Publish Subscribe" }
}
}
})
{
// IoC containers that separate the builder from the resolver (Autofac) must be built after
// creating a TapetConnection, as it modifies the container by injecting IPublisher.
(dependencyResolver as AutofacDependencyResolver)?.Build();
};
// IoC containers that separate the builder from the resolver (Autofac) must be built after
// creating a TapetConnection, as it modifies the container by injecting IPublisher.
(dependencyResolver as AutofacDependencyResolver)?.Build();
// Create the queues and start consuming immediately.
// If you need to do some processing before processing messages, but after the
// queues have initialized, pass false as the startConsuming parameter and store
// the returned ISubscriber. Then call Resume on it later.
await connection.Subscribe();
// Create the queues and start consuming immediately.
// If you need to do some processing before processing messages, but after the
// queues have initialized, pass false as the startConsuming parameter and store
// the returned ISubscriber. Then call Resume on it later.
await connection.Subscribe();
// We could get an IPublisher from the container directly, but since you'll usually use
// it as an injected constructor parameter this shows
await dependencyResolver.Resolve<ExamplePublisher>().SendTestMessage();
// We could get an IPublisher from the container directly, but since you'll usually use
// it as an injected constructor parameter this shows
await dependencyResolver.Resolve<ExamplePublisher>().SendTestMessage();
// Wait for the controller to signal that the message has been received
await waitForDone();
}
// Wait for the controller to signal that the message has been received
await waitForDone();
}
@ -132,17 +128,6 @@ namespace _01_PublishSubscribe
}
internal static IDependencyContainer GetUnityDependencyResolver()
{
var container = new UnityContainer();
container.RegisterType<ILogger, ConsoleLogger>();
container.RegisterType<ExamplePublisher>();
return new UnityDependencyResolver(container);
}
internal static IDependencyContainer GetNinjectDependencyResolver()
{
var kernel = new StandardKernel();

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_02_DeclareDurableQueues</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -11,7 +11,7 @@ namespace _02_DeclareDurableQueues
{
public class Program
{
public static void Main(string[] args)
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -30,19 +30,18 @@ namespace _02_DeclareDurableQueues
.EnableDeclareDurableQueues()
.Build();
using (var connection = new TapetiConnection(config))
await using var connection = new TapetiConnection(config);
// This creates or updates the durable queue
await connection.Subscribe();
await dependencyResolver.Resolve<IPublisher>().Publish(new PublishSubscribeMessage
{
// This creates or updates the durable queue
await connection.Subscribe();
Greeting = "Hello durable queue!"
});
await dependencyResolver.Resolve<IPublisher>().Publish(new PublishSubscribeMessage
{
Greeting = "Hello durable queue!"
});
// Wait for the controller to signal that the message has been received
await waitForDone();
}
// Wait for the controller to signal that the message has been received
await waitForDone();
}
}
}

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_03_FlowRequestResponse</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -15,9 +15,9 @@ namespace _03_FlowRequestResponse
private readonly IFlowProvider flowProvider;
private readonly IExampleState exampleState;
public string FirstQuote;
public string SecondQuote;
public string ThirdQuote;
public string? FirstQuote;
public string? SecondQuote;
public string? ThirdQuote;
public ParallelFlowController(IFlowProvider flowProvider, IExampleState exampleState)
@ -56,7 +56,7 @@ namespace _03_FlowRequestResponse
[Continuation]
public async Task HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest)
public async ValueTask HandleSecondQuoteResponse(QuoteResponseMessage message, IFlowParallelRequest parallelRequest)
{
Console.WriteLine("[ParallelFlowController] Second quote response received");
SecondQuote = message.Quote;

View File

@ -12,7 +12,7 @@ namespace _03_FlowRequestResponse
{
public class Program
{
public static void Main(string[] args)
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -33,34 +33,33 @@ namespace _03_FlowRequestResponse
.Build();
using (var connection = new TapetiConnection(config))
await using var connection = new TapetiConnection(config);
// Must be called before using any flow. When using a persistent repository like the
// SQL server implementation, you can run any required update scripts (for example, using DbUp)
// before calling this Load method.
// Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
await dependencyResolver.Resolve<IFlowStore>().Load();
await connection.Subscribe();
var flowStarter = dependencyResolver.Resolve<IFlowStarter>();
var startData = new SimpleFlowController.StartData
{
// Must be called before using any flow. When using a persistent repository like the
// SQL server implementation, you can run any required update scripts (for example, using DbUp)
// before calling this Load method.
// Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
await dependencyResolver.Resolve<IFlowStore>().Load();
RequestStartTime = DateTime.Now,
Amount = 1
};
await connection.Subscribe();
await flowStarter.Start<SimpleFlowController, SimpleFlowController.StartData>(c => c.StartFlow, startData);
await flowStarter.Start<ParallelFlowController>(c => c.StartFlow);
var flowStarter = dependencyResolver.Resolve<IFlowStarter>();
var startData = new SimpleFlowController.StartData
{
RequestStartTime = DateTime.Now,
Amount = 1
};
await flowStarter.Start<SimpleFlowController, SimpleFlowController.StartData>(c => c.StartFlow, startData);
await flowStarter.Start<ParallelFlowController>(c => c.StartFlow);
// Wait for the controller to signal that the message has been received
await waitForDone();
}
// Wait for the controller to signal that the message has been received
await waitForDone();
}
}
}

View File

@ -9,25 +9,16 @@ namespace _03_FlowRequestResponse
public class ReceivingMessageController
{
// No publisher required, responses can simply be returned
public async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
public static async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
{
string quote;
switch (message.Amount)
var quote = message.Amount switch
{
case 1:
1 =>
// Well, they asked for it... :-)
quote = "'";
break;
case 2:
quote = "\"";
break;
default:
quote = new string('\'', message.Amount);
break;
}
"'",
2 => "\"",
_ => new string('\'', message.Amount)
};
// Just gonna let them wait for a bit, to demonstrate async message handlers
await Task.Delay(1000);

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_04_Transient</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -13,7 +13,7 @@ namespace _04_Transient
{
public class Program
{
public static void Main(string[] args)
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -34,22 +34,20 @@ namespace _04_Transient
.Build();
using (var connection = new TapetiConnection(config))
{
await connection.Subscribe();
await using var connection = new TapetiConnection(config);
await connection.Subscribe();
Console.WriteLine("Sending request...");
Console.WriteLine("Sending request...");
var transientPublisher = dependencyResolver.Resolve<ITransientPublisher>();
var response = await transientPublisher.RequestResponse<LoggedInUsersRequestMessage, LoggedInUsersResponseMessage>(
new LoggedInUsersRequestMessage());
var transientPublisher = dependencyResolver.Resolve<ITransientPublisher>();
var response = await transientPublisher.RequestResponse<LoggedInUsersRequestMessage, LoggedInUsersResponseMessage>(
new LoggedInUsersRequestMessage());
Console.WriteLine("Response: " + response.Count);
Console.WriteLine("Response: " + response.Count);
// Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled.
}
// Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled.
}
}
}

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_05_SpeedTest</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -21,7 +21,7 @@ namespace _05_SpeedTest
private const int ConcurrentTasks = 20;
public static void Main(string[] args)
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -52,34 +52,32 @@ namespace _05_SpeedTest
.Build();
using (var connection = new TapetiConnection(config))
{
var subscriber = await connection.Subscribe(false);
await using var connection = new TapetiConnection(config);
var subscriber = await connection.Subscribe(false);
var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount} messages...");
var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount} messages...");
var stopwatch = new Stopwatch();
stopwatch.Start();
var stopwatch = new Stopwatch();
stopwatch.Start();
await PublishMessages(publisher);
await PublishMessages(publisher);
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
Console.WriteLine("Consuming messages...");
await subscriber.Resume();
Console.WriteLine("Consuming messages...");
await subscriber.Resume();
stopwatch.Restart();
stopwatch.Restart();
await waitForDone();
await waitForDone();
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
}
stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
}

View File

@ -15,9 +15,11 @@ namespace _05_SpeedTest
}
#pragma warning disable IDE0060 // Remove unused parameter
public void HandleSpeedTestMessage(SpeedTestMessage message)
{
messageCounter.Add();
}
#pragma warning restore IDE0060
}
}

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_06_StatelessRequestResponse</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -12,7 +12,7 @@ namespace _06_StatelessRequestResponse
{
public class Program
{
public static void Main(string[] args)
public static void Main()
{
var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -32,20 +32,18 @@ namespace _06_StatelessRequestResponse
.Build();
using (var connection = new TapetiConnection(config))
{
await connection.Subscribe();
await using var connection = new TapetiConnection(config);
await connection.Subscribe();
var publisher = dependencyResolver.Resolve<IPublisher>();
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 1
},
c => c.HandleQuoteResponse);
var publisher = dependencyResolver.Resolve<IPublisher>();
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage
{
Amount = 1
},
c => c.HandleQuoteResponse);
await waitForDone();
}
await waitForDone();
}
}
}

View File

@ -8,26 +8,16 @@ namespace _06_StatelessRequestResponse
public class ReceivingMessageController
{
// No publisher required, responses can simply be returned
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
public static QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
{
string quote;
switch (message.Amount)
var quote = message.Amount switch
{
case 1:
1 =>
// Well, they asked for it... :-)
quote = "'";
break;
case 2:
quote = "\"";
break;
default:
// We have to return a response.
quote = null;
break;
}
"'",
2 => "\"",
_ => null
};
return new QuoteResponseMessage
{

View File

@ -2,12 +2,13 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_07_ParallelizationTest</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -16,9 +16,11 @@ namespace _07_ParallelizationTest
}
#pragma warning disable IDE0060 // Remove unused parameter
public async Task HandleSpeedTestMessage(SpeedTestMessage message)
{
await messageParallelization.WaitForBatch();
}
#pragma warning restore IDE0060
}
}

View File

@ -105,9 +105,9 @@ namespace _07_ParallelizationTest
private readonly Func<bool> done;
private readonly Action<int> timeout;
private int count;
private readonly object waitLock = new object();
private TaskCompletionSource<bool> batchReachedTask = new TaskCompletionSource<bool>();
private Timer messageExpectedTimer;
private readonly object waitLock = new();
private TaskCompletionSource<bool> batchReachedTask = new();
private Timer? messageExpectedTimer;
private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000);
@ -124,7 +124,7 @@ namespace _07_ParallelizationTest
lock (waitLock)
{
if (messageExpectedTimer == null)
messageExpectedTimer = new Timer(state =>
messageExpectedTimer = new Timer(_ =>
{
timeout(count);
}, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan);

View File

@ -2,13 +2,14 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>_08_MessageHandlerLogging</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="SimpleInjector" Version="5.4.1" />
</ItemGroup>
<ItemGroup>

View File

@ -3,7 +3,6 @@ using System.Threading.Tasks;
using ExampleLib;
using Messaging.TapetiExample;
using Serilog;
using Serilog.Events;
using SimpleInjector;
using Tapeti;
using Tapeti.Serilog;

View File

@ -24,7 +24,7 @@ namespace ExampleLib
private readonly IDependencyContainer dependencyResolver;
private readonly int expectedDoneCount;
private int doneCount;
private readonly TaskCompletionSource<bool> doneSignal = new TaskCompletionSource<bool>();
private readonly TaskCompletionSource<bool> doneSignal = new();
/// <param name="dependencyResolver">Uses Tapeti's IDependencyContainer interface so you can easily switch an example to your favourite IoC container</param>
@ -79,7 +79,7 @@ namespace ExampleLib
{
while (true)
{
if (!(e is AggregateException aggregateException))
if (e is not AggregateException aggregateException)
return e;
if (aggregateException.InnerExceptions.Count != 1)

View File

@ -1,8 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>

View File

@ -6,7 +6,7 @@
<ItemGroup>
<PackageReference Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageReference Include="Tapeti.Annotations" Version="3.0.0" />
<PackageReference Include="Tapeti.Annotations" Version="3.*-*" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,12 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Autofac;
using Autofac.Builder;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Autofac
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Autofac.
/// Since this class needs access to both the ContainerBuilder and the built IContainer,
@ -13,22 +15,21 @@ namespace Tapeti.Autofac
/// </summary>
public class AutofacDependencyResolver : IDependencyContainer
{
private ContainerBuilder containerBuilder;
private IContainer container;
private ContainerBuilder? containerBuilder;
private IContainer? container;
/// <summary>
/// The built container. Either set directly, or use the Build method to built the
/// The built container. Either set directly, or use the Build method to build the
/// update this reference.
/// </summary>
public IContainer Container
{
get => container;
get => container ?? throw new ArgumentNullException(nameof(container));
set
{
container = value;
if (value != null)
containerBuilder = null;
containerBuilder = null;
}
}
@ -49,7 +50,7 @@ namespace Tapeti.Autofac
CheckContainerBuilder();
Container = containerBuilder.Build(options);
return container;
return Container;
}
@ -83,7 +84,7 @@ namespace Tapeti.Autofac
{
CheckContainerBuilder();
containerBuilder
.Register(context => factory())
.Register(_ => factory())
.As<TService>()
.PreserveExistingDefaults();
}
@ -116,7 +117,7 @@ namespace Tapeti.Autofac
{
CheckContainerBuilder();
containerBuilder
.Register(context => factory())
.Register(_ => factory())
.As<TService>()
.SingleInstance()
.PreserveExistingDefaults();
@ -140,6 +141,7 @@ namespace Tapeti.Autofac
}
[MemberNotNull(nameof(containerBuilder))]
private void CheckContainerBuilder()
{
if (containerBuilder == null)

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,10 +11,12 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Autofac" Version="6.2.0" />
<PackageReference Include="Autofac" Version="6.*" />
</ItemGroup>
<ItemGroup>
@ -29,6 +31,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,5 @@
using BenchmarkDotNet.Running;
using Tapeti.Benchmarks.Tests;
BenchmarkRunner.Run<MethodInvokeBenchmarks>();
//new MethodInvokeBenchmarks().InvokeExpressionValueFactory();

View File

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,112 @@
using System.Reflection;
using BenchmarkDotNet.Attributes;
using Tapeti.Helpers;
#pragma warning disable CA1822 // Mark members as static - required for Benchmark.NET
namespace Tapeti.Benchmarks.Tests
{
[MemoryDiagnoser]
public class MethodInvokeBenchmarks
{
private delegate bool MethodToInvokeDelegate(object obj);
private static readonly MethodInfo MethodToInvokeInfo;
private static readonly MethodToInvokeDelegate MethodToInvokeDelegateInstance;
private static readonly ExpressionInvoke MethodToInvokeExpression;
static MethodInvokeBenchmarks()
{
MethodToInvokeInfo = typeof(MethodInvokeBenchmarks).GetMethod(nameof(MethodToInvoke))!;
var inputInstance = new MethodInvokeBenchmarks();
MethodToInvokeDelegateInstance = i => ((MethodInvokeBenchmarks)i).MethodToInvoke(inputInstance.GetSomeObject(), inputInstance.GetCancellationToken());
MethodToInvokeExpression = MethodToInvokeInfo.CreateExpressionInvoke();
/*
Fun experiment, but a bit too tricky for me at the moment.
var dynamicMethodToInvoke = new DynamicMethod(
nameof(MethodToInvoke),
typeof(bool),
new[] { typeof(object) },
typeof(MethodInvokeBenchmarks).Module);
var generator = dynamicMethodToInvoke.GetILGenerator(256);
generator.Emit(OpCodes.Ldarg_0); // Load the first argument (the instance) onto the stack
generator.Emit(OpCodes.Castclass, typeof(MethodInvokeBenchmarks)); // Cast to the expected instance type
generator.Emit(OpCodes.Ldc_I4_S, 42); // Push the first argument onto the stack
generator.EmitCall(OpCodes.Callvirt, MethodToInvokeInfo, null); // Call the method
generator.Emit(OpCodes.Ret);
MethodToInvokeEmitted = dynamicMethodToInvoke.CreateDelegate<MethodToInvokeDelegate>();
*/
}
public bool MethodToInvoke(object someObject, CancellationToken cancellationToken)
{
return true;
}
// ReSharper disable MemberCanBeMadeStatic.Local
private object GetSomeObject()
{
return new object();
}
private CancellationToken GetCancellationToken()
{
return CancellationToken.None;
}
// ReSharper restore MemberCanBeMadeStatic.Local
// For comparison
[Benchmark]
public bool Direct()
{
return MethodToInvoke(GetSomeObject(), GetCancellationToken());
}
// For comparison as well, as we don't know the signature beforehand
[Benchmark]
public bool Delegate()
{
var instance = new MethodInvokeBenchmarks();
return MethodToInvokeDelegateInstance(instance);
}
[Benchmark]
public bool MethodInvoke()
{
var instance = new MethodInvokeBenchmarks();
return (bool)(MethodToInvokeInfo.Invoke(instance, BindingFlags.DoNotWrapExceptions, null, new[] { GetSomeObject(), GetCancellationToken() }, null) ?? false);
}
[Benchmark]
public bool InvokeExpression()
{
var instance = new MethodInvokeBenchmarks();
return (bool)MethodToInvokeExpression(instance, GetSomeObject(), GetCancellationToken());
}
//[Benchmark]
//public bool ReflectionEmit()
//{
// var instance = new MethodInvokeBenchmarks();
// return MethodToInvokeEmitted(instance);
//}
}
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,10 +11,12 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Castle.Windsor" Version="5.1.1" />
<PackageReference Include="Castle.Windsor" Version="5.*" />
</ItemGroup>
<ItemGroup>
@ -29,6 +31,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -4,7 +4,6 @@ using Castle.Windsor;
namespace Tapeti.CastleWindsor
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Castle Windsor.
/// </summary>

View File

@ -3,7 +3,6 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
/// <inheritdoc />
/// <summary>
/// Provides the DataAnnotations validation middleware.
/// </summary>

View File

@ -5,19 +5,21 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
/// <inheritdoc />
/// <summary>
/// Validates consumed messages using System.ComponentModel.DataAnnotations
/// </summary>
internal class DataAnnotationsMessageMiddleware : IMessageMiddleware
{
/// <inheritdoc />
public async Task Handle(IMessageContext context, Func<Task> next)
public ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{
if (context.Message == null)
return next();
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
await next();
return next();
}
}
}

View File

@ -5,19 +5,18 @@ using Tapeti.Config;
namespace Tapeti.DataAnnotations
{
/// <inheritdoc />
/// <summary>
/// Validates published messages using System.ComponentModel.DataAnnotations
/// </summary>
internal class DataAnnotationsPublishMiddleware : IPublishMiddleware
{
/// <inheritdoc />
public async Task Handle(IPublishContext context, Func<Task> next)
public ValueTask Handle(IPublishContext context, Func<ValueTask> next)
{
var validationContext = new ValidationContext(context.Message);
Validator.ValidateObject(context.Message, validationContext, true);
await next();
return next();
}
}
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,6 +11,8 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.DataAnnotations.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@ -33,6 +35,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
@ -45,7 +46,7 @@ namespace Tapeti.Flow.SQL
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return null;
return Enumerable.Empty<object>();
}
}
}

View File

@ -5,9 +5,12 @@ using System.Data.SqlClient;
using System.Threading.Tasks;
using Newtonsoft.Json;
// Neither of these are available in language version 7 required for .NET Standard 2.0
// ReSharper disable ConvertToUsingDeclaration
// ReSharper disable UseAwaitUsing
namespace Tapeti.Flow.SQL
{
/// <inheritdoc />
/// <summary>
/// IFlowRepository implementation for SQL server.
/// </summary>
@ -37,7 +40,7 @@ namespace Tapeti.Flow.SQL
/// <inheritdoc />
public async Task<IEnumerable<FlowRecord<T>>> GetStates<T>()
public async ValueTask<IEnumerable<FlowRecord<T>>> GetStates<T>()
{
return await SqlRetryHelper.Execute(async () =>
{
@ -55,7 +58,8 @@ namespace Tapeti.Flow.SQL
var stateJson = flowReader.GetString(2);
var state = JsonConvert.DeserializeObject<T>(stateJson);
result.Add(new FlowRecord<T>(flowID, creationTime, state));
if (state != null)
result.Add(new FlowRecord<T>(flowID, creationTime, state));
}
return result;
@ -64,7 +68,7 @@ namespace Tapeti.Flow.SQL
}
/// <inheritdoc />
public async Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
public async ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
await SqlRetryHelper.Execute(async () =>
{
@ -88,7 +92,7 @@ namespace Tapeti.Flow.SQL
}
/// <inheritdoc />
public async Task UpdateState<T>(Guid flowID, T state)
public async ValueTask UpdateState<T>(Guid flowID, T state)
{
await SqlRetryHelper.Execute(async () =>
{
@ -108,7 +112,7 @@ namespace Tapeti.Flow.SQL
}
/// <inheritdoc />
public async Task DeleteState(Guid flowID)
public async ValueTask DeleteState(Guid flowID)
{
await SqlRetryHelper.Execute(async () =>
{

View File

@ -13,7 +13,7 @@ namespace Tapeti.Flow.SQL
// 2627: Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls.
public static bool IsDuplicateKey(SqlException e)
{
return e != null && (e.Number == 2601 || e.Number == 2627);
return e is { Number: 2601 or 2627 };
}
@ -21,12 +21,12 @@ namespace Tapeti.Flow.SQL
{
switch (e)
{
case TimeoutException _:
case TimeoutException:
return true;
case Exception exception:
case not null:
{
var sqlExceptions = ExtractSqlExceptions(exception);
var sqlExceptions = ExtractSqlExceptions(e);
return sqlExceptions.Select(UnwrapSqlErrors).Any(IsRecoverableSQLError);
}
@ -38,11 +38,13 @@ namespace Tapeti.Flow.SQL
/// <summary>
/// Extracts alls SqlExceptions from the main and inner or aggregate exceptions
/// </summary>
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception e)
public static IEnumerable<SqlException> ExtractSqlExceptions(Exception exception)
{
while (e != null)
var exceptionHead = exception;
while (exceptionHead != null)
{
switch (e)
switch (exceptionHead)
{
case AggregateException aggregateException:
foreach (var innerException in aggregateException.InnerExceptions)
@ -56,7 +58,8 @@ namespace Tapeti.Flow.SQL
yield return sqlException;
break;
}
e = e.InnerException;
exceptionHead = exceptionHead.InnerException;
}
}
@ -66,12 +69,14 @@ namespace Tapeti.Flow.SQL
/// </summary>
public static IEnumerable<SqlError> UnwrapSqlErrors(SqlException exception)
{
while (exception != null)
var exceptionHead = exception;
while (exceptionHead != null)
{
foreach (SqlError error in exception.Errors)
foreach (SqlError error in exceptionHead.Errors)
yield return error;
exception = exception.InnerException as SqlException;
exceptionHead = exceptionHead.InnerException as SqlException;
}
}

View File

@ -54,7 +54,7 @@ namespace Tapeti.Flow.SQL
returnValue = await callback();
});
return returnValue;
return returnValue!;
}
}
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,12 +11,19 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.Flow.SQL.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'">
<!-- Suppress 'using statement can be simplified' which requires language version 8 not available in .NET Standard 2.0 -->
<NoWarn>IDE0063</NoWarn>
</PropertyGroup>
<ItemGroup>
<None Remove="scripts\Flow table.sql" />
</ItemGroup>
@ -26,7 +33,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.Data.SqlClient" Version="4.8.2" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.5" />
</ItemGroup>
<ItemGroup>
@ -42,6 +49,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -2,7 +2,6 @@
namespace Tapeti.Flow.Annotations
{
/// <inheritdoc />
/// <summary>
/// Marks a message handler as a response message handler which continues a Tapeti Flow.
/// The method only receives direct messages, and does not create a routing key based binding to the queue.

View File

@ -3,7 +3,6 @@ using JetBrains.Annotations;
namespace Tapeti.Flow.Annotations
{
/// <inheritdoc />
/// <summary>
/// Marks this method as the start of a Tapeti Flow. Use IFlowStarter.Start to begin a new flow and
/// call this method. Must return an IYieldPoint.

View File

@ -1,5 +1,7 @@
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Flow
{
/// <summary>
@ -13,7 +15,7 @@ namespace Tapeti.Flow
/// <param name="config"></param>
/// <param name="flowRepository">An optional IFlowRepository implementation to persist flow state. If not provided, flow state will be lost when the application restarts.</param>
/// <returns></returns>
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository flowRepository = null)
public static ITapetiConfigBuilder WithFlow(this ITapetiConfigBuilder config, IFlowRepository? flowRepository = null)
{
config.Use(new FlowExtension(flowRepository));
return config;

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
@ -9,6 +10,17 @@ using Tapeti.Helpers;
namespace Tapeti.Flow.Default
{
#if !NET7_0_OR_GREATER
#pragma warning disable CS1591
public class UnreachableException : Exception
{
public UnreachableException(string message) : base(message)
{
}
}
#pragma warning restore CS1591
#endif
internal class FlowBindingMiddleware : IControllerBindingMiddleware
{
public void Handle(IControllerBindingContext context, Action next)
@ -31,6 +43,9 @@ namespace Tapeti.Flow.Default
if (continuationAttribute == null)
return;
if (context.Method.IsStatic)
throw new ArgumentException($"Continuation attribute is not valid on static methods in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
context.SetBindingTargetMode(BindingTargetMode.Direct);
context.Use(new FlowContinuationMiddleware());
@ -43,16 +58,31 @@ namespace Tapeti.Flow.Default
{
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be a Task, not null");
await (Task)value;
await HandleParallelResponse(messageContext);
});
}
if (context.Result.Info.ParameterType == typeof(ValueTask))
{
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
// ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask, not null");
await (ValueTask)value;
await HandleParallelResponse(messageContext);
});
}
else if (context.Result.Info.ParameterType == typeof(void))
{
context.Result.SetHandler((messageContext, value) => HandleParallelResponse(messageContext));
context.Result.SetHandler((messageContext, _) => HandleParallelResponse(messageContext));
}
else
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context. Method.DeclaringType?.FullName}, method {context.Method.Name}");
throw new ArgumentException($"Result type must be IYieldPoint, Task or void in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
foreach (var parameter in context.Parameters.Where(p => !p.HasBinding && p.Info.ParameterType == typeof(IFlowParallelRequest)))
@ -62,34 +92,64 @@ namespace Tapeti.Flow.Default
private static void RegisterYieldPointResult(IControllerBindingContext context)
{
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf))
if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var taskType))
return;
if (isTaskOf)
if (context.Method.IsStatic)
throw new ArgumentException($"Yield points are not valid on static methods in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}");
switch (taskType)
{
context.Result.SetHandler(async (messageContext, value) =>
{
var yieldPoint = await (Task<IYieldPoint>)value;
if (yieldPoint != null)
case TaskType.None:
context.Result.SetHandler((messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be an IYieldPoint, not null");
return HandleYieldPoint(messageContext, (IYieldPoint)value);
});
break;
case TaskType.Task:
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
throw new InvalidOperationException("Return value should be a Task<IYieldPoint>, not null");
var yieldPoint = await (Task<IYieldPoint>)value;
await HandleYieldPoint(messageContext, yieldPoint);
});
});
break;
case TaskType.ValueTask:
context.Result.SetHandler(async (messageContext, value) =>
{
if (value == null)
// ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask<IYieldPoint>, not null");
var yieldPoint = await (ValueTask<IYieldPoint>)value;
await HandleYieldPoint(messageContext, yieldPoint);
});
break;
default:
throw new ArgumentOutOfRangeException();
}
else
context.Result.SetHandler((messageContext, value) => HandleYieldPoint(messageContext, (IYieldPoint)value));
}
private static Task HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
private static ValueTask HandleYieldPoint(IMessageContext context, IYieldPoint yieldPoint)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), yieldPoint);
}
private static Task HandleParallelResponse(IMessageContext context)
private static ValueTask HandleParallelResponse(IMessageContext context)
{
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload) && flowPayload.FlowIsConverging)
return Task.CompletedTask;
return default;
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.Execute(new FlowHandlerContext(context), new DelegateYieldPoint(async flowContext =>
@ -110,7 +170,7 @@ namespace Tapeti.Flow.Default
}
private static object ParallelRequestParameterFactory(IMessageContext context)
private static object? ParallelRequestParameterFactory(IMessageContext context)
{
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
return flowHandler.GetParallelRequest(new FlowHandlerContext(context));

View File

@ -6,35 +6,57 @@ namespace Tapeti.Flow.Default
{
internal class FlowContext : IDisposable
{
public IFlowHandlerContext HandlerContext { get; set; }
public IFlowStateLock FlowStateLock { get; set; }
public FlowState FlowState { get; set; }
private readonly IFlowHandlerContext? handlerContext;
private IFlowStateLock? flowStateLock;
private FlowState? flowState;
public IFlowHandlerContext HandlerContext => handlerContext ?? throw new InvalidOperationException("FlowContext does not have a HandlerContext");
public IFlowStateLock FlowStateLock => flowStateLock ?? throw new InvalidOperationException("FlowContext does not have a FlowStateLock");
public FlowState FlowState => flowState ?? throw new InvalidOperationException("FlowContext does not have a FlowState");
public bool HasFlowStateAndLock => flowState != null && flowStateLock != null;
public Guid ContinuationID { get; set; }
public ContinuationMetadata ContinuationMetadata { get; set; }
public ContinuationMetadata? ContinuationMetadata { get; set; }
private int storeCalled;
private int deleteCalled;
public async Task Store(bool persistent)
public FlowContext(IFlowHandlerContext handlerContext, FlowState flowState, IFlowStateLock flowStateLock)
{
this.flowState = flowState;
this.flowStateLock = flowStateLock;
this.handlerContext = handlerContext;
}
public FlowContext(IFlowHandlerContext handlerContext)
{
this.handlerContext = handlerContext;
}
public void SetFlowState(FlowState newFlowState, IFlowStateLock newFlowStateLock)
{
flowState = newFlowState;
flowStateLock = newFlowStateLock;
}
public ValueTask Store(bool persistent)
{
storeCalled++;
if (HandlerContext == null) throw new ArgumentNullException(nameof(HandlerContext));
if (FlowState == null) throw new ArgumentNullException(nameof(FlowState));
if (FlowStateLock == null) throw new ArgumentNullException(nameof(FlowStateLock));
FlowState.Data = Newtonsoft.Json.JsonConvert.SerializeObject(HandlerContext.Controller);
await FlowStateLock.StoreFlowState(FlowState, persistent);
return FlowStateLock.StoreFlowState(FlowState, persistent);
}
public async Task Delete()
public ValueTask Delete()
{
deleteCalled++;
if (FlowStateLock != null)
await FlowStateLock.DeleteFlowState();
return flowStateLock?.DeleteFlowState() ?? default;
}
public bool IsStoredOrDeleted()
@ -45,7 +67,7 @@ namespace Tapeti.Flow.Default
public void EnsureStoreOrDeleteIsCalled()
{
if (!IsStoredOrDeleted())
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + FlowStateLock?.FlowID);
throw new InvalidProgramException("Neither Store nor Delete are called for the state of the current flow. FlowID = " + flowStateLock?.FlowID);
Debug.Assert(storeCalled <= 1, "Store called more than once!");
Debug.Assert(deleteCalled <= 1, "Delete called more than once!");
@ -53,7 +75,7 @@ namespace Tapeti.Flow.Default
public void Dispose()
{
FlowStateLock?.Dispose();
flowStateLock?.Dispose();
}
}
}

View File

@ -1,5 +1,4 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using Tapeti.Config;
using Tapeti.Flow.FlowHelpers;
@ -12,7 +11,7 @@ namespace Tapeti.Flow.Default
/// </summary>
internal class FlowContinuationMiddleware : IControllerFilterMiddleware, IControllerMessageMiddleware, IControllerCleanupMiddleware
{
public async Task Filter(IMessageContext context, Func<Task> next)
public async ValueTask Filter(IMessageContext context, Func<ValueTask> next)
{
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
@ -28,15 +27,19 @@ namespace Tapeti.Flow.Default
}
public async Task Handle(IMessageContext context, Func<Task> next)
public async ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
if (controllerPayload.Controller == null)
throw new InvalidOperationException("Controller is not available (method is static?)");
var flowContext = flowPayload.FlowContext;
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
if (!string.IsNullOrEmpty(flowContext.FlowState.Data))
Newtonsoft.Json.JsonConvert.PopulateObject(flowContext.FlowState.Data, controllerPayload.Controller);
// Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
@ -54,7 +57,7 @@ namespace Tapeti.Flow.Default
}
public async Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next)
public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next)
{
await next();
@ -66,11 +69,11 @@ namespace Tapeti.Flow.Default
var flowContext = flowPayload.FlowContext;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
if (flowContext.ContinuationMetadata == null || flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
// Do not call when the controller method was filtered, if the same message has two methods
return;
if (flowContext.FlowStateLock != null)
if (flowContext.HasFlowStateAndLock)
{
if (!flowContext.IsStoredOrDeleted())
// The exception strategy can set the consume result to Success. Instead, check if the yield point
@ -83,7 +86,7 @@ namespace Tapeti.Flow.Default
private static async Task<FlowContext> EnrichWithFlowContext(IMessageContext context)
private static async ValueTask<FlowContext?> EnrichWithFlowContext(IMessageContext context)
{
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return flowPayload.FlowContext;
@ -107,13 +110,8 @@ namespace Tapeti.Flow.Default
if (flowState == null)
return null;
var flowContext = new FlowContext
var flowContext = new FlowContext(new FlowHandlerContext(context), flowState, flowStateLock)
{
HandlerContext = new FlowHandlerContext(context),
FlowStateLock = flowStateLock,
FlowState = flowState,
ContinuationID = continuationID,
ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null
};

View File

@ -3,7 +3,6 @@ using Tapeti.Config;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowHandlerContext
/// </summary>
@ -11,8 +10,11 @@ namespace Tapeti.Flow.Default
{
/// <summary>
/// </summary>
public FlowHandlerContext()
public FlowHandlerContext(ITapetiConfig config, object? controller, MethodInfo method)
{
Config = config;
Controller = controller;
Method = method;
}
@ -20,11 +22,7 @@ namespace Tapeti.Flow.Default
/// </summary>
public FlowHandlerContext(IMessageContext source)
{
if (source == null)
return;
if (!source.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return;
var controllerPayload = source.Get<ControllerMessageContextPayload>();
Config = source.Config;
Controller = controllerPayload.Controller;
@ -39,15 +37,15 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public ITapetiConfig Config { get; set; }
public ITapetiConfig Config { get; }
/// <inheritdoc />
public object Controller { get; set; }
public object? Controller { get; }
/// <inheritdoc />
public MethodInfo Method { get; set; }
public MethodInfo Method { get; }
/// <inheritdoc />
public IMessageContext MessageContext { get; set; }
public IMessageContext? MessageContext { get; }
}
}

View File

@ -32,14 +32,21 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler)
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler)
public IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
@ -52,7 +59,7 @@ namespace Tapeti.Flow.Default
}
/// <inheritdoc />
public IYieldPoint EndWithResponse<TResponse>(TResponse message)
public IYieldPoint EndWithResponse<TResponse>(TResponse message) where TResponse : class
{
return new DelegateYieldPoint(context => SendResponse(context, message));
}
@ -65,9 +72,9 @@ namespace Tapeti.Flow.Default
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
string? convergeMethodName = null, bool convergeMethodTaskSync = false, bool store = true)
{
if (context.FlowState == null)
if (!context.HasFlowStateAndLock)
{
await CreateNewFlowState(context);
Debug.Assert(context.FlowState != null, "context.FlowState != null");
@ -98,9 +105,9 @@ namespace Tapeti.Flow.Default
private async Task SendResponse(FlowContext context, object message)
{
var reply = context.FlowState == null
? GetReply(context.HandlerContext)
: context.FlowState.Metadata.Reply;
var reply = context.HasFlowStateAndLock
? context.FlowState.Metadata.Reply
: GetReply(context.HandlerContext);
if (reply == null)
throw new YieldPointException("No response is required");
@ -127,7 +134,7 @@ namespace Tapeti.Flow.Default
{
await context.Delete();
if (context.FlowState?.Metadata.Reply != null)
if (context.HasFlowStateAndLock && context.FlowState.Metadata.Reply != null)
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
}
@ -152,16 +159,15 @@ namespace Tapeti.Flow.Default
if (binding.QueueName == null)
throw new ArgumentException("responseHandler is not yet subscribed to a queue, TapetiConnection.Subscribe must be called before starting a flow", nameof(responseHandler));
return new ResponseHandlerInfo
{
MethodName = MethodSerializer.Serialize(responseHandler.Method),
ReplyToQueue = binding.QueueName,
IsDurableQueue = binding.QueueType == QueueType.Durable
};
return new ResponseHandlerInfo(
MethodSerializer.Serialize(responseHandler.Method),
binding.QueueName,
binding.QueueType == QueueType.Durable
);
}
private static ReplyMetadata GetReply(IFlowHandlerContext context)
private static ReplyMetadata? GetReply(IFlowHandlerContext context)
{
var requestAttribute = context.MessageContext?.Message?.GetType().GetCustomAttribute<RequestAttribute>();
if (requestAttribute?.Response == null)
@ -169,7 +175,7 @@ namespace Tapeti.Flow.Default
return new ReplyMetadata
{
CorrelationId = context.MessageContext.Properties.CorrelationId,
CorrelationId = context.MessageContext!.Properties.CorrelationId,
ReplyTo = context.MessageContext.Properties.ReplyTo,
ResponseTypeName = requestAttribute.Response.FullName,
Mandatory = context.MessageContext.Properties.Persistent.GetValueOrDefault(true)
@ -181,39 +187,34 @@ namespace Tapeti.Flow.Default
var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid();
flowContext.FlowStateLock = await flowStore.LockFlowState(flowID);
var flowStateLock = await flowStore.LockFlowState(flowID);
if (flowContext.FlowStateLock == null)
if (flowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow");
flowContext.FlowState = new FlowState
var flowState = new FlowState
{
Metadata = new FlowMetadata
{
Reply = GetReply(flowContext.HandlerContext)
}
Metadata = new FlowMetadata(GetReply(flowContext.HandlerContext))
};
flowContext.SetFlowState(flowState, flowStateLock);
}
/// <inheritdoc />
public async Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
public async ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint)
{
if (!(yieldPoint is DelegateYieldPoint executableYieldPoint))
throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Method.Name}");
if (yieldPoint is not DelegateYieldPoint executableYieldPoint)
throw new YieldPointException($"Yield point is required in controller {context.Controller?.GetType().Name} for method {context.Method.Name}");
FlowContext flowContext = null;
FlowContext? flowContext = null;
var disposeFlowContext = false;
try
{
var messageContext = context.MessageContext;
if (messageContext == null || !messageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
if (context.MessageContext == null || !context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload))
{
flowContext = new FlowContext
{
HandlerContext = context
};
flowContext = new FlowContext(context);
// If we ended up here it is because of a Start. No point in storing the new FlowContext
// in the messageContext as the yield point is the last to execute.
@ -229,7 +230,7 @@ namespace Tapeti.Flow.Default
catch (YieldPointException e)
{
// Useful for debugging
e.Data["Tapeti.Controller.Name"] = context.Controller.GetType().FullName;
e.Data["Tapeti.Controller.Name"] = context.Controller?.GetType().FullName;
e.Data["Tapeti.Controller.Method"] = context.Method.Name;
throw;
}
@ -239,44 +240,66 @@ namespace Tapeti.Flow.Default
finally
{
if (disposeFlowContext)
flowContext.Dispose();
flowContext?.Dispose();
}
}
/// <inheritdoc />
public IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context)
public IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context)
{
return context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
return context.MessageContext != null && context.MessageContext.TryGet<FlowMessageContextPayload>(out var flowPayload)
? new ParallelRequest(config, this, flowPayload.FlowContext)
: null;
}
/// <inheritdoc />
public Task Converge(IFlowHandlerContext context)
public ValueTask Converge(IFlowHandlerContext context)
{
return Execute(context, new DelegateYieldPoint(flowContext =>
Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync)));
return Execute(context, new DelegateYieldPoint(async flowContext =>
{
if (flowContext.ContinuationMetadata == null)
throw new InvalidOperationException("Missing ContinuationMetadata in FlowContext");
if (flowContext.ContinuationMetadata.ConvergeMethodName == null)
throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata");
await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync);
}));
}
internal async Task Converge(FlowContext flowContext, string convergeMethodName, bool convergeMethodSync)
{
IYieldPoint yieldPoint;
IYieldPoint? yieldPoint;
if (flowContext.HandlerContext == null)
throw new InvalidOperationException($"Missing HandleContext in FlowContext for converge method {convergeMethodName}");
if (flowContext.HandlerContext.MessageContext == null)
throw new InvalidOperationException($"Missing MessageContext in FlowContext for converge method {convergeMethodName}");
if (!flowContext.HandlerContext.MessageContext.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
throw new ArgumentException("Context does not contain a controller payload", nameof(flowContext));
if (controllerPayload.Controller == null)
throw new InvalidOperationException($"Controller is not available for converge method {convergeMethodName} (method is static?)");
var method = controllerPayload.Controller.GetType().GetMethod(convergeMethodName, BindingFlags.NonPublic | BindingFlags.Instance);
if (method == null)
throw new ArgumentException($"Unknown converge method in controller {controllerPayload.Controller.GetType().Name}: {convergeMethodName}");
if (convergeMethodSync)
yieldPoint = (IYieldPoint)method.Invoke(controllerPayload.Controller, new object[] { });
yieldPoint = (IYieldPoint?)method.Invoke(controllerPayload.Controller, new object[] { });
else
yieldPoint = await(Task<IYieldPoint>)method.Invoke(controllerPayload.Controller, new object[] { });
{
var yieldPointTask = method.Invoke(controllerPayload.Controller, new object[] { });
if (yieldPointTask == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
yieldPoint = await (Task<IYieldPoint>)yieldPointTask;
}
if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
@ -290,14 +313,21 @@ namespace Tapeti.Flow.Default
{
private class RequestInfo
{
public object Message { get; set; }
public ResponseHandlerInfo ResponseHandlerInfo { get; set; }
public object Message { get; }
public ResponseHandlerInfo ResponseHandlerInfo { get; }
public RequestInfo(object message, ResponseHandlerInfo responseHandlerInfo)
{
Message = message;
ResponseHandlerInfo = responseHandlerInfo;
}
}
private readonly ITapetiConfig config;
private readonly FlowProvider flowProvider;
private readonly List<RequestInfo> requests = new List<RequestInfo>();
private readonly List<RequestInfo> requests = new();
public ParallelRequestBuilder(ITapetiConfig config, FlowProvider flowProvider)
@ -307,32 +337,40 @@ namespace Tapeti.Flow.Default
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
{
return InternalAddRequest(message, responseHandler);
}
public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{
requests.Add(new RequestInfo
{
Message = message,
ResponseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler)
});
requests.Add(new RequestInfo(message, GetResponseHandlerInfo(config, message, responseHandler)));
return this;
}
@ -342,7 +380,6 @@ namespace Tapeti.Flow.Default
return BuildYieldPoint(continuation, false, noRequestsBehaviour);
}
public IYieldPoint YieldSync(Func<IYieldPoint> continuation, FlowNoRequestsBehaviour noRequestsBehaviour = FlowNoRequestsBehaviour.Exception)
{
return BuildYieldPoint(continuation, true, noRequestsBehaviour);
@ -353,29 +390,21 @@ namespace Tapeti.Flow.Default
{
if (requests.Count == 0)
{
switch (noRequestsBehaviour)
return noRequestsBehaviour switch
{
case FlowNoRequestsBehaviour.Exception:
throw new YieldPointException("At least one request must be added before yielding a parallel request");
case FlowNoRequestsBehaviour.Converge:
return new DelegateYieldPoint(context =>
flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync));
case FlowNoRequestsBehaviour.EndFlow:
return new DelegateYieldPoint(EndFlow);
default:
throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null);
}
FlowNoRequestsBehaviour.Exception => throw new YieldPointException("At least one request must be added before yielding a parallel request"),
FlowNoRequestsBehaviour.Converge => new DelegateYieldPoint(context => flowProvider.Converge(context, convergeMethod.Method.Name, convergeMethodSync)),
FlowNoRequestsBehaviour.EndFlow => new DelegateYieldPoint(EndFlow),
_ => throw new ArgumentOutOfRangeException(nameof(noRequestsBehaviour), noRequestsBehaviour, null)
};
}
if (convergeMethod?.Method == null)
if (convergeMethod.Method == null)
throw new ArgumentNullException(nameof(convergeMethod));
return new DelegateYieldPoint(async context =>
{
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller.GetType())
if (convergeMethod.Method.DeclaringType != context.HandlerContext.Controller?.GetType())
throw new YieldPointException("Converge method must be in the same controller class");
await Task.WhenAll(requests.Select(requestInfo =>
@ -408,19 +437,19 @@ namespace Tapeti.Flow.Default
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler)
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler)
public Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
public Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler)
public Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class
{
return InternalAddRequest(message, responseHandler);
}
@ -430,6 +459,9 @@ namespace Tapeti.Flow.Default
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
if (flowContext.ContinuationMetadata == null)
throw new InvalidOperationException("No ContinuationMetadata in FlowContext");
return flowProvider.SendRequest(
flowContext,
message,
@ -443,9 +475,17 @@ namespace Tapeti.Flow.Default
internal class ResponseHandlerInfo
{
public string MethodName { get; set; }
public string ReplyToQueue { get; set; }
public bool IsDurableQueue { get; set; }
public string MethodName { get; }
public string ReplyToQueue { get; }
public bool IsDurableQueue { get; }
public ResponseHandlerInfo(string methodName, string replyToQueue, bool isDurableQueue)
{
MethodName = methodName;
ReplyToQueue = replyToQueue;
IsDurableQueue = isDurableQueue;
}
}
}
}

View File

@ -6,7 +6,6 @@ using Tapeti.Config;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowStarter.
/// </summary>
@ -26,39 +25,38 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] { });
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty<object?>());
}
/// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, Array.Empty<object?>());
}
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter});
}
/// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object[] {parameter});
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object?[] {parameter});
}
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object[] parameters) where TController : class
private async Task CallControllerMethod<TController>(MethodInfo method, Func<object, Task<IYieldPoint>> getYieldPointResult, object?[] parameters) where TController : class
{
var controller = config.DependencyResolver.Resolve<TController>();
var yieldPoint = await getYieldPointResult(method.Invoke(controller, parameters));
var result = method.Invoke(controller, parameters);
if (result == null)
throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task<IYieldPoint>, got null");
var context = new FlowHandlerContext
{
Config = config,
Controller = controller,
Method = method
};
var yieldPoint = await getYieldPointResult(result);
var context = new FlowHandlerContext(config, controller, method);
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint);

View File

@ -9,8 +9,8 @@ namespace Tapeti.Flow.Default
/// </summary>
public class FlowState
{
private FlowMetadata metadata;
private Dictionary<Guid, ContinuationMetadata> continuations;
private FlowMetadata? metadata;
private Dictionary<Guid, ContinuationMetadata>? continuations;
/// <summary>
@ -18,7 +18,7 @@ namespace Tapeti.Flow.Default
/// </summary>
public FlowMetadata Metadata
{
get => metadata ?? (metadata = new FlowMetadata());
get => metadata ??= new FlowMetadata(null);
set => metadata = value;
}
@ -26,7 +26,7 @@ namespace Tapeti.Flow.Default
/// <summary>
/// Contains the serialized state which is restored when a flow continues.
/// </summary>
public string Data { get; set; }
public string? Data { get; set; }
/// <summary>
@ -34,7 +34,7 @@ namespace Tapeti.Flow.Default
/// </summary>
public Dictionary<Guid, ContinuationMetadata> Continuations
{
get => continuations ?? (continuations = new Dictionary<Guid, ContinuationMetadata>());
get => continuations ??= new Dictionary<Guid, ContinuationMetadata>();
set => continuations = value;
}
@ -45,7 +45,7 @@ namespace Tapeti.Flow.Default
public FlowState Clone()
{
return new FlowState {
metadata = metadata.Clone(),
metadata = metadata?.Clone(),
Data = Data,
continuations = continuations?.ToDictionary(kv => kv.Key, kv => kv.Value.Clone())
};
@ -61,18 +61,22 @@ namespace Tapeti.Flow.Default
/// <summary>
/// Contains information about the expected response for this flow.
/// </summary>
public ReplyMetadata Reply { get; set; }
public ReplyMetadata? Reply { get; }
/// <inheritdoc cref="FlowMetadata"/>
public FlowMetadata(ReplyMetadata? reply)
{
Reply = reply;
}
/// <summary>
/// Creates a deep clone of this FlowMetadata.
/// </summary>
public FlowMetadata Clone()
{
return new FlowMetadata
{
Reply = Reply?.Clone()
};
return new FlowMetadata(Reply);
}
}
@ -85,17 +89,17 @@ namespace Tapeti.Flow.Default
/// <summary>
/// The queue to which the response should be sent.
/// </summary>
public string ReplyTo { get; set; }
public string? ReplyTo { get; set; }
/// <summary>
/// The correlation ID included in the original request.
/// </summary>
public string CorrelationId { get; set; }
public string? CorrelationId { get; set; }
/// <summary>
/// The expected response message class.
/// </summary>
public string ResponseTypeName { get; set; }
public string? ResponseTypeName { get; set; }
/// <summary>
/// Indicates whether the response should be sent a mandatory.
@ -128,12 +132,12 @@ namespace Tapeti.Flow.Default
/// <summary>
/// The name of the method which will handle the response.
/// </summary>
public string MethodName { get; set; }
public string? MethodName { get; set; }
/// <summary>
/// The name of the method which is called when all responses have been processed.
/// </summary>
public string ConvergeMethodName { get; set; }
public string? ConvergeMethodName { get; set; }
/// <summary>
/// Determines if the converge method is synchronous or asynchronous.

View File

@ -9,7 +9,6 @@ using Tapeti.Flow.FlowHelpers;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation of IFlowStore.
/// </summary>
@ -17,11 +16,11 @@ namespace Tapeti.Flow.Default
{
private class CachedFlowState
{
public readonly FlowState FlowState;
public readonly FlowState? FlowState;
public readonly DateTime CreationTime;
public readonly bool IsPersistent;
public CachedFlowState(FlowState flowState, DateTime creationTime, bool isPersistent)
public CachedFlowState(FlowState? flowState, DateTime creationTime, bool isPersistent)
{
FlowState = flowState;
CreationTime = creationTime;
@ -29,10 +28,10 @@ namespace Tapeti.Flow.Default
}
}
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new ConcurrentDictionary<Guid, CachedFlowState>();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new ConcurrentDictionary<Guid, Guid>();
private readonly LockCollection<Guid> locks = new LockCollection<Guid>(EqualityComparer<Guid>.Default);
private HashSet<string> validatedMethods;
private readonly ConcurrentDictionary<Guid, CachedFlowState> flowStates = new();
private readonly ConcurrentDictionary<Guid, Guid> continuationLookup = new();
private readonly LockCollection<Guid> locks = new(EqualityComparer<Guid>.Default);
private HashSet<string>? validatedMethods;
private readonly IFlowRepository repository;
private readonly ITapetiConfig config;
@ -51,7 +50,7 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public async Task Load()
public async ValueTask Load()
{
if (inUse)
throw new InvalidOperationException("Can only load the saved state once.");
@ -86,10 +85,13 @@ namespace Tapeti.Flow.Default
private void ValidateContinuation(Guid flowId, Guid continuationId, ContinuationMetadata metadata)
{
if (string.IsNullOrEmpty(metadata.MethodName))
return;
// We could check all the things that are required for a continuation or converge method, but this should suffice
// for the common scenario where you change code without realizing that it's signature has been persisted
// ReSharper disable once InvertIf
if (validatedMethods.Add(metadata.MethodName))
if (validatedMethods!.Add(metadata.MethodName))
{
var methodInfo = MethodSerializer.Deserialize(metadata.MethodName);
if (methodInfo == null)
@ -114,17 +116,17 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public Task<Guid?> FindFlowID(Guid continuationID)
public ValueTask<Guid?> FindFlowID(Guid continuationID)
{
if (!loaded)
throw new InvalidOperationException("Flow store is not yet loaded.");
return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null);
return new ValueTask<Guid?>(continuationLookup.TryGetValue(continuationID, out var result) ? result : null);
}
/// <inheritdoc />
public async Task<IFlowStateLock> LockFlowState(Guid flowID)
public async ValueTask<IFlowStateLock> LockFlowState(Guid flowID)
{
if (!loaded)
throw new InvalidOperationException("Flow store should be loaded before storing flows.");
@ -137,22 +139,22 @@ namespace Tapeti.Flow.Default
/// <inheritdoc />
public Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
public ValueTask<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge)
{
var maximumDateTime = DateTime.UtcNow - minimumAge;
return Task.FromResult(flowStates
return new ValueTask<IEnumerable<ActiveFlow>>(flowStates
.Where(p => p.Value.CreationTime <= maximumDateTime)
.Select(p => new ActiveFlow(p.Key, p.Value.CreationTime))
.ToArray() as IEnumerable<ActiveFlow>);
.ToArray());
}
private class FlowStateLock : IFlowStateLock
{
private readonly FlowStore owner;
private volatile IDisposable flowLock;
private CachedFlowState cachedFlowState;
private volatile IDisposable? flowLock;
private CachedFlowState? cachedFlowState;
public Guid FlowID { get; }
@ -173,15 +175,15 @@ namespace Tapeti.Flow.Default
l?.Dispose();
}
public Task<FlowState> GetFlowState()
public ValueTask<FlowState?> GetFlowState()
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
return Task.FromResult(cachedFlowState?.FlowState?.Clone());
return new ValueTask<FlowState?>(cachedFlowState?.FlowState?.Clone());
}
public async Task StoreFlowState(FlowState newFlowState, bool persistent)
public async ValueTask StoreFlowState(FlowState newFlowState, bool persistent)
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
@ -190,13 +192,13 @@ namespace Tapeti.Flow.Default
newFlowState = newFlowState.Clone();
// Update the lookup dictionary for the ContinuationIDs
if (cachedFlowState != null)
if (cachedFlowState?.FlowState != null)
{
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k)))
owner.continuationLookup.TryRemove(removedContinuation, out _);
}
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
foreach (var addedContinuation in newFlowState.Continuations.Where(c => cachedFlowState?.FlowState == null || !cachedFlowState.FlowState.Continuations.ContainsKey(c.Key)))
{
owner.continuationLookup.TryAdd(addedContinuation.Key, FlowID);
}
@ -204,7 +206,7 @@ namespace Tapeti.Flow.Default
var isNew = cachedFlowState == null;
var wasPersistent = cachedFlowState?.IsPersistent ?? false;
cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState.CreationTime, persistent);
cachedFlowState = new CachedFlowState(newFlowState, isNew ? DateTime.UtcNow : cachedFlowState!.CreationTime, persistent);
owner.flowStates[FlowID] = cachedFlowState;
if (persistent)
@ -227,12 +229,12 @@ namespace Tapeti.Flow.Default
}
}
public async Task DeleteFlowState()
public async ValueTask DeleteFlowState()
{
if (flowLock == null)
throw new ObjectDisposedException("FlowStateLock");
if (cachedFlowState != null)
if (cachedFlowState?.FlowState != null)
{
foreach (var removedContinuation in cachedFlowState.FlowState.Continuations.Keys)
owner.continuationLookup.TryRemove(removedContinuation, out _);
@ -240,7 +242,7 @@ namespace Tapeti.Flow.Default
owner.flowStates.TryRemove(FlowID, out var removedFlowState);
cachedFlowState = null;
if (removedFlowState.IsPersistent)
if (removedFlowState is { IsPersistent: true })
await owner.repository.DeleteState(FlowID);
}
}

View File

@ -5,33 +5,32 @@ using System.Threading.Tasks;
namespace Tapeti.Flow.Default
{
/// <inheritdoc />
/// <summary>
/// Default implementation for IFlowRepository. Does not persist any state, relying on the FlowStore's cache instead.
/// </summary>
public class NonPersistentFlowRepository : IFlowRepository
{
Task<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>()
ValueTask<IEnumerable<FlowRecord<T>>> IFlowRepository.GetStates<T>()
{
return Task.FromResult(Enumerable.Empty<FlowRecord<T>>());
return new ValueTask<IEnumerable<FlowRecord<T>>>(Enumerable.Empty<FlowRecord<T>>());
}
/// <inheritdoc />
public Task CreateState<T>(Guid flowID, T state, DateTime timestamp)
public ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp)
{
return Task.CompletedTask;
return default;
}
/// <inheritdoc />
public Task UpdateState<T>(Guid flowID, T state)
public ValueTask UpdateState<T>(Guid flowID, T state)
{
return Task.CompletedTask;
return default;
}
/// <inheritdoc />
public Task DeleteState(Guid flowID)
public ValueTask DeleteState(Guid flowID)
{
return Task.CompletedTask;
return default;
}
}
}

View File

@ -4,17 +4,16 @@ using Tapeti.Flow.Default;
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Provides the Flow middleware.
/// </summary>
public class FlowExtension : ITapetiExtension
{
private readonly IFlowRepository flowRepository;
private readonly IFlowRepository? flowRepository;
/// <summary>
/// </summary>
public FlowExtension(IFlowRepository flowRepository)
public FlowExtension(IFlowRepository? flowRepository)
{
this.flowRepository = flowRepository;
}

View File

@ -7,7 +7,7 @@ namespace Tapeti.Flow.FlowHelpers
/// <summary>
/// Implementation of an asynchronous locking mechanism.
/// </summary>
public class LockCollection<T>
public class LockCollection<T> where T : notnull
{
private readonly Dictionary<T, LockItem> locks;
@ -57,10 +57,10 @@ namespace Tapeti.Flow.FlowHelpers
private class LockItem : IDisposable
{
internal volatile LockItem Next;
internal volatile LockItem? Next;
private readonly Dictionary<T, LockItem> locks;
private readonly TaskCompletionSource<IDisposable> tcs = new TaskCompletionSource<IDisposable>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<IDisposable> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly T key;
public LockItem(Dictionary<T, LockItem> locks, T key)

View File

@ -18,14 +18,14 @@ namespace Tapeti.Flow.FlowHelpers
}
private static readonly Regex DeserializeRegex = new Regex("^(?<method>.+?)@(?<assembly>.+?):(?<type>.+?)$");
private static readonly Regex DeserializeRegex = new("^(?<method>.+?)@(?<assembly>.+?):(?<type>.+?)$");
/// <summary>
/// Deserializes the serialized method representation back into it's MethodInfo, or null if not found.
/// </summary>
/// <param name="serializedMethod"></param>
public static MethodInfo Deserialize(string serializedMethod)
public static MethodInfo? Deserialize(string serializedMethod)
{
var match = DeserializeRegex.Match(serializedMethod);
if (!match.Success)
@ -35,8 +35,6 @@ namespace Tapeti.Flow.FlowHelpers
try
{
assembly = Assembly.Load(match.Groups["assembly"].Value);
if (assembly == null)
return null;
}
catch
{

View File

@ -16,9 +16,8 @@ namespace Tapeti.Flow
/// parallel flow is done and the convergeMethod will be called.
/// Temporarily disables storing the flow state.
/// </summary>
public bool FlowIsConverging => FlowContext != null &&
FlowContext.FlowState.Continuations.Count == 0 &&
FlowContext.ContinuationMetadata.ConvergeMethodName != null;
public bool FlowIsConverging => FlowContext.FlowState.Continuations.Count == 0 &&
FlowContext.ContinuationMetadata?.ConvergeMethodName != null;
public FlowMessageContextPayload(FlowContext flowContext)
@ -29,7 +28,7 @@ namespace Tapeti.Flow
public void Dispose()
{
FlowContext?.Dispose();
FlowContext.Dispose();
}
}
}

View File

@ -4,7 +4,6 @@ using Tapeti.Config;
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Provides information about the handler for the flow.
/// </summary>
@ -19,7 +18,7 @@ namespace Tapeti.Flow
/// <summary>
/// An instance of the controller which starts or continues the flow.
/// </summary>
object Controller { get; }
object? Controller { get; }
/// <summary>
@ -32,6 +31,6 @@ namespace Tapeti.Flow
/// Access to the message context if this is a continuated flow.
/// Will be null when in a starting flow.
/// </summary>
IMessageContext MessageContext { get; }
IMessageContext? MessageContext { get; }
}
}

View File

@ -20,7 +20,19 @@ namespace Tapeti.Flow
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler);
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
@ -39,7 +51,7 @@ namespace Tapeti.Flow
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler);
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
@ -55,7 +67,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint EndWithResponse<TResponse>(TResponse message);
IYieldPoint EndWithResponse<TResponse>(TResponse message) where TResponse : class;
/// <summary>
@ -108,19 +120,19 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="context"></param>
/// <param name="yieldPoint"></param>
Task Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
ValueTask Execute(IFlowHandlerContext context, IYieldPoint yieldPoint);
/// <summary>
/// Returns the parallel request for the given message context.
/// </summary>
IFlowParallelRequest GetParallelRequest(IFlowHandlerContext context);
IFlowParallelRequest? GetParallelRequest(IFlowHandlerContext context);
/// <summary>
/// Calls the converge method for a parallel flow.
/// </summary>
Task Converge(IFlowHandlerContext context);
ValueTask Converge(IFlowHandlerContext context);
}
@ -162,14 +174,31 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
/// Note that the response handler can not influence the flow as it does not return a YieldPoint.
/// It can instead store state in the controller for the continuation passed to the Yield method.
/// Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,ValueTask})"/>
IFlowParallelRequestBuilder AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, ValueTask> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
@ -179,7 +208,14 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequestSync{TRequest,TResponse}(TRequest,Action{TResponse})"/>
IFlowParallelRequestBuilder AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse, IFlowParallelRequest> responseHandler) where TRequest : class where TResponse : class;
/// There is no Sync overload with an IFlowParallelRequest parameter, as the AddRequest methods for that are
/// async, so you should always await them.
@ -224,14 +260,14 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler);
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, Task> responseHandler) where TRequest : class where TResponse : class;
/// <remarks>
/// This overload allows the response handler access to the IFlowParallelRequest interface, which
/// can be used to add additional requests to the parallel request before the continuation method passed to the Yield method is called.
/// </remarks>
/// <inheritdoc cref="AddRequest{TRequest,TResponse}(TRequest,Func{TResponse,Task})"/>
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler);
Task AddRequest<TRequest, TResponse>(TRequest message, Func<TResponse, IFlowParallelRequest, Task> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message and continue the flow when the response arrives.
@ -241,7 +277,7 @@ namespace Tapeti.Flow
/// </summary>
/// <param name="message"></param>
/// <param name="responseHandler"></param>
Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler);
Task AddRequestSync<TRequest, TResponse>(TRequest message, Action<TResponse> responseHandler) where TRequest : class where TResponse : class;
}

View File

@ -13,7 +13,7 @@ namespace Tapeti.Flow
/// Load the previously persisted flow states.
/// </summary>
/// <returns>A list of flow states, where the key is the unique Flow ID and the value is the deserialized T.</returns>
Task<IEnumerable<FlowRecord<T>>> GetStates<T>();
ValueTask<IEnumerable<FlowRecord<T>>> GetStates<T>();
/// <summary>
/// Stores a new flow state. Guaranteed to be run in a lock for the specified flow ID.
@ -22,20 +22,20 @@ namespace Tapeti.Flow
/// <param name="state">The flow state to be stored.</param>
/// <param name="timestamp">The time when the flow was initially created.</param>
/// <returns></returns>
Task CreateState<T>(Guid flowID, T state, DateTime timestamp);
ValueTask CreateState<T>(Guid flowID, T state, DateTime timestamp);
/// <summary>
/// Updates an existing flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID">The unique ID of the flow.</param>
/// <param name="state">The flow state to be stored.</param>
Task UpdateState<T>(Guid flowID, T state);
ValueTask UpdateState<T>(Guid flowID, T state);
/// <summary>
/// Delete a flow state. Guaranteed to be run in a lock for the specified flow ID.
/// </summary>
/// <param name="flowID">The unique ID of the flow.</param>
Task DeleteState(Guid flowID);
ValueTask DeleteState(Guid flowID);
}

View File

@ -17,19 +17,19 @@ namespace Tapeti.Flow
/// If using an IFlowRepository that requires an update (such as creating tables) make
/// sure it is called before calling Load.
/// </summary>
Task Load();
ValueTask Load();
/// <summary>
/// Looks up the FlowID corresponding to a ContinuationID. For internal use.
/// </summary>
/// <param name="continuationID"></param>
Task<Guid?> FindFlowID(Guid continuationID);
ValueTask<Guid?> FindFlowID(Guid continuationID);
/// <summary>
/// Acquires a lock on the flow with the specified FlowID.
/// </summary>
/// <param name="flowID"></param>
Task<IFlowStateLock> LockFlowState(Guid flowID);
ValueTask<IFlowStateLock> LockFlowState(Guid flowID);
/// <summary>
/// Returns information about the currently active flows.
@ -38,11 +38,10 @@ namespace Tapeti.Flow
/// This is intended for monitoring purposes and should be treated as a snapshot.
/// </remarks>
/// <param name="minimumAge">The minimum age of the flow before it is included in the result. Set to TimeSpan.Zero to return all active flows.</param>
Task<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge);
ValueTask<IEnumerable<ActiveFlow>> GetActiveFlows(TimeSpan minimumAge);
}
/// <inheritdoc />
/// <summary>
/// Represents a lock on the flow state, to provide thread safety.
/// </summary>
@ -56,19 +55,19 @@ namespace Tapeti.Flow
/// <summary>
/// Acquires a copy of the flow state.
/// </summary>
Task<FlowState> GetFlowState();
ValueTask<FlowState?> GetFlowState();
/// <summary>
/// Stores the new flow state.
/// </summary>
/// <param name="flowState"></param>
/// <param name="persistent"></param>
Task StoreFlowState(FlowState flowState, bool persistent);
ValueTask StoreFlowState(FlowState flowState, bool persistent);
/// <summary>
/// Disposes of the flow state corresponding to this Flow ID.
/// </summary>
Task DeleteFlowState();
ValueTask DeleteFlowState();
}

View File

@ -170,11 +170,11 @@ namespace JetBrains.Annotations
{
public PublicAPIAttribute() { }
public PublicAPIAttribute([NotNull] string comment)
public PublicAPIAttribute(string comment)
{
Comment = comment;
}
[CanBeNull] public string Comment { get; }
public string? Comment { get; }
}
}

View File

@ -2,7 +2,6 @@
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Raised when a response is expected to end a flow, but none was provided.
/// </summary>

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Menno van Lavieren, Mark van Renswoude</Authors>
<Company />
@ -11,12 +11,19 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.Flow.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702</NoWarn>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'">
<!-- Suppress 'Use switch expression' which requires language version 8 not available in .NET Standard 2.0 -->
<NoWarn>IDE0066</NoWarn>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
@ -29,7 +36,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Tapeti.Annotations" Version="3.0.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Tapeti.Annotations" Version="3.*-*" />
</ItemGroup>
</Project>

View File

@ -2,7 +2,6 @@
namespace Tapeti.Flow
{
/// <inheritdoc />
/// <summary>
/// Raised when an invalid yield point is returned.
/// </summary>

View File

@ -4,7 +4,6 @@ using Ninject;
namespace Tapeti.Ninject
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for Ninject.
/// </summary>
@ -49,7 +48,7 @@ namespace Tapeti.Ninject
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().ToMethod(context => factory());
kernel.Bind<TService>().ToMethod(_ => factory());
}
@ -77,7 +76,7 @@ namespace Tapeti.Ninject
if (kernel.GetBindings(typeof(TService)).Any())
return;
kernel.Bind<TService>().ToMethod(context => factory()).InSingletonScope();
kernel.Bind<TService>().ToMethod(_ => factory()).InSingletonScope();
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,10 +11,12 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Ninject" Version="3.3.4" />
<PackageReference Include="Ninject" Version="3.*" />
</ItemGroup>
<ItemGroup>
@ -29,6 +31,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -10,7 +10,7 @@ namespace Tapeti.Serilog.Default
public class DiagnosticContext : IDiagnosticContext
{
private readonly global::Serilog.ILogger logger;
private readonly List<LogEventProperty> properties = new List<LogEventProperty>();
private readonly List<LogEventProperty> properties = new();
/// <summary>

View File

@ -43,7 +43,7 @@ namespace Tapeti.Serilog.Middleware
}
private static object DiagnosticContextFactory(IMessageContext context)
private static object? DiagnosticContextFactory(IMessageContext context)
{
return context.TryGet<DiagnosticContextPayload>(out var diagnosticContextPayload)
? diagnosticContextPayload.DiagnosticContext

View File

@ -29,7 +29,7 @@ namespace Tapeti.Serilog.Middleware
}
/// <inheritdoc />
public async Task Handle(IMessageContext context, Func<Task> next)
public async ValueTask Handle(IMessageContext context, Func<ValueTask> next)
{
var logger = context.Config.DependencyResolver.Resolve<global::Serilog.ILogger>();
@ -41,6 +41,7 @@ namespace Tapeti.Serilog.Middleware
await next();
stopwatch.Stop();

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Hans Mulder, Mark van Renswoude</Authors>
<Company />
@ -11,6 +11,8 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.Serilog.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@ -18,7 +20,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="2.10.0" />
<PackageReference Include="Serilog" Version="2.*" />
</ItemGroup>
<ItemGroup>
@ -33,6 +35,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Text;
using Tapeti.Config;
using Tapeti.Connection;
using ISerilogLogger = Serilog.ILogger;
// ReSharper disable UnusedMember.Global
@ -86,7 +87,7 @@ namespace Tapeti.Serilog
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
var message = new StringBuilder("Tapeti: exception in message handler");
var messageParams = new List<object>();
var messageParams = new List<object?>();
var contextLogger = seriLogger
.ForContext("consumeResult", consumeResult)
@ -129,10 +130,11 @@ namespace Tapeti.Serilog
}
/// <inheritdoc />
public void QueueExistsWarning(string queueName, Dictionary<string, string> arguments)
public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments)
{
seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({arguments}) and will not be redeclared, queue will be consumed as-is",
seriLogger.Warning("Tapeti: durable queue {queueName} exists with incompatible x-arguments ({existingArguments} vs. {arguments}) and will not be redeclared, queue will be consumed as-is",
queueName,
existingArguments,
arguments);
}

View File

@ -4,19 +4,18 @@ using SimpleInjector;
namespace Tapeti.SimpleInjector
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for SimpleInjector.
/// </summary>
public class SimpleInjectorDependencyResolver : IDependencyContainer
{
private readonly Container container;
private readonly Lifestyle defaultsLifestyle;
private readonly Lifestyle controllersLifestyle;
private readonly Lifestyle? defaultsLifestyle;
private readonly Lifestyle? controllersLifestyle;
/// <summary>
/// </summary>
public SimpleInjectorDependencyResolver(Container container, Lifestyle defaultsLifestyle = null, Lifestyle controllersLifestyle = null)
public SimpleInjectorDependencyResolver(Container container, Lifestyle? defaultsLifestyle = null, Lifestyle? controllersLifestyle = null)
{
this.container = container;
this.defaultsLifestyle = defaultsLifestyle;

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
@ -11,7 +11,8 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@ -19,7 +20,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SimpleInjector" Version="5.3.0" />
<PackageReference Include="SimpleInjector" Version="5.*" />
</ItemGroup>
<ItemGroup>
@ -34,6 +35,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,79 @@
using System;
using System.Threading.Tasks;
using Docker.DotNet;
using Docker.DotNet.Models;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Configurations;
using DotNet.Testcontainers.Containers;
using Xunit;
namespace Tapeti.Tests.Client
{
[CollectionDefinition(Name)]
public sealed class RabbitMQCollection : ICollectionFixture<RabbitMQFixture>
{
public const string Name = "RabbitMQ";
}
public sealed class RabbitMQFixture : IAsyncLifetime
{
public static string RabbitMQUsername => "tapetitests";
public static string RabbitMQPassword => "topsecret1234";
public ushort RabbitMQPort { get; private set; }
public ushort RabbitMQManagementPort { get; private set; }
private TestcontainerMessageBroker? testcontainers;
private const int DefaultRabbitMQPort = 5672;
private const int DefaultRabbitMQManagementPort = 15672;
private const string ImageName = "rabbitmq";
private const string ImageTag = "3.11.3-alpine";
public async Task InitializeAsync()
{
// Testcontainers does not seem to pull the image the first time.
// I didn't get it to work, even using WithImagePullPolicy from the latest beta.
// Note: running it the first time can take a while.
var client = new DockerClientConfiguration().CreateClient();
await client.Images.CreateImageAsync(
new ImagesCreateParameters
{
FromImage = ImageName,
Tag = ImageTag
},
null,
new Progress<JSONMessage>());
// If you get a "Sequence contains no elements" error here: make sure Docker Desktop is running
var testcontainersBuilder = new TestcontainersBuilder<RabbitMqTestcontainer>()
.WithMessageBroker(new RabbitMqTestcontainerConfiguration($"{ImageName}:{ImageTag}")
{
Username = RabbitMQUsername,
Password = RabbitMQPassword
})
.WithExposedPort(DefaultRabbitMQManagementPort)
.WithPortBinding(0, DefaultRabbitMQManagementPort);
testcontainers = testcontainersBuilder.Build();
await testcontainers.StartAsync();
RabbitMQPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQPort);
RabbitMQManagementPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQManagementPort);
}
public async Task DisposeAsync()
{
if (testcontainers != null)
await testcontainers.DisposeAsync();
}
}
}

View File

@ -0,0 +1,116 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Tapeti.Connection;
using Tapeti.Default;
using Tapeti.Exceptions;
using Tapeti.Tests.Mock;
using Xunit;
using Xunit.Abstractions;
namespace Tapeti.Tests.Client
{
[Collection(RabbitMQCollection.Name)]
[Trait("Category", "Requires Docker")]
public class TapetiClientTests : IAsyncLifetime
{
private readonly RabbitMQFixture fixture;
private readonly MockDependencyResolver dependencyResolver = new();
private TapetiClient client = null!;
public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
{
this.fixture = fixture;
dependencyResolver.Set<ILogger>(new MockLogger(testOutputHelper));
}
public Task InitializeAsync()
{
client = CreateClient();
return Task.CompletedTask;
}
public async Task DisposeAsync()
{
await client.Close();
}
[Fact]
public void Fixture()
{
fixture.RabbitMQPort.Should().BeGreaterThan(0);
fixture.RabbitMQManagementPort.Should().BeGreaterThan(0);
}
[Fact]
public async Task DynamicQueueDeclareNoPrefix()
{
var queueName = await client.DynamicQueueDeclare(null, null, CancellationToken.None);
queueName.Should().NotBeNullOrEmpty();
}
[Fact]
public async Task DynamicQueueDeclarePrefix()
{
var queueName = await client.DynamicQueueDeclare("dynamicprefix", null, CancellationToken.None);
queueName.Should().StartWith("dynamicprefix");
}
[Fact]
public async Task PublishHandleOverflow()
{
var queue1 = await client.DynamicQueueDeclare(null, new RabbitMQArguments
{
{ "x-max-length", 5 },
{ "x-overflow", "reject-publish" }
}, CancellationToken.None);
var queue2 = await client.DynamicQueueDeclare(null, null, CancellationToken.None);
var body = Encoding.UTF8.GetBytes("Hello world!");
var properties = new MessageProperties();
for (var i = 0; i < 5; i++)
await client.Publish(body, properties, null, queue1, true);
var publishOverMaxLength = () => client.Publish(body, properties, null, queue1, true);
await publishOverMaxLength.Should().ThrowAsync<NackException>();
// The channel should recover and allow further publishing
await client.Publish(body, properties, null, queue2, true);
}
// TODO test the other methods
private TapetiClient CreateClient()
{
return new TapetiClient(
new TapetiConfig.Config(dependencyResolver),
new TapetiConnectionParams
{
HostName = "127.0.0.1",
Port = fixture.RabbitMQPort,
ManagementPort = fixture.RabbitMQManagementPort,
Username = RabbitMQFixture.RabbitMQUsername,
Password = RabbitMQFixture.RabbitMQPassword,
PrefetchCount = 50
});
}
}
}

View File

@ -0,0 +1,29 @@
using JetBrains.Annotations;
using Tapeti.Config;
using Tapeti.Tests.Mock;
namespace Tapeti.Tests.Config
{
public class BaseControllerTest
{
protected readonly MockDependencyResolver DependencyResolver = new();
protected ITapetiConfig GetControllerConfig<[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] T>() where T : class
{
var configBuilder = new TapetiConfig(DependencyResolver);
configBuilder.EnableDeclareDurableQueues();
configBuilder.RegisterController(typeof(T));
var config = configBuilder.Build();
return config;
}
protected ITapetiConfigBindings GetControllerBindings<[MeansImplicitUse(ImplicitUseTargetFlags.WithMembers)] T>() where T : class
{
return GetControllerConfig<T>().Bindings;
}
}
}

View File

@ -0,0 +1,210 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Execution;
using Moq;
using Tapeti.Annotations;
using Tapeti.Config;
using Tapeti.Connection;
using Xunit;
namespace Tapeti.Tests.Config
{
internal static class UTF8StringExtensions
{
public static string AsUTF8String(this object value)
{
value.Should().BeOfType<byte[]>();
return Encoding.UTF8.GetString((byte[])value);
}
}
public class QueueArgumentsTest : BaseControllerTest
{
private static readonly MockRepository MoqRepository = new(MockBehavior.Strict);
private readonly Mock<ITapetiClient> client;
private readonly Dictionary<string, IRabbitMQArguments> declaredQueues = new();
public QueueArgumentsTest()
{
client = MoqRepository.Create<ITapetiClient>();
var routingKeyStrategy = MoqRepository.Create<IRoutingKeyStrategy>();
var exchangeStrategy = MoqRepository.Create<IExchangeStrategy>();
DependencyResolver.Set(routingKeyStrategy.Object);
DependencyResolver.Set(exchangeStrategy.Object);
routingKeyStrategy
.Setup(s => s.GetRoutingKey(typeof(TestMessage1)))
.Returns("testmessage1");
routingKeyStrategy
.Setup(s => s.GetRoutingKey(typeof(TestMessage2)))
.Returns("testmessage2");
exchangeStrategy
.Setup(s => s.GetExchange(It.IsAny<Type>()))
.Returns("exchange");
var queue = 0;
client
.Setup(c => c.DynamicQueueDeclare(null, It.IsAny<IRabbitMQArguments>(), It.IsAny<CancellationToken>()))
.Callback((string _, IRabbitMQArguments arguments, CancellationToken _) =>
{
queue++;
declaredQueues.Add($"queue-{queue}", arguments);
})
.ReturnsAsync(() => $"queue-{queue}");
client
.Setup(c => c.DurableQueueDeclare(It.IsAny<string>(), It.IsAny<IEnumerable<QueueBinding>>(), It.IsAny<IRabbitMQArguments>(), It.IsAny<CancellationToken>()))
.Callback((string queueName, IEnumerable<QueueBinding> _, IRabbitMQArguments arguments, CancellationToken _) =>
{
declaredQueues.Add(queueName, arguments);
})
.Returns(Task.CompletedTask);
client
.Setup(c => c.DynamicQueueBind(It.IsAny<string>(), It.IsAny<QueueBinding>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
}
[Fact]
public async Task SingleQueueArguments()
{
var config = GetControllerConfig<TestController>();
var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage1");
binding1.Should().NotBeNull();
var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Method.Name == "HandleMessage2");
binding2.Should().NotBeNull();
var subscriber = new TapetiSubscriber(() => client.Object, config);
await subscriber.ApplyBindings();
declaredQueues.Should().HaveCount(1);
var arguments = declaredQueues["queue-1"];
arguments.Should().ContainKey("x-custom").WhoseValue.AsUTF8String().Should().Be("custom value");
arguments.Should().ContainKey("x-another").WhoseValue.Should().Be(true);
arguments.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100);
arguments.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000);
arguments.Should().ContainKey("x-message-ttl").WhoseValue.Should().Be(4269);
arguments.Should().ContainKey("x-overflow").WhoseValue.AsUTF8String().Should().Be("reject-publish");
}
[Fact]
public async Task ConflictingDynamicQueueArguments()
{
var config = GetControllerConfig<ConflictingArgumentsTestController>();
var subscriber = new TapetiSubscriber(() => client.Object, config);
await subscriber.ApplyBindings();
declaredQueues.Should().HaveCount(2);
var arguments1 = declaredQueues["queue-1"];
arguments1.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100);
var arguments2 = declaredQueues["queue-2"];
arguments2.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000);
}
[Fact]
public async Task ConflictingDurableQueueArguments()
{
var config = GetControllerConfig<ConflictingArgumentsDurableQueueTestController>();
var testApplyBindings = () =>
{
var subscriber = new TapetiSubscriber(() => client.Object, config);
return subscriber.ApplyBindings();
};
using (new AssertionScope())
{
await testApplyBindings.Should().ThrowAsync<TopologyConfigurationException>();
declaredQueues.Should().HaveCount(0);
}
}
// ReSharper disable all
#pragma warning disable
private class TestMessage1
{
}
private class TestMessage2
{
}
[DynamicQueue]
[QueueArguments("x-custom", "custom value", "x-another", true, MaxLength = 100, MaxLengthBytes = 100000, MessageTTL = 4269, Overflow = RabbitMQOverflow.RejectPublish)]
private class TestController
{
public void HandleMessage1(TestMessage1 message)
{
}
public void HandleMessage2(TestMessage2 message)
{
}
}
[DynamicQueue]
[QueueArguments(MaxLength = 100)]
private class ConflictingArgumentsTestController
{
public void HandleMessage1(TestMessage1 message)
{
}
[QueueArguments(MaxLengthBytes = 100000)]
public void HandleMessage2(TestMessage1 message)
{
}
}
[DurableQueue("durable")]
[QueueArguments(MaxLength = 100)]
private class ConflictingArgumentsDurableQueueTestController
{
public void HandleMessage1(TestMessage1 message)
{
}
[QueueArguments(MaxLengthBytes = 100000)]
public void HandleMessage2(TestMessage1 message)
{
}
}
#pragma warning restore
// ReSharper restore all
}
}

View File

@ -0,0 +1,54 @@
using System.Linq;
using FluentAssertions;
using Tapeti.Annotations;
using Tapeti.Config;
using Xunit;
namespace Tapeti.Tests.Config
{
public class SimpleControllerTest : BaseControllerTest
{
[Fact]
public void RegisterController()
{
var bindings = GetControllerBindings<TestController>();
bindings.Should().HaveCount(2);
var handleSimpleMessageBinding = bindings.Single(b => b is IControllerMethodBinding cmb &&
cmb.Controller == typeof(TestController) &&
cmb.Method.Name == "HandleSimpleMessage");
handleSimpleMessageBinding.QueueType.Should().Be(QueueType.Dynamic);
var handleSimpleMessageStaticBinding = bindings.Single(b => b is IControllerMethodBinding cmb &&
cmb.Controller == typeof(TestController) &&
cmb.Method.Name == "HandleSimpleMessageStatic");
handleSimpleMessageStaticBinding.QueueType.Should().Be(QueueType.Dynamic);
}
// ReSharper disable all
#pragma warning disable
private class TestMessage
{
}
[DynamicQueue]
private class TestController
{
public void HandleSimpleMessage(TestMessage message)
{
}
public static void HandleSimpleMessageStatic(TestMessage message)
{
}
}
#pragma warning restore
// ReSharper restore all
}
}

View File

@ -0,0 +1,227 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using FluentAssertions;
using Tapeti.Helpers;
using Xunit;
namespace Tapeti.Tests.Helpers
{
public class ExpressionInvokerTest
{
[Fact]
public void InstanceMethodVoidNoParameters()
{
const string methodName = nameof(InvokeTarget.InstanceMethodVoidNoParameters);
var invoker = InvokerFor(methodName);
var target = new InvokeTarget();
invoker.Invoke(target);
target.Verify(methodName);
}
[Fact]
public void InstanceMethodReturnValueNoParameters()
{
const string methodName = nameof(InvokeTarget.InstanceMethodReturnValueNoParameters);
var invoker = InvokerFor(methodName);
var target = new InvokeTarget();
var returnValue = invoker.Invoke(target);
target.Verify(methodName);
returnValue.Should().Be("Hello world!");
}
[Fact]
public void InstanceMethodVoidParameters()
{
const string methodName = nameof(InvokeTarget.InstanceMethodVoidParameters);
var invoker = InvokerFor(methodName);
var target = new InvokeTarget();
invoker.Invoke(target, 42);
target.Verify(methodName, "42");
}
[Fact]
public void InstanceMethodReturnValueParameters()
{
const string methodName = nameof(InvokeTarget.InstanceMethodReturnValueParameters);
var invoker = InvokerFor(methodName);
var target = new InvokeTarget();
var returnValue = invoker.Invoke(target, new byte[] { 42, 69 });
target.Verify(methodName, "42,69");
returnValue.Should().Be(true);
}
[Fact]
public void StaticMethodVoidNoParameters()
{
InvokeTarget.ResetStatic();
const string methodName = nameof(InvokeTarget.StaticMethodVoidNoParameters);
var invoker = InvokerFor(methodName);
invoker.Invoke(null);
InvokeTarget.VerifyStatic(methodName);
}
[Fact]
public void StaticMethodReturnValueNoParameters()
{
InvokeTarget.ResetStatic();
const string methodName = nameof(InvokeTarget.StaticMethodReturnValueNoParameters);
var invoker = InvokerFor(methodName);
var returnValue = invoker.Invoke(null);
InvokeTarget.VerifyStatic(methodName);
returnValue.Should().Be("Hello world!");
}
[Fact]
public void StaticMethodVoidParameters()
{
InvokeTarget.ResetStatic();
const string methodName = nameof(InvokeTarget.StaticMethodVoidParameters);
var invoker = InvokerFor(methodName);
invoker.Invoke(null, 42);
InvokeTarget.VerifyStatic(methodName, "42");
}
[Fact]
public void StaticMethodReturnValueParameters()
{
InvokeTarget.ResetStatic();
const string methodName = nameof(InvokeTarget.StaticMethodReturnValueParameters);
var invoker = InvokerFor(methodName);
var returnValue = invoker.Invoke(null, new byte[] { 42, 69 });
InvokeTarget.VerifyStatic(methodName, "42,69");
returnValue.Should().Be(true);
}
private static ExpressionInvoke InvokerFor(string invokeTargetMethodName)
{
var method = typeof(InvokeTarget).GetMethod(invokeTargetMethodName);
return method!.CreateExpressionInvoke();
}
// ReSharper disable ParameterHidesMember
private class InvokeTarget
{
private static string? staticMethodName;
private static string? staticParameters;
private string? methodName;
private string? parameters;
public void InstanceMethodVoidNoParameters()
{
MethodCalled();
}
public string InstanceMethodReturnValueNoParameters()
{
MethodCalled();
return "Hello world!";
}
public void InstanceMethodVoidParameters(int answer)
{
MethodCalled(answer.ToString());
}
public bool InstanceMethodReturnValueParameters(IEnumerable<byte> values)
{
MethodCalled(string.Join(',', values.Select(v => v.ToString())));
return true;
}
public static void StaticMethodVoidNoParameters()
{
StaticMethodCalled();
}
public static string StaticMethodReturnValueNoParameters()
{
StaticMethodCalled();
return "Hello world!";
}
public static void StaticMethodVoidParameters(int answer)
{
StaticMethodCalled(answer.ToString());
}
public static bool StaticMethodReturnValueParameters(IEnumerable<byte> values)
{
StaticMethodCalled(string.Join(',', values.Select(v => v.ToString())));
return true;
}
private void MethodCalled(string parameters = "", [CallerMemberName]string methodName = "")
{
this.methodName.Should().BeNull();
this.methodName = methodName;
this.parameters = parameters;
}
public static void ResetStatic()
{
staticMethodName = null;
staticParameters = null;
}
private static void StaticMethodCalled(string parameters = "", [CallerMemberName] string methodName = "")
{
staticMethodName.Should().BeNull();
staticMethodName = methodName;
staticParameters = parameters;
}
public void Verify(string methodName, string parameters = "")
{
this.methodName.Should().Be(methodName);
this.parameters.Should().Be(parameters);
}
public static void VerifyStatic(string methodName, string parameters = "")
{
staticMethodName.Should().Be(methodName);
staticParameters.Should().Be(parameters);
}
}
}
}

View File

@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
namespace Tapeti.Tests.Mock
{
public class MockDependencyResolver : IDependencyResolver
{
private readonly Dictionary<Type, object> container = new();
public void Set<TInterface>(TInterface instance) where TInterface : class
{
container.Add(typeof(TInterface), instance);
}
public T Resolve<T>() where T : class
{
return (T)Resolve(typeof(T));
}
public object Resolve(Type type)
{
return container[type];
}
}
}

View File

@ -0,0 +1,98 @@
using System;
using System.Text;
using Tapeti.Config;
using Tapeti.Connection;
using Xunit.Abstractions;
namespace Tapeti.Tests.Mock
{
internal class MockLogger : IBindingLogger
{
private readonly ITestOutputHelper testOutputHelper;
public MockLogger(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;
}
public void Connect(IConnectContext connectContext)
{
testOutputHelper.WriteLine($"{(connectContext.IsReconnect ? "Reconnecting" : "Connecting")} to {connectContext.ConnectionParams.HostName}:{connectContext.ConnectionParams.Port}{connectContext.ConnectionParams.VirtualHost}");
}
public void ConnectFailed(IConnectFailedContext connectContext)
{
testOutputHelper.WriteLine($"Connection failed: {connectContext.Exception}");
}
public void ConnectSuccess(IConnectSuccessContext connectContext)
{
testOutputHelper.WriteLine($"{(connectContext.IsReconnect ? "Reconnected" : "Connected")} using local port {connectContext.LocalPort}");
}
public void Disconnect(IDisconnectContext disconnectContext)
{
testOutputHelper.WriteLine($"Connection closed: {(!string.IsNullOrEmpty(disconnectContext.ReplyText) ? disconnectContext.ReplyText : "<no reply text>")} (reply code: {disconnectContext.ReplyCode})");
}
public void ConsumeException(Exception exception, IMessageContext messageContext, ConsumeResult consumeResult)
{
testOutputHelper.WriteLine(exception.Message);
}
public void QueueDeclare(string queueName, bool durable, bool passive)
{
testOutputHelper.WriteLine(passive
? $"Verifying durable queue {queueName}"
: $"Declaring {(durable ? "durable" : "dynamic")} queue {queueName}");
}
public void QueueExistsWarning(string queueName, IRabbitMQArguments? existingArguments, IRabbitMQArguments? arguments)
{
testOutputHelper.WriteLine($"[Tapeti] Durable queue {queueName} exists with incompatible x-arguments ({GetArgumentsText(existingArguments)} vs. {GetArgumentsText(arguments)}) and will not be redeclared, queue will be consumed as-is");
}
private static string GetArgumentsText(IRabbitMQArguments? arguments)
{
if (arguments == null || arguments.Count == 0)
return "empty";
var argumentsText = new StringBuilder();
foreach (var pair in arguments)
{
if (argumentsText.Length > 0)
argumentsText.Append(", ");
argumentsText.Append($"{pair.Key} = {pair.Value}");
}
return argumentsText.ToString();
}
public void QueueBind(string queueName, bool durable, string exchange, string routingKey)
{
testOutputHelper.WriteLine($"Binding {queueName} to exchange {exchange} with routing key {routingKey}");
}
public void QueueUnbind(string queueName, string exchange, string routingKey)
{
testOutputHelper.WriteLine($"Removing binding for {queueName} to exchange {exchange} with routing key {routingKey}");
}
public void ExchangeDeclare(string exchange)
{
testOutputHelper.WriteLine($"Declaring exchange {exchange}");
}
public void QueueObsolete(string queueName, bool deleted, uint messageCount)
{
testOutputHelper.WriteLine(deleted
? $"Obsolete queue was deleted: {queueName}"
: $"Obsolete queue bindings removed: {queueName}, {messageCount} messages remaining");
}
}
}

View File

@ -1,7 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@ -9,10 +10,13 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.10.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PackageReference Include="FluentAssertions" Version="6.8.0" />
<PackageReference Include="JetBrains.Annotations" Version="2022.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="Moq" Version="4.18.2" />
<PackageReference Include="Testcontainers" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
@ -22,8 +26,4 @@
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Core\" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,8 @@
using System;
using Tapeti.Config;
// ReSharper disable UnusedMember.Global
namespace Tapeti.Transient
{
/// <summary>

View File

@ -15,6 +15,6 @@ namespace Tapeti.Transient
/// <param name="request"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request);
Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class;
}
}

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Menno van Lavieren, Mark van Renswoude</Authors>
<Company />
@ -11,6 +11,8 @@
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.Flow.png</PackageIcon>
<Version>2.0.0</Version>
<LangVersion>9</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@ -29,6 +31,6 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Tapeti.Config;
namespace Tapeti.Transient
@ -31,7 +32,7 @@ namespace Tapeti.Transient
/// <inheritdoc />
public IEnumerable<object> GetMiddleware(IDependencyResolver dependencyResolver)
{
return null;
return Enumerable.Empty<object>();
}

View File

@ -4,7 +4,6 @@ using Tapeti.Config;
namespace Tapeti.Transient
{
/// <inheritdoc />
/// <summary>
/// Implements a binding for transient request response messages.
/// Register this binding using the WithTransient config extension method.
@ -15,10 +14,10 @@ namespace Tapeti.Transient
private readonly string dynamicQueuePrefix;
/// <inheritdoc />
public string QueueName { get; private set; }
public string? QueueName { get; private set; }
/// <inheritdoc />
public QueueType QueueType => QueueType.Dynamic;
public QueueType? QueueType => Config.QueueType.Dynamic;
/// <summary>
@ -31,9 +30,9 @@ namespace Tapeti.Transient
/// <inheritdoc />
public async Task Apply(IBindingTarget target)
public async ValueTask Apply(IBindingTarget target)
{
QueueName = await target.BindDynamicDirect(dynamicQueuePrefix);
QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null);
router.TransientResponseQueueName = QueueName;
}
@ -46,17 +45,17 @@ namespace Tapeti.Transient
/// <inheritdoc />
public Task Invoke(IMessageContext context)
public ValueTask Invoke(IMessageContext context)
{
router.HandleMessage(context);
return Task.CompletedTask;
return default;
}
/// <inheritdoc />
public Task Cleanup(IMessageContext context, ConsumeResult consumeResult)
public ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult)
{
return Task.CompletedTask;
return default;
}
}
}

View File

@ -2,7 +2,6 @@
namespace Tapeti.Transient
{
/// <inheritdoc />
/// <summary>
/// Default implementation of ITransientPublisher
/// </summary>
@ -22,7 +21,7 @@ namespace Tapeti.Transient
/// <inheritdoc />
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request)
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class
{
return (TResponse)await router.RequestResponse(publisher, request);
}

View File

@ -13,12 +13,12 @@ namespace Tapeti.Transient
internal class TransientRouter
{
private readonly int defaultTimeoutMs;
private readonly ConcurrentDictionary<Guid, TaskCompletionSource<object>> map = new ConcurrentDictionary<Guid, TaskCompletionSource<object>>();
private readonly ConcurrentDictionary<Guid, TaskCompletionSource<object>> map = new();
/// <summary>
/// The generated name of the dynamic queue to which responses should be sent.
/// </summary>
public string TransientResponseQueueName { get; set; }
public string? TransientResponseQueueName { get; set; }
/// <summary>
@ -41,8 +41,13 @@ namespace Tapeti.Transient
if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID))
return;
if (map.TryRemove(continuationID, out var tcs))
tcs.TrySetResult(context.Message);
if (!map.TryRemove(continuationID, out var tcs))
return;
if (context.Message == null)
throw new InvalidOperationException();
tcs.TrySetResult(context.Message);
}
@ -55,7 +60,7 @@ namespace Tapeti.Transient
public async Task<object> RequestResponse(IPublisher publisher, object request)
{
var correlation = Guid.NewGuid();
var tcs = map.GetOrAdd(correlation, c => new TaskCompletionSource<object>());
var tcs = map.GetOrAdd(correlation, _ => new TaskCompletionSource<object>());
try
{
@ -72,20 +77,22 @@ namespace Tapeti.Transient
{
// Simple cleanup of the task and map dictionary.
if (map.TryRemove(correlation, out tcs))
tcs.TrySetResult(null);
tcs.TrySetResult(null!);
throw;
}
using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1))
await using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1))
{
return await tcs.Task;
}
}
private void TimeoutResponse(object tcs)
private void TimeoutResponse(object? tcs)
{
ArgumentNullException.ThrowIfNull(tcs, nameof(tcs));
((TaskCompletionSource<object>)tcs).TrySetException(new TimeoutException("Transient RequestResponse timed out at (ms) " + defaultTimeoutMs));
}
}

View File

@ -1,34 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Authors>Mark van Renswoude</Authors>
<Company />
<Description>Unity container integration package for Tapeti</Description>
<PackageTags>rabbitmq tapeti unity</PackageTags>
<PackageLicenseExpression>Unlicense</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/MvRens/Tapeti</PackageProjectUrl>
<PackageIcon>Tapeti.SimpleInjector.png</PackageIcon>
<Version>2.0.0</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Unity" Version="5.11.10" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="..\resources\icons\Tapeti.SimpleInjector.png">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
</ItemGroup>
</Project>

View File

@ -1,90 +0,0 @@
using System;
using Unity;
using Unity.Lifetime;
namespace Tapeti.UnityContainer
{
/// <inheritdoc />
/// <summary>
/// Dependency resolver and container implementation for SimpleInjector.
/// </summary>
public class UnityDependencyResolver : IDependencyContainer
{
private readonly IUnityContainer container;
/// <summary>
/// </summary>
public UnityDependencyResolver(IUnityContainer container)
{
this.container = container;
}
/// <inheritdoc />
public T Resolve<T>() where T : class
{
return container.Resolve<T>();
}
/// <inheritdoc />
public object Resolve(Type type)
{
return container.Resolve(type);
}
/// <inheritdoc />
public void RegisterDefault<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (container.IsRegistered(typeof(TService)))
return;
container.RegisterType<TService, TImplementation>();
}
/// <inheritdoc />
public void RegisterDefault<TService>(Func<TService> factory) where TService : class
{
if (container.IsRegistered(typeof(TService)))
return;
container.RegisterFactory<TService>(c => factory());
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService, TImplementation>() where TService : class where TImplementation : class, TService
{
if (container.IsRegistered(typeof(TService)))
return;
container.RegisterSingleton<TService, TImplementation>();
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(TService instance) where TService : class
{
if (container.IsRegistered(typeof(TService)))
return;
container.RegisterInstance(instance);
}
/// <inheritdoc />
public void RegisterDefaultSingleton<TService>(Func<TService> factory) where TService : class
{
if (container.IsRegistered(typeof(TService)))
return;
container.RegisterFactory<TService>(c => factory(), new SingletonLifetimeManager());
}
/// <inheritdoc />
public void RegisterController(Type type)
{
container.RegisterType(type);
}
}
}

View File

@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.31005.135
# Visual Studio Version 17
VisualStudioVersion = 17.0.32112.339
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti", "Tapeti\Tapeti.csproj", "{2952B141-C54D-4E6F-8108-CAD735B0279F}"
EndProject
@ -45,8 +45,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.CastleWindsor", "Tap
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Autofac", "Tapeti.Autofac\Tapeti.Autofac.csproj", "{B3802005-C941-41B6-A9A5-20573A7C24AE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.UnityContainer", "Tapeti.UnityContainer\Tapeti.UnityContainer.csproj", "{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Ninject", "Tapeti.Ninject\Tapeti.Ninject.csproj", "{29478B10-FC53-4E93-ADEF-A775D9408131}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "06-StatelessRequestResponse", "Examples\06-StatelessRequestResponse\06-StatelessRequestResponse.csproj", "{152227AA-3165-4550-8997-6EA80C84516E}"
@ -55,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "07-ParallelizationTest", "E
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "08-MessageHandlerLogging", "Examples\08-MessageHandlerLogging\08-MessageHandlerLogging.csproj", "{906605A6-2CAB-4B29-B0DD-B735BF265E39}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Benchmarks", "Tapeti.Benchmarks\Tapeti.Benchmarks.csproj", "{DBE56131-9207-4CEA-BA3E-031351677C48}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -129,10 +129,6 @@ Global
{B3802005-C941-41B6-A9A5-20573A7C24AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3802005-C941-41B6-A9A5-20573A7C24AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3802005-C941-41B6-A9A5-20573A7C24AE}.Release|Any CPU.Build.0 = Release|Any CPU
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E}.Release|Any CPU.Build.0 = Release|Any CPU
{29478B10-FC53-4E93-ADEF-A775D9408131}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{29478B10-FC53-4E93-ADEF-A775D9408131}.Debug|Any CPU.Build.0 = Debug|Any CPU
{29478B10-FC53-4E93-ADEF-A775D9408131}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -149,6 +145,10 @@ Global
{906605A6-2CAB-4B29-B0DD-B735BF265E39}.Debug|Any CPU.Build.0 = Debug|Any CPU
{906605A6-2CAB-4B29-B0DD-B735BF265E39}.Release|Any CPU.ActiveCfg = Release|Any CPU
{906605A6-2CAB-4B29-B0DD-B735BF265E39}.Release|Any CPU.Build.0 = Release|Any CPU
{DBE56131-9207-4CEA-BA3E-031351677C48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DBE56131-9207-4CEA-BA3E-031351677C48}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DBE56131-9207-4CEA-BA3E-031351677C48}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DBE56131-9207-4CEA-BA3E-031351677C48}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -170,7 +170,6 @@ Global
{330D05CE-5321-4C7D-8017-2070B891289E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{374AAE64-598B-4F67-8870-4A05168FF987} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{B3802005-C941-41B6-A9A5-20573A7C24AE} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{BA8CA9A2-BAFF-42BB-8439-3DD9D1F6C32E} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{29478B10-FC53-4E93-ADEF-A775D9408131} = {99380F97-AD1A-459F-8AB3-D404E1E6AD4F}
{152227AA-3165-4550-8997-6EA80C84516E} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}
{E69E6BA5-68E7-4A4D-A38C-B2526AA66E96} = {266B9B94-A4D2-41C2-860C-24A7C3B63B56}

View File

@ -4,7 +4,9 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ID/@EntryIndexedValue">ID</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JSON/@EntryIndexedValue">JSON</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=KV/@EntryIndexedValue">KV</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=MQ/@EntryIndexedValue">MQ</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=UTF/@EntryIndexedValue">UTF</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>

View File

@ -1,15 +1,14 @@
namespace Tapeti.Config
{
/// <inheritdoc />
/// <summary>
/// Extends the message context with information about the controller.
/// </summary>
public class ControllerMessageContextPayload : IMessageContextPayload
{
/// <summary>
/// An instance of the controller referenced by the binding. Note: can be null during Cleanup.
/// An instance of the controller referenced by the binding. Note: can be null during Cleanup or when bound to static methods.
/// </summary>
public object Controller { get; }
public object? Controller { get; }
/// <remarks>
@ -23,7 +22,7 @@
/// </summary>
/// <param name="controller">An instance of the controller referenced by the binding</param>
/// <param name="binding">The binding which is currently processing the message</param>
public ControllerMessageContextPayload(object controller, IControllerMethodBinding binding)
public ControllerMessageContextPayload(object? controller, IControllerMethodBinding binding)
{
Controller = controller;
Binding = binding;

View File

@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Tapeti.Connection;
namespace Tapeti.Config
{
@ -28,20 +29,20 @@ namespace Tapeti.Config
/// <summary>
/// The name of the queue the binding is consuming. May change after a reconnect for dynamic queues.
/// </summary>
string QueueName { get; }
string? QueueName { get; }
/// <summary>
/// Determines the type of queue the binding registers
/// </summary>
QueueType QueueType { get; }
QueueType? QueueType { get; }
/// <summary>
/// Called after a connection is established to set up the binding.
/// </summary>
/// <param name="target"></param>
Task Apply(IBindingTarget target);
ValueTask Apply(IBindingTarget target);
/// <summary>
@ -55,7 +56,7 @@ namespace Tapeti.Config
/// Invokes the handler for the message as specified by the context.
/// </summary>
/// <param name="context"></param>
Task Invoke(IMessageContext context);
ValueTask Invoke(IMessageContext context);
/// <summary>
@ -64,7 +65,7 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="consumeResult"></param>
/// <returns></returns>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult);
ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult);
}
@ -80,7 +81,8 @@ namespace Tapeti.Config
/// </summary>
/// <param name="messageClass">The message class to be bound to the queue</param>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurable(Type messageClass, string queueName);
/// <param name="arguments">Optional arguments</param>
ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments);
/// <summary>
/// Binds the messageClass to a dynamic auto-delete queue.
@ -91,15 +93,17 @@ namespace Tapeti.Config
/// </remarks>
/// <param name="messageClass">The message class to be bound to the queue</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamic(Type messageClass, string queuePrefix = null);
ValueTask<string> BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a durable queue but does not add a binding for a messageClass' routing key.
/// Used for direct-to-queue messages.
/// </summary>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurableDirect(string queueName);
/// <param name="arguments">Optional arguments</param>
ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
@ -107,22 +111,24 @@ namespace Tapeti.Config
/// </summary>
/// <param name="messageClass">The message class which will be handled on the queue. It is not actually bound to the queue.</param>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamicDirect(Type messageClass = null, string queuePrefix = null);
ValueTask<string> BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Declares a dynamic queue but does not add a binding for a messageClass' routing key.
/// Used for direct-to-queue messages. Guarantees a unique queue.
/// </summary>
/// <param name="queuePrefix">An optional prefix for the dynamic queue's name. If not provided, RabbitMQ's default logic will be used to create an amq.gen queue.</param>
/// <param name="arguments">Optional arguments</param>
/// <returns>The generated name of the dynamic queue</returns>
Task<string> BindDynamicDirect(string queuePrefix = null);
ValueTask<string> BindDynamicDirect(string? queuePrefix, IRabbitMQArguments? arguments);
/// <summary>
/// Marks the specified durable queue as having an obsolete binding. If after all bindings have subscribed, the queue only contains obsolete
/// bindings and is empty, it will be removed.
/// </summary>
/// <param name="queueName">The name of the durable queue</param>
Task BindDurableObsolete(string queueName);
ValueTask BindDurableObsolete(string queueName);
}
}

View File

@ -11,7 +11,7 @@ namespace Tapeti.Config
/// Injects a value for a controller method parameter.
/// </summary>
/// <param name="context"></param>
public delegate object ValueFactory(IMessageContext context);
public delegate object? ValueFactory(IMessageContext context);
/// <summary>
@ -19,7 +19,7 @@ namespace Tapeti.Config
/// </summary>
/// <param name="context"></param>
/// <param name="value"></param>
public delegate Task ResultHandler(IMessageContext context, object value);
public delegate ValueTask ResultHandler(IMessageContext context, object? value);
/// <summary>
@ -48,7 +48,7 @@ namespace Tapeti.Config
/// The message class for this method. Can be null if not yet set by the default MessageBinding or other middleware.
/// If required, call next first to ensure it is available.
/// </summary>
Type MessageClass { get; }
Type? MessageClass { get; }
/// <summary>
/// Determines if SetMessageClass has already been called.

View File

@ -2,18 +2,17 @@
namespace Tapeti.Config
{
/// <inheritdoc />
/// <summary>
/// Called when a Controller method is registered.
/// </summary>
public interface IControllerBindingMiddleware : IControllerMiddlewareBase
{
/// <summary>
/// Called before a Controller method is registered. Can change the way parameters and return values are handled,
/// and can inject message middleware specific to a method.
/// </summary>
/// <param name="context"></param>
/// <param name="next">Must be called to activate the new layer of middleware.</param>
void Handle(IControllerBindingContext context, Action next);
/// <summary>
/// Called before a Controller method is registered. Can change the way parameters and return values are handled,
/// and can inject message middleware specific to a method.
/// </summary>
/// <param name="context"></param>
/// <param name="next">Must be called to activate the new layer of middleware.</param>
void Handle(IControllerBindingContext context, Action next);
}
}

View File

@ -14,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="consumeResult"></param>
/// <param name="next">Always call to allow the next in the chain to clean up</param>
Task Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<Task> next);
ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next);
}
}

View File

@ -3,7 +3,6 @@ using System.Threading.Tasks;
namespace Tapeti.Config
{
/// <inheritdoc />
/// <summary>
/// Denotes middleware that runs before the controller is instantiated.
/// </summary>
@ -15,6 +14,6 @@ namespace Tapeti.Config
/// <param name="context"></param>
/// <param name="next"></param>
/// <returns></returns>
Task Filter(IMessageContext context, Func<Task> next);
ValueTask Filter(IMessageContext context, Func<ValueTask> next);
}
}

Some files were not shown because too many files have changed in this diff Show More