1
0
mirror of synced 2024-11-24 19:53:10 +01:00

Code style cleanup

This commit is contained in:
Mark van Renswoude 2022-02-09 09:19:56 +01:00
parent 302e6a0a42
commit b816e56018
21 changed files with 123 additions and 140 deletions

View File

@ -23,7 +23,7 @@ namespace _01_PublishSubscribe
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main()
{ {
var dependencyResolver = GetSimpleInjectorDependencyResolver(); var dependencyResolver = GetSimpleInjectorDependencyResolver();
@ -47,7 +47,7 @@ namespace _01_PublishSubscribe
.RegisterAllControllers() .RegisterAllControllers()
.Build(); .Build();
using (var connection = new TapetiConnection(config) await using var connection = new TapetiConnection(config)
{ {
// Params is optional if you want to use the defaults, but we'll set it // Params is optional if you want to use the defaults, but we'll set it
// explicitly for this example // explicitly for this example
@ -63,28 +63,27 @@ namespace _01_PublishSubscribe
{ "example", "01 - Publish Subscribe" } { "example", "01 - Publish Subscribe" }
} }
} }
}) };
{
// IoC containers that separate the builder from the resolver (Autofac) must be built after // IoC containers that separate the builder from the resolver (Autofac) must be built after
// creating a TapetConnection, as it modifies the container by injecting IPublisher. // creating a TapetConnection, as it modifies the container by injecting IPublisher.
(dependencyResolver as AutofacDependencyResolver)?.Build(); (dependencyResolver as AutofacDependencyResolver)?.Build();
// Create the queues and start consuming immediately. // Create the queues and start consuming immediately.
// If you need to do some processing before processing messages, but after the // If you need to do some processing before processing messages, but after the
// queues have initialized, pass false as the startConsuming parameter and store // queues have initialized, pass false as the startConsuming parameter and store
// the returned ISubscriber. Then call Resume on it later. // the returned ISubscriber. Then call Resume on it later.
await connection.Subscribe(); await connection.Subscribe();
// We could get an IPublisher from the container directly, but since you'll usually use // We could get an IPublisher from the container directly, but since you'll usually use
// it as an injected constructor parameter this shows // it as an injected constructor parameter this shows
await dependencyResolver.Resolve<ExamplePublisher>().SendTestMessage(); await dependencyResolver.Resolve<ExamplePublisher>().SendTestMessage();
// Wait for the controller to signal that the message has been received // Wait for the controller to signal that the message has been received
await waitForDone(); await waitForDone();
}
} }

View File

@ -11,7 +11,7 @@ namespace _02_DeclareDurableQueues
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main()
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -30,19 +30,18 @@ namespace _02_DeclareDurableQueues
.EnableDeclareDurableQueues() .EnableDeclareDurableQueues()
.Build(); .Build();
using (var connection = new TapetiConnection(config)) await using var connection = new TapetiConnection(config);
// This creates or updates the durable queue
await connection.Subscribe();
await dependencyResolver.Resolve<IPublisher>().Publish(new PublishSubscribeMessage
{ {
// This creates or updates the durable queue Greeting = "Hello durable queue!"
await connection.Subscribe(); });
await dependencyResolver.Resolve<IPublisher>().Publish(new PublishSubscribeMessage // Wait for the controller to signal that the message has been received
{ await waitForDone();
Greeting = "Hello durable queue!"
});
// Wait for the controller to signal that the message has been received
await waitForDone();
}
} }
} }
} }

View File

@ -12,7 +12,7 @@ namespace _03_FlowRequestResponse
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main()
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -33,34 +33,33 @@ namespace _03_FlowRequestResponse
.Build(); .Build();
using (var connection = new TapetiConnection(config)) await using var connection = new TapetiConnection(config);
// Must be called before using any flow. When using a persistent repository like the
// SQL server implementation, you can run any required update scripts (for example, using DbUp)
// before calling this Load method.
// Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
await dependencyResolver.Resolve<IFlowStore>().Load();
await connection.Subscribe();
var flowStarter = dependencyResolver.Resolve<IFlowStarter>();
var startData = new SimpleFlowController.StartData
{ {
// Must be called before using any flow. When using a persistent repository like the RequestStartTime = DateTime.Now,
// SQL server implementation, you can run any required update scripts (for example, using DbUp) Amount = 1
// before calling this Load method. };
// Call after creating the TapetiConnection, as it modifies the container to inject IPublisher.
await dependencyResolver.Resolve<IFlowStore>().Load();
await connection.Subscribe(); await flowStarter.Start<SimpleFlowController, SimpleFlowController.StartData>(c => c.StartFlow, startData);
await flowStarter.Start<ParallelFlowController>(c => c.StartFlow);
var flowStarter = dependencyResolver.Resolve<IFlowStarter>(); // Wait for the controller to signal that the message has been received
await waitForDone();
var startData = new SimpleFlowController.StartData
{
RequestStartTime = DateTime.Now,
Amount = 1
};
await flowStarter.Start<SimpleFlowController, SimpleFlowController.StartData>(c => c.StartFlow, startData);
await flowStarter.Start<ParallelFlowController>(c => c.StartFlow);
// Wait for the controller to signal that the message has been received
await waitForDone();
}
} }
} }
} }

View File

@ -9,25 +9,17 @@ namespace _03_FlowRequestResponse
public class ReceivingMessageController public class ReceivingMessageController
{ {
// No publisher required, responses can simply be returned // No publisher required, responses can simply be returned
#pragma warning disable CA1822 // Mark members as static - not supported yet by Tapeti
public async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message) public async Task<QuoteResponseMessage> HandleQuoteRequest(QuoteRequestMessage message)
{ {
string quote; var quote = message.Amount switch
switch (message.Amount)
{ {
case 1: 1 =>
// Well, they asked for it... :-) // Well, they asked for it... :-)
quote = "'"; "'",
break; 2 => "\"",
_ => new string('\'', message.Amount)
case 2: };
quote = "\"";
break;
default:
quote = new string('\'', message.Amount);
break;
}
// Just gonna let them wait for a bit, to demonstrate async message handlers // Just gonna let them wait for a bit, to demonstrate async message handlers
await Task.Delay(1000); await Task.Delay(1000);
@ -37,5 +29,6 @@ namespace _03_FlowRequestResponse
Quote = quote Quote = quote
}; };
} }
#pragma warning restore CA1822
} }
} }

View File

@ -13,7 +13,7 @@ namespace _04_Transient
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main()
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -34,22 +34,20 @@ namespace _04_Transient
.Build(); .Build();
using (var connection = new TapetiConnection(config)) await using var connection = new TapetiConnection(config);
{ await connection.Subscribe();
await connection.Subscribe();
Console.WriteLine("Sending request..."); Console.WriteLine("Sending request...");
var transientPublisher = dependencyResolver.Resolve<ITransientPublisher>(); var transientPublisher = dependencyResolver.Resolve<ITransientPublisher>();
var response = await transientPublisher.RequestResponse<LoggedInUsersRequestMessage, LoggedInUsersResponseMessage>( var response = await transientPublisher.RequestResponse<LoggedInUsersRequestMessage, LoggedInUsersResponseMessage>(
new LoggedInUsersRequestMessage()); new LoggedInUsersRequestMessage());
Console.WriteLine("Response: " + response.Count); Console.WriteLine("Response: " + response.Count);
// Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled. // Unlike the other example, there is no need to call waitForDone, once we're here the response has been handled.
}
} }
} }
} }

View File

@ -21,7 +21,7 @@ namespace _05_SpeedTest
private const int ConcurrentTasks = 20; private const int ConcurrentTasks = 20;
public static void Main(string[] args) public static void Main()
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -52,34 +52,32 @@ namespace _05_SpeedTest
.Build(); .Build();
using (var connection = new TapetiConnection(config)) await using var connection = new TapetiConnection(config);
{ var subscriber = await connection.Subscribe(false);
var subscriber = await connection.Subscribe(false);
var publisher = dependencyResolver.Resolve<IPublisher>(); var publisher = dependencyResolver.Resolve<IPublisher>();
Console.WriteLine($"Publishing {MessageCount} messages..."); Console.WriteLine($"Publishing {MessageCount} messages...");
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
stopwatch.Start(); stopwatch.Start();
await PublishMessages(publisher); await PublishMessages(publisher);
stopwatch.Stop(); stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec"); Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
Console.WriteLine("Consuming messages..."); Console.WriteLine("Consuming messages...");
await subscriber.Resume(); await subscriber.Resume();
stopwatch.Restart(); stopwatch.Restart();
await waitForDone(); await waitForDone();
stopwatch.Stop(); stopwatch.Stop();
Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec"); Console.WriteLine($"Took {stopwatch.ElapsedMilliseconds} ms, {MessageCount / (stopwatch.ElapsedMilliseconds / 1000F):F0} messages/sec");
}
} }

View File

@ -12,7 +12,7 @@ namespace _06_StatelessRequestResponse
{ {
public class Program public class Program
{ {
public static void Main(string[] args) public static void Main()
{ {
var container = new Container(); var container = new Container();
var dependencyResolver = new SimpleInjectorDependencyResolver(container); var dependencyResolver = new SimpleInjectorDependencyResolver(container);
@ -32,20 +32,18 @@ namespace _06_StatelessRequestResponse
.Build(); .Build();
using (var connection = new TapetiConnection(config)) await using var connection = new TapetiConnection(config);
{ await connection.Subscribe();
await connection.Subscribe();
var publisher = dependencyResolver.Resolve<IPublisher>(); var publisher = dependencyResolver.Resolve<IPublisher>();
await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>( await publisher.PublishRequest<ExampleMessageController, QuoteRequestMessage, QuoteResponseMessage>(
new QuoteRequestMessage new QuoteRequestMessage
{ {
Amount = 1 Amount = 1
}, },
c => c.HandleQuoteResponse); c => c.HandleQuoteResponse);
await waitForDone(); await waitForDone();
}
} }
} }
} }

View File

@ -8,31 +8,23 @@ namespace _06_StatelessRequestResponse
public class ReceivingMessageController public class ReceivingMessageController
{ {
// No publisher required, responses can simply be returned // No publisher required, responses can simply be returned
#pragma warning disable CA1822 // Mark members as static - not supported yet by Tapeti
public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message) public QuoteResponseMessage HandleQuoteRequest(QuoteRequestMessage message)
{ {
string quote; var quote = message.Amount switch
switch (message.Amount)
{ {
case 1: 1 =>
// Well, they asked for it... :-) // Well, they asked for it... :-)
quote = "'"; "'",
break; 2 => "\"",
_ => null
case 2: };
quote = "\"";
break;
default:
// We have to return a response.
quote = null;
break;
}
return new QuoteResponseMessage return new QuoteResponseMessage
{ {
Quote = quote Quote = quote
}; };
} }
#pragma warning restore CA1822
} }
} }

View File

@ -16,9 +16,11 @@ namespace _07_ParallelizationTest
} }
#pragma warning disable IDE0060 // Remove unused parameter
public async Task HandleSpeedTestMessage(SpeedTestMessage message) public async Task HandleSpeedTestMessage(SpeedTestMessage message)
{ {
await messageParallelization.WaitForBatch(); await messageParallelization.WaitForBatch();
} }
#pragma warning restore IDE0060
} }
} }

View File

@ -105,8 +105,8 @@ namespace _07_ParallelizationTest
private readonly Func<bool> done; private readonly Func<bool> done;
private readonly Action<int> timeout; private readonly Action<int> timeout;
private int count; private int count;
private readonly object waitLock = new object(); private readonly object waitLock = new();
private TaskCompletionSource<bool> batchReachedTask = new TaskCompletionSource<bool>(); private TaskCompletionSource<bool> batchReachedTask = new();
private Timer messageExpectedTimer; private Timer messageExpectedTimer;
private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000); private readonly TimeSpan messageExpectedTimeout = TimeSpan.FromMilliseconds(5000);
@ -124,7 +124,7 @@ namespace _07_ParallelizationTest
lock (waitLock) lock (waitLock)
{ {
if (messageExpectedTimer == null) if (messageExpectedTimer == null)
messageExpectedTimer = new Timer(state => messageExpectedTimer = new Timer(_ =>
{ {
timeout(count); timeout(count);
}, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan); }, null, messageExpectedTimeout, Timeout.InfiniteTimeSpan);

View File

@ -3,7 +3,6 @@ using System.Threading.Tasks;
using ExampleLib; using ExampleLib;
using Messaging.TapetiExample; using Messaging.TapetiExample;
using Serilog; using Serilog;
using Serilog.Events;
using SimpleInjector; using SimpleInjector;
using Tapeti; using Tapeti;
using Tapeti.Serilog; using Tapeti.Serilog;

View File

@ -1,5 +1,4 @@
using System; using System;
using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Tapeti.Config; using Tapeti.Config;
using Tapeti.Flow.FlowHelpers; using Tapeti.Flow.FlowHelpers;

View File

@ -325,7 +325,7 @@ namespace Tapeti.Flow.Default
} }
public IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler) private IFlowParallelRequestBuilder InternalAddRequest(object message, Delegate responseHandler)
{ {
requests.Add(new RequestInfo requests.Add(new RequestInfo
{ {

View File

@ -17,6 +17,11 @@
<NoWarn>1701;1702</NoWarn> <NoWarn>1701;1702</NoWarn>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)'!='netstandard2.0'">
<!-- Supress 'Use switch expression' which requires language version 8 not available in .NET Standard 2.0 -->
<NoWarn>IDE0066</NoWarn>
</PropertyGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Tapeti\Tapeti.csproj" /> <ProjectReference Include="..\Tapeti\Tapeti.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -7,7 +7,7 @@
public class ControllerMessageContextPayload : IMessageContextPayload public class ControllerMessageContextPayload : IMessageContextPayload
{ {
/// <summary> /// <summary>
/// An instance of the controller referenced by the binding. Note: can be null during Cleanup. /// An instance of the controller referenced by the binding. Note: can be null during Cleanup or when bound to static methods.
/// </summary> /// </summary>
public object Controller { get; } public object Controller { get; }

View File

@ -663,7 +663,7 @@ namespace Tapeti.Connection
} }
catch (WebException e) catch (WebException e)
{ {
if (!(e.Response is HttpWebResponse response)) if (e.Response is not HttpWebResponse response)
throw; throw;
if (!TransientStatusCodes.Contains(response.StatusCode)) if (!TransientStatusCodes.Contains(response.StatusCode))
@ -714,7 +714,7 @@ namespace Tapeti.Connection
? publishChannelModel ? publishChannelModel
: consumeChannelModel; : consumeChannelModel;
if (channel != null && channel.IsOpen) if (channel is { IsOpen: true })
return channel; return channel;
} }

View File

@ -49,7 +49,7 @@ namespace Tapeti.Default
/// <inheritdoc /> /// <inheritdoc />
public object Deserialize(byte[] body, IMessageProperties properties) public object Deserialize(byte[] body, IMessageProperties properties)
{ {
if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) if (properties.ContentType is not ContentType)
throw new ArgumentException($"content_type must be {ContentType}"); throw new ArgumentException($"content_type must be {ContentType}");
var typeName = properties.GetHeader(ClassTypeHeader); var typeName = properties.GetHeader(ClassTypeHeader);

View File

@ -1,5 +1,7 @@
using System.Text; using System.Text;
// ReSharper disable UnusedMember.Global - public API
namespace Tapeti.Helpers namespace Tapeti.Helpers
{ {
/// <summary> /// <summary>

View File

@ -189,7 +189,7 @@ namespace Tapeti
/// </summary> /// </summary>
protected void RegisterDefaults() protected void RegisterDefaults()
{ {
if (!(DependencyResolver is IDependencyContainer container)) if (DependencyResolver is not IDependencyContainer container)
return; return;
if (ConsoleHelper.IsAvailable()) if (ConsoleHelper.IsAvailable())

View File

@ -79,7 +79,7 @@ namespace Tapeti
} }
var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo; var methodQueueInfo = GetQueueInfo(method) ?? controllerQueueInfo;
if (methodQueueInfo == null || !methodQueueInfo.IsValid) if (methodQueueInfo is not { IsValid: true })
throw new TopologyConfigurationException( throw new TopologyConfigurationException(
$"Method {method.Name} or controller {controller.Name} requires a queue attribute"); $"Method {method.Name} or controller {controller.Name} requires a queue attribute");

View File

@ -164,7 +164,7 @@ namespace Tapeti
var reconnectedEvent = Reconnected; var reconnectedEvent = Reconnected;
if (reconnectedEvent != null) if (reconnectedEvent != null)
Task.Run(() => reconnectedEvent?.Invoke(this, e)); Task.Run(() => reconnectedEvent.Invoke(this, e));
} }
/// <summary> /// <summary>