diff --git a/Tapeti.Annotations/DynamicQueueAttribute.cs b/Tapeti.Annotations/DynamicQueueAttribute.cs index 3d730c9..5fe9525 100644 --- a/Tapeti.Annotations/DynamicQueueAttribute.cs +++ b/Tapeti.Annotations/DynamicQueueAttribute.cs @@ -9,5 +9,18 @@ namespace Tapeti.Annotations [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] public class DynamicQueueAttribute : Attribute { + public string Prefix { get; set; } + + + /// + /// If prefix is specified, Tapeti will compose the queue name using the + /// prefix and a unique ID. If not specified, an empty queue name will be passed + /// to RabbitMQ thus letting it create a unique queue name. + /// + /// + public DynamicQueueAttribute(string prefix = null) + { + Prefix = prefix; + } } } diff --git a/Tapeti.Annotations/ExchangeAttribute.cs b/Tapeti.Annotations/ExchangeAttribute.cs deleted file mode 100644 index 13be56f..0000000 --- a/Tapeti.Annotations/ExchangeAttribute.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; - -namespace Tapeti.Annotations -{ - [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] - public class ExchangeAttribute : Attribute - { - public string Name { get; set; } - - public ExchangeAttribute(string name) - { - Name = name; - } - } -} diff --git a/Tapeti.Annotations/RequestAttribute.cs b/Tapeti.Annotations/RequestAttribute.cs index 4d6a9a5..d9fc44e 100644 --- a/Tapeti.Annotations/RequestAttribute.cs +++ b/Tapeti.Annotations/RequestAttribute.cs @@ -1,6 +1,6 @@ using System; -namespace Tapeti.Flow.Annotations +namespace Tapeti.Annotations { [AttributeUsage(AttributeTargets.Class)] public class RequestAttribute : Attribute diff --git a/Tapeti.Flow.SQL/ConfigExtensions.cs b/Tapeti.Flow.SQL/ConfigExtensions.cs index c5e660d..9dd039b 100644 --- a/Tapeti.Flow.SQL/ConfigExtensions.cs +++ b/Tapeti.Flow.SQL/ConfigExtensions.cs @@ -1,6 +1,8 @@ using System.Collections.Generic; using Tapeti.Config; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Flow.SQL { public static class ConfigExtensions diff --git a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs index e66c568..4a8640c 100644 --- a/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs +++ b/Tapeti.Flow.SQL/SqlConnectionFlowRepository.cs @@ -69,7 +69,7 @@ namespace Tapeti.Flow.SQL public Task CreateState(Guid flowID, T state, DateTime timestamp) { - var stateJason = JsonConvert.SerializeObject(state); + //var stateJson = JsonConvert.SerializeObject(state); throw new NotImplementedException(); } diff --git a/Tapeti.Flow/Default/FlowBindingMiddleware.cs b/Tapeti.Flow/Default/FlowBindingMiddleware.cs index 9053135..fd796c4 100644 --- a/Tapeti.Flow/Default/FlowBindingMiddleware.cs +++ b/Tapeti.Flow/Default/FlowBindingMiddleware.cs @@ -1,6 +1,7 @@ using System; using System.Reflection; using System.Threading.Tasks; +using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Flow.Annotations; using Tapeti.Helpers; @@ -59,8 +60,7 @@ namespace Tapeti.Flow.Default private static void RegisterYieldPointResult(IBindingContext context) { - bool isTaskOf; - if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out isTaskOf)) + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(typeof(IYieldPoint), out var isTaskOf)) return; if (isTaskOf) @@ -87,7 +87,10 @@ namespace Tapeti.Flow.Default private static Task HandleParallelResponse(IMessageContext context) { var flowHandler = context.DependencyResolver.Resolve(); - return flowHandler.Execute(context, new DelegateYieldPoint((a) => Task.CompletedTask)); + return flowHandler.Execute(context, new DelegateYieldPoint(async flowContext => + { + await flowContext.Store(); + })); } @@ -97,8 +100,7 @@ namespace Tapeti.Flow.Default if (request?.Response == null) return; - bool isTaskOf; - if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out isTaskOf)) + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t == request.Response || t == typeof(IYieldPoint), out _)) throw new ResponseExpectedException($"Response of class {request.Response.FullName} expected in controller {context.Method.DeclaringType?.FullName}, method {context.Method.Name}"); } } diff --git a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs index 16cf61b..12673ad 100644 --- a/Tapeti.Flow/Default/FlowCleanupMiddleware.cs +++ b/Tapeti.Flow/Default/FlowCleanupMiddleware.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Threading.Tasks; using Tapeti.Config; namespace Tapeti.Flow.Default @@ -11,8 +7,7 @@ namespace Tapeti.Flow.Default { public async Task Handle(IMessageContext context, HandlingResult handlingResult) { - object flowContextObj; - if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextObj)) + if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextObj)) return; var flowContext = (FlowContext)flowContextObj; diff --git a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs index 8c66e9d..8df46e8 100644 --- a/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs +++ b/Tapeti.Flow/Default/FlowMessageFilterMiddleware.cs @@ -28,8 +28,7 @@ namespace Tapeti.Flow.Default if (context.Properties.CorrelationId == null) return null; - Guid continuationID; - if (!Guid.TryParse(context.Properties.CorrelationId, out continuationID)) + if (!Guid.TryParse(context.Properties.CorrelationId, out var continuationID)) return null; var flowStore = context.DependencyResolver.Resolve(); @@ -44,8 +43,6 @@ namespace Tapeti.Flow.Default if (flowState == null) return null; - ContinuationMetadata continuation; - var flowContext = new FlowContext { MessageContext = context, @@ -54,7 +51,7 @@ namespace Tapeti.Flow.Default FlowState = flowState, ContinuationID = continuationID, - ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out continuation) ? continuation : null + ContinuationMetadata = flowState.Continuations.TryGetValue(continuationID, out var continuation) ? continuation : null }; // IDisposable items in the IMessageContext are automatically disposed diff --git a/Tapeti.Flow/Default/FlowProvider.cs b/Tapeti.Flow/Default/FlowProvider.cs index 308cb3a..7115ae9 100644 --- a/Tapeti.Flow/Default/FlowProvider.cs +++ b/Tapeti.Flow/Default/FlowProvider.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading.Tasks; using RabbitMQ.Client.Framing; +using Tapeti.Annotations; using Tapeti.Config; using Tapeti.Flow.Annotations; using Tapeti.Flow.FlowHelpers; @@ -54,6 +56,8 @@ namespace Tapeti.Flow.Default private async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, string convergeMethodName = null, bool convergeMethodTaskSync = false) { + Debug.Assert(context.FlowState != null, "context.FlowState != null"); + if (context.FlowState == null) { await CreateNewFlowState(context); @@ -114,7 +118,7 @@ namespace Tapeti.Flow.Default { await context.Delete(); - if (context.FlowState != null && context.FlowState.Metadata.Reply != null) + if (context.FlowState?.Metadata.Reply != null) throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); } @@ -179,15 +183,12 @@ namespace Tapeti.Flow.Default public async Task Execute(IMessageContext context, IYieldPoint yieldPoint) { - var executableYieldPoint = yieldPoint as DelegateYieldPoint; - - if (executableYieldPoint == null) + if (!(yieldPoint is DelegateYieldPoint executableYieldPoint)) throw new YieldPointException($"Yield point is required in controller {context.Controller.GetType().Name} for method {context.Binding.Method.Name}"); FlowContext flowContext; - object flowContextItem; - if (!context.Items.TryGetValue(ContextItems.FlowContext, out flowContextItem)) + if (!context.Items.TryGetValue(ContextItems.FlowContext, out var flowContextItem)) { flowContext = new FlowContext { @@ -202,8 +203,7 @@ namespace Tapeti.Flow.Default try { - if (executableYieldPoint != null) - await executableYieldPoint.Execute(flowContext); + await executableYieldPoint.Execute(flowContext); } catch (YieldPointException e) { diff --git a/Tapeti.Flow/Default/FlowState.cs b/Tapeti.Flow/Default/FlowState.cs index 0ee3eec..d600a8f 100644 --- a/Tapeti.Flow/Default/FlowState.cs +++ b/Tapeti.Flow/Default/FlowState.cs @@ -12,16 +12,16 @@ namespace Tapeti.Flow.Default public FlowMetadata Metadata { - get { return metadata ?? (metadata = new FlowMetadata()); } - set { metadata = value; } + get => metadata ?? (metadata = new FlowMetadata()); + set => metadata = value; } public string Data { get; set; } public Dictionary Continuations { - get { return continuations ?? (continuations = new Dictionary()); } - set { continuations = value; } + get => continuations ?? (continuations = new Dictionary()); + set => continuations = value; } diff --git a/Tapeti.Flow/Default/FlowStore.cs b/Tapeti.Flow/Default/FlowStore.cs index d2a6d87..2597007 100644 --- a/Tapeti.Flow/Default/FlowStore.cs +++ b/Tapeti.Flow/Default/FlowStore.cs @@ -1,9 +1,7 @@ -using Newtonsoft.Json; -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Tapeti.Flow.FlowHelpers; @@ -11,13 +9,13 @@ namespace Tapeti.Flow.Default { public class FlowStore : IFlowStore { - private readonly ConcurrentDictionary FlowStates = new ConcurrentDictionary(); - private readonly ConcurrentDictionary ContinuationLookup = new ConcurrentDictionary(); - private readonly LockCollection Locks = new LockCollection(EqualityComparer.Default); + private readonly ConcurrentDictionary flowStates = new ConcurrentDictionary(); + private readonly ConcurrentDictionary continuationLookup = new ConcurrentDictionary(); + private readonly LockCollection locks = new LockCollection(EqualityComparer.Default); private readonly IFlowRepository repository; - private volatile bool InUse = false; + private volatile bool inUse; public FlowStore(IFlowRepository repository) { @@ -27,36 +25,35 @@ namespace Tapeti.Flow.Default public async Task Load() { - if (InUse) + if (inUse) throw new InvalidOperationException("Can only load the saved state once."); - InUse = true; + inUse = true; - FlowStates.Clear(); - ContinuationLookup.Clear(); + flowStates.Clear(); + continuationLookup.Clear(); foreach (var flowStateRecord in await repository.GetStates()) { - FlowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); + flowStates.TryAdd(flowStateRecord.Key, flowStateRecord.Value); foreach (var continuation in flowStateRecord.Value.Continuations) - ContinuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); + continuationLookup.GetOrAdd(continuation.Key, flowStateRecord.Key); } } public Task FindFlowID(Guid continuationID) { - Guid result; - return Task.FromResult(ContinuationLookup.TryGetValue(continuationID, out result) ? result : (Guid?)null); + return Task.FromResult(continuationLookup.TryGetValue(continuationID, out var result) ? result : (Guid?)null); } public async Task LockFlowState(Guid flowID) { - InUse = true; + inUse = true; - var flowStatelock = new FlowStateLock(this, flowID, await Locks.GetLock(flowID)); + var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID)); return flowStatelock; } @@ -74,7 +71,7 @@ namespace Tapeti.Flow.Default this.flowID = flowID; this.flowLock = flowLock; - owner.FlowStates.TryGetValue(flowID, out flowState); + owner.flowStates.TryGetValue(flowID, out flowState); } public void Dispose() @@ -106,20 +103,17 @@ namespace Tapeti.Flow.Default if (flowState != null) { foreach (var removedContinuation in flowState.Continuations.Keys.Where(k => !newFlowState.Continuations.ContainsKey(k))) - { - Guid removedValue; - owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); - } + owner.continuationLookup.TryRemove(removedContinuation, out _); } foreach (var addedContinuation in newFlowState.Continuations.Where(c => flowState == null || !flowState.Continuations.ContainsKey(c.Key))) { - owner.ContinuationLookup.TryAdd(addedContinuation.Key, flowID); + owner.continuationLookup.TryAdd(addedContinuation.Key, flowID); } var isNew = flowState == null; flowState = newFlowState; - owner.FlowStates[flowID] = newFlowState; + owner.flowStates[flowID] = newFlowState; // Storing the flowstate in the underlying repository if (isNew) @@ -141,13 +135,9 @@ namespace Tapeti.Flow.Default if (flowState != null) { foreach (var removedContinuation in flowState.Continuations.Keys) - { - Guid removedValue; - owner.ContinuationLookup.TryRemove(removedContinuation, out removedValue); - } + owner.continuationLookup.TryRemove(removedContinuation, out _); - FlowState removedFlow; - owner.FlowStates.TryRemove(flowID, out removedFlow); + owner.flowStates.TryRemove(flowID, out _); if (flowState != null) { diff --git a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs index 00ec009..b26a8c9 100644 --- a/Tapeti.Flow/Default/NonPersistentFlowRepository.cs +++ b/Tapeti.Flow/Default/NonPersistentFlowRepository.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; namespace Tapeti.Flow.Default diff --git a/Tapeti.Flow/FlowHelpers/LockCollection.cs b/Tapeti.Flow/FlowHelpers/LockCollection.cs index f8a3533..b3d2bcd 100644 --- a/Tapeti.Flow/FlowHelpers/LockCollection.cs +++ b/Tapeti.Flow/FlowHelpers/LockCollection.cs @@ -1,7 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading.Tasks; namespace Tapeti.Flow.FlowHelpers @@ -17,14 +15,14 @@ namespace Tapeti.Flow.FlowHelpers public Task GetLock(T key) { + // ReSharper disable once InconsistentlySynchronizedField - by design LockItem nextLi = new LockItem(locks, key); try { bool continueImmediately = false; lock (locks) { - LockItem li; - if (!locks.TryGetValue(key, out li)) + if (!locks.TryGetValue(key, out var li)) { locks.Add(key, nextLi); continueImmediately = true; @@ -80,8 +78,7 @@ namespace Tapeti.Flow.FlowHelpers { lock (locks) { - LockItem li; - if (!locks.TryGetValue(key, out li)) + if (!locks.TryGetValue(key, out var li)) return; if (li != this) diff --git a/Tapeti.Flow/FlowMiddleware.cs b/Tapeti.Flow/FlowMiddleware.cs index 751374c..ddc1d61 100644 --- a/Tapeti.Flow/FlowMiddleware.cs +++ b/Tapeti.Flow/FlowMiddleware.cs @@ -6,7 +6,7 @@ namespace Tapeti.Flow { public class FlowMiddleware : ITapetiExtension { - private IFlowRepository flowRepository; + private readonly IFlowRepository flowRepository; public FlowMiddleware(IFlowRepository flowRepository) { @@ -18,7 +18,7 @@ namespace Tapeti.Flow container.RegisterDefault(); container.RegisterDefault(); container.RegisterDefault(); - container.RegisterDefaultSingleton(() => flowRepository ?? new NonPersistentFlowRepository()); + container.RegisterDefaultSingleton(() => flowRepository ?? new NonPersistentFlowRepository()); container.RegisterDefaultSingleton(); } diff --git a/Tapeti.Flow/IFlowProvider.cs b/Tapeti.Flow/IFlowProvider.cs index edb3968..c8f6982 100644 --- a/Tapeti.Flow/IFlowProvider.cs +++ b/Tapeti.Flow/IFlowProvider.cs @@ -3,6 +3,8 @@ using System.Linq.Expressions; using System.Threading.Tasks; using Tapeti.Config; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Flow { public interface IFlowProvider diff --git a/Tapeti.Flow/IFlowRepository.cs b/Tapeti.Flow/IFlowRepository.cs index 0f5dbb4..f1a29c1 100644 --- a/Tapeti.Flow/IFlowRepository.cs +++ b/Tapeti.Flow/IFlowRepository.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; namespace Tapeti.Flow @@ -12,13 +11,4 @@ namespace Tapeti.Flow Task UpdateState(Guid flowID, T state); Task DeleteState(Guid flowID); } - - - public class FlowStateRecord - { - public Guid FlowID; - public string Metadata; - public string Data; - public Dictionary ContinuationMetadata; - } } diff --git a/Tapeti.Flow/IFlowStore.cs b/Tapeti.Flow/IFlowStore.cs index 7af9513..17da5e8 100644 --- a/Tapeti.Flow/IFlowStore.cs +++ b/Tapeti.Flow/IFlowStore.cs @@ -1,8 +1,9 @@ using System; -using System.Collections.Generic; using System.Threading.Tasks; using Tapeti.Flow.Default; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Flow { public interface IFlowStore diff --git a/Tapeti.Flow/YieldPointException.cs b/Tapeti.Flow/YieldPointException.cs index 5b789e2..3d07a32 100644 --- a/Tapeti.Flow/YieldPointException.cs +++ b/Tapeti.Flow/YieldPointException.cs @@ -5,6 +5,5 @@ namespace Tapeti.Flow public class YieldPointException : Exception { public YieldPointException(string message) : base(message) { } - public YieldPointException(string message, Exception innerException) : base(message, innerException) { } } } diff --git a/Tapeti.Serilog/Tapeti.Serilog.csproj b/Tapeti.Serilog/Tapeti.Serilog.csproj new file mode 100644 index 0000000..ec64a23 --- /dev/null +++ b/Tapeti.Serilog/Tapeti.Serilog.csproj @@ -0,0 +1,15 @@ + + + + netstandard2.0 + + + + + + + + + + + diff --git a/Tapeti.Serilog/Tapeti.Serilog.nuspec b/Tapeti.Serilog/Tapeti.Serilog.nuspec new file mode 100644 index 0000000..6e7bdcd --- /dev/null +++ b/Tapeti.Serilog/Tapeti.Serilog.nuspec @@ -0,0 +1,24 @@ + + + + X2Software.Tapeti.Serilog + $version$ + Tapeti Serilog + Hans Mulder + Hans Mulder + https://raw.githubusercontent.com/MvRens/Tapeti/master/UNLICENSE + https://github.com/MvRens/Tapeti + https://raw.githubusercontent.com/MvRens/Tapeti/master/resources/icons/Tapeti.SimpleInjector.png + false + Serilog integration package for Tapeti + + rabbitmq tapeti simpleinjector serilog + + + + + + + + + \ No newline at end of file diff --git a/Tapeti.Serilog/TapetiSeriLogger.cs b/Tapeti.Serilog/TapetiSeriLogger.cs new file mode 100644 index 0000000..5811d08 --- /dev/null +++ b/Tapeti.Serilog/TapetiSeriLogger.cs @@ -0,0 +1,41 @@ +using System; +using ISeriLogger = Serilog.ILogger; + +// ReSharper disable UnusedMember.Global + +namespace Tapeti.Serilog +{ + public class TapetiSeriLogger: ILogger + { + private readonly ISeriLogger seriLogger; + + public TapetiSeriLogger(ISeriLogger seriLogger) + { + this.seriLogger = seriLogger; + } + + public void Connect(TapetiConnectionParams connectionParams) + { + // method not yet used in Tapeti + seriLogger.Information("Trying to connected to " + connectionParams.HostName); + } + + public void ConnectFailed(TapetiConnectionParams connectionParams) + { + // method not yet used in Tapeti + seriLogger.Error("Could not connect to " + connectionParams.HostName); + + } + + public void ConnectSuccess(TapetiConnectionParams connectionParams) + { + // method not yet used in Tapeti + seriLogger.Information("Succesfull connected to " + connectionParams.HostName); + } + + public void HandlerException(Exception e) + { + seriLogger.Error(e, "Exception handled by Tapeti"); + } + } +} diff --git a/Tapeti.Tests/ConnectionStringParser.cs b/Tapeti.Tests/ConnectionStringParser.cs index 4c20c4c..d32240b 100644 --- a/Tapeti.Tests/ConnectionStringParser.cs +++ b/Tapeti.Tests/ConnectionStringParser.cs @@ -1,8 +1,7 @@ -using Tapeti; -using Tapeti.Helpers; +using Tapeti.Helpers; using Xunit; -namespace Tapet.Tests +namespace Tapeti.Tests { // ReSharper disable InconsistentNaming public class ConnectionStringParserTest @@ -119,9 +118,7 @@ namespace Tapet.Tests [Fact] public void OnlySemicolons() { - AssertConnectionString(";;;", new TapetiConnectionParams - { - }); + AssertConnectionString(";;;", new TapetiConnectionParams()); } [Fact] @@ -184,7 +181,8 @@ namespace Tapet.Tests }); } - private void AssertConnectionString(string connectionstring, TapetiConnectionParams expected) + // ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local + private static void AssertConnectionString(string connectionstring, TapetiConnectionParams expected) { var parsed = ConnectionStringParser.Parse(connectionstring); diff --git a/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs b/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs index 1638d47..d42ca26 100644 --- a/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs +++ b/Tapeti.Tests/TypeNameRoutingKeyStrategyTests.cs @@ -2,7 +2,7 @@ using Tapeti.Default; using Xunit; -namespace Tapet.Tests +namespace Tapeti.Tests { // ReSharper disable InconsistentNaming public class TypeNameRoutingKeyStrategyTests diff --git a/Tapeti.sln b/Tapeti.sln index ad1c189..73f1486 100644 --- a/Tapeti.sln +++ b/Tapeti.sln @@ -17,7 +17,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.SimpleInjector", "Ta EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Test", "Test\Test.csproj", "{1A4B7136-B7DF-41EA-BEA2-E87B4607D420}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tapeti.Tests", "Tapeti.Tests\Tapeti.Tests.csproj", "{334F3715-63CF-4D13-B09A-38E2A616D4F5}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tapeti.Serilog", "Tapeti.Serilog\Tapeti.Serilog.csproj", "{43AA5DF3-49D5-4795-A290-D6511502B564}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -57,6 +59,10 @@ Global {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Debug|Any CPU.Build.0 = Debug|Any CPU {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Release|Any CPU.ActiveCfg = Release|Any CPU {334F3715-63CF-4D13-B09A-38E2A616D4F5}.Release|Any CPU.Build.0 = Release|Any CPU + {43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {43AA5DF3-49D5-4795-A290-D6511502B564}.Debug|Any CPU.Build.0 = Debug|Any CPU + {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.ActiveCfg = Release|Any CPU + {43AA5DF3-49D5-4795-A290-D6511502B564}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings new file mode 100644 index 0000000..06fcad6 --- /dev/null +++ b/Tapeti.sln.DotSettings @@ -0,0 +1,4 @@ + + ID + KV + <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> \ No newline at end of file diff --git a/Tapeti/Config/ICleanupMiddleware.cs b/Tapeti/Config/ICleanupMiddleware.cs index 290236e..132991b 100644 --- a/Tapeti/Config/ICleanupMiddleware.cs +++ b/Tapeti/Config/ICleanupMiddleware.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Threading.Tasks; namespace Tapeti.Config { diff --git a/Tapeti/Config/IExceptionStrategyContext.cs b/Tapeti/Config/IExceptionStrategyContext.cs index 2a99af9..4aae1fd 100644 --- a/Tapeti/Config/IExceptionStrategyContext.cs +++ b/Tapeti/Config/IExceptionStrategyContext.cs @@ -1,8 +1,6 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; + +// ReSharper disable UnusedMember.Global namespace Tapeti.Config { diff --git a/Tapeti/Config/IMessageContext.cs b/Tapeti/Config/IMessageContext.cs index d0ecbd5..658636c 100644 --- a/Tapeti/Config/IMessageContext.cs +++ b/Tapeti/Config/IMessageContext.cs @@ -2,6 +2,8 @@ using System.Collections.Generic; using RabbitMQ.Client; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Config { public interface IMessageContext : IDisposable diff --git a/Tapeti/Config/IPublishContext.cs b/Tapeti/Config/IPublishContext.cs index 943b9ae..4bb4ff8 100644 --- a/Tapeti/Config/IPublishContext.cs +++ b/Tapeti/Config/IPublishContext.cs @@ -1,5 +1,7 @@ using RabbitMQ.Client; +// ReSharper disable UnusedMember.Global + namespace Tapeti.Config { public interface IPublishContext diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs index c64ced7..c0e82df 100644 --- a/Tapeti/Connection/IConnectionEventListener.cs +++ b/Tapeti/Connection/IConnectionEventListener.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Tapeti.Connection +namespace Tapeti.Connection { public interface IConnectionEventListener { diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 03355f5..06f87aa 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -5,7 +5,6 @@ using System.Runtime.ExceptionServices; using RabbitMQ.Client; using Tapeti.Config; using Tapeti.Default; -using Tapeti.Helpers; using System.Threading.Tasks; namespace Tapeti.Connection @@ -42,7 +41,6 @@ namespace Tapeti.Connection { Task.Run(async () => { - ExceptionDispatchInfo exception = null; MessageContext context = null; HandlingResult handlingResult = null; try @@ -67,7 +65,7 @@ namespace Tapeti.Connection } catch (Exception eDispatch) { - exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); + var exception = ExceptionDispatchInfo.Capture(UnwrapException(eDispatch)); logger.HandlerException(eDispatch); try { @@ -119,10 +117,7 @@ namespace Tapeti.Connection } try { - if (context != null) - { - context.Dispose(); - } + context?.Dispose(); } catch (Exception eDispose) { @@ -178,25 +173,25 @@ namespace Tapeti.Connection RecursiveCaller firstCaller = null; RecursiveCaller currentCaller = null; - Action addHandler = (Handler handle) => + void AddHandler(Handler handle) { var caller = new RecursiveCaller(handle); if (currentCaller == null) firstCaller = caller; else - currentCaller.next = caller; + currentCaller.Next = caller; currentCaller = caller; - }; + } if (binding.MessageFilterMiddleware != null) { foreach (var m in binding.MessageFilterMiddleware) { - addHandler(m.Handle); + AddHandler(m.Handle); } } - addHandler(async (c, next) => + AddHandler(async (c, next) => { c.Controller = dependencyResolver.Resolve(binding.Controller); await next(); @@ -204,18 +199,18 @@ namespace Tapeti.Connection foreach (var m in messageMiddleware) { - addHandler(m.Handle); + AddHandler(m.Handle); } if (binding.MessageMiddleware != null) { foreach (var m in binding.MessageMiddleware) { - addHandler(m.Handle); + AddHandler(m.Handle); } } - addHandler(async (c, next) => + AddHandler(async (c, next) => { await binding.Invoke(c, message); }); @@ -244,10 +239,11 @@ namespace Tapeti.Connection public class RecursiveCaller { - private Handler handle; + private readonly Handler handle; private MessageContext currentContext; private MessageContext nextContext; - public RecursiveCaller next; + + public RecursiveCaller Next; public RecursiveCaller(Handler handle) { @@ -263,9 +259,9 @@ namespace Tapeti.Connection { currentContext = context; - context.UseNestedContext = next == null ? (Action)null : UseNestedContext; + context.UseNestedContext = Next == null ? (Action)null : UseNestedContext; - await handle(context, callNext); + await handle(context, CallNext); } finally { @@ -273,18 +269,18 @@ namespace Tapeti.Connection } } - private async Task callNext() + private async Task CallNext() { - if (next == null) + if (Next == null) return; if (nextContext != null) { - await next.Call(nextContext); + await Next.Call(nextContext); }else { try { - await next.Call(currentContext); + await Next.Call(currentContext); } finally { diff --git a/Tapeti/Connection/TapetiWorker.cs b/Tapeti/Connection/TapetiWorker.cs index e539bf7..398840c 100644 --- a/Tapeti/Connection/TapetiWorker.cs +++ b/Tapeti/Connection/TapetiWorker.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; @@ -70,7 +69,7 @@ namespace Tapeti.Connection if (queue.Dynamic) { - var dynamicQueue = channel.QueueDeclare(); + var dynamicQueue = channel.QueueDeclare(queue.Name); (queue as IDynamicQueue)?.SetName(dynamicQueue.QueueName); foreach (var binding in queue.Bindings) diff --git a/Tapeti/Default/ConsoleLogger.cs b/Tapeti/Default/ConsoleLogger.cs index 98484fe..1f02904 100644 --- a/Tapeti/Default/ConsoleLogger.cs +++ b/Tapeti/Default/ConsoleLogger.cs @@ -6,17 +6,17 @@ namespace Tapeti.Default { public void Connect(TapetiConnectionParams connectionParams) { - throw new System.NotImplementedException(); + throw new NotImplementedException(); } public void ConnectFailed(TapetiConnectionParams connectionParams) { - throw new System.NotImplementedException(); + throw new NotImplementedException(); } public void ConnectSuccess(TapetiConnectionParams connectionParams) { - throw new System.NotImplementedException(); + throw new NotImplementedException(); } public void HandlerException(Exception e) diff --git a/Tapeti/Default/ExceptionStrategyContext.cs b/Tapeti/Default/ExceptionStrategyContext.cs index fc24ab3..89280ee 100644 --- a/Tapeti/Default/ExceptionStrategyContext.cs +++ b/Tapeti/Default/ExceptionStrategyContext.cs @@ -1,8 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Tapeti.Config; namespace Tapeti.Default @@ -22,19 +18,8 @@ namespace Tapeti.Default private HandlingResultBuilder handlingResult; public HandlingResultBuilder HandlingResult { - get - { - if (handlingResult == null) - { - handlingResult = new HandlingResultBuilder(); - } - return handlingResult; - } - - set - { - handlingResult = value; - } + get => handlingResult ?? (handlingResult = new HandlingResultBuilder()); + set => handlingResult = value; } } } diff --git a/Tapeti/Default/JsonMessageSerializer.cs b/Tapeti/Default/JsonMessageSerializer.cs index 7c62ce4..2aee24f 100644 --- a/Tapeti/Default/JsonMessageSerializer.cs +++ b/Tapeti/Default/JsonMessageSerializer.cs @@ -45,12 +45,10 @@ namespace Tapeti.Default public object Deserialize(byte[] body, IBasicProperties properties) { - object typeName; - if (properties.ContentType == null || !properties.ContentType.Equals(ContentType)) throw new ArgumentException($"content_type must be {ContentType}"); - if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out typeName)) + if (properties.Headers == null || !properties.Headers.TryGetValue(ClassTypeHeader, out var typeName)) throw new ArgumentException($"{ClassTypeHeader} header not present"); var messageType = deserializedTypeNames.GetOrAdd(Encoding.UTF8.GetString((byte[])typeName), DeserializeTypeName); diff --git a/Tapeti/Default/MessageContext.cs b/Tapeti/Default/MessageContext.cs index 999b532..77486dc 100644 --- a/Tapeti/Default/MessageContext.cs +++ b/Tapeti/Default/MessageContext.cs @@ -21,7 +21,6 @@ namespace Tapeti.Default public IDictionary Items { get; } - private readonly MessageContext outerContext; internal Action UseNestedContext; internal Action OnContextDisposed; @@ -43,8 +42,6 @@ namespace Tapeti.Default Properties = outerContext.Properties; Items = new DeferingDictionary(outerContext.Items); - - this.outerContext = outerContext; } public void Dispose() @@ -71,8 +68,8 @@ namespace Tapeti.Default private class DeferingDictionary : IDictionary { - private IDictionary myState; - private IDictionary deferee; + private readonly IDictionary myState; + private readonly IDictionary deferee; public DeferingDictionary(IDictionary deferee) { @@ -84,10 +81,7 @@ namespace Tapeti.Default object IDictionary.this[string key] { - get - { - return myState.ContainsKey(key) ? myState[key] : deferee[key]; - } + get => myState.ContainsKey(key) ? myState[key] : deferee[key]; set { @@ -98,37 +92,10 @@ namespace Tapeti.Default } } - int ICollection>.Count - { - get - { - return myState.Count + deferee.Count; - } - } - - bool ICollection>.IsReadOnly - { - get - { - return false; - } - } - - ICollection IDictionary.Keys - { - get - { - return myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly(); - } - } - - ICollection IDictionary.Values - { - get - { - return myState.Values.Concat(deferee.Values).ToList().AsReadOnly(); - } - } + int ICollection>.Count => myState.Count + deferee.Count; + bool ICollection>.IsReadOnly => false; + ICollection IDictionary.Keys => myState.Keys.Concat(deferee.Keys).ToList().AsReadOnly(); + ICollection IDictionary.Values => myState.Values.Concat(deferee.Values).ToList().AsReadOnly(); void ICollection>.Add(KeyValuePair item) { diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs index 48babe3..3bbb2d5 100644 --- a/Tapeti/Default/NackExceptionStrategy.cs +++ b/Tapeti/Default/NackExceptionStrategy.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Tapeti.Config; +using Tapeti.Config; namespace Tapeti.Default { diff --git a/Tapeti/Default/PublishResultBinding.cs b/Tapeti/Default/PublishResultBinding.cs index 7e5c29d..38d5ff8 100644 --- a/Tapeti/Default/PublishResultBinding.cs +++ b/Tapeti/Default/PublishResultBinding.cs @@ -1,5 +1,5 @@ using System; -using System.Linq.Expressions; +using System.Diagnostics; using System.Reflection; using System.Threading.Tasks; using RabbitMQ.Client.Framing; @@ -18,15 +18,14 @@ namespace Tapeti.Default return; - bool isTaskOf; - Type actualType; - if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out isTaskOf, out actualType)) + if (!context.Result.Info.ParameterType.IsTypeOrTaskOf(t => t.IsClass, out var isTaskOf, out var actualType)) return; if (isTaskOf) { - var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static).MakeGenericMethod(actualType); + var handler = GetType().GetMethod("PublishGenericTaskResult", BindingFlags.NonPublic | BindingFlags.Static)?.MakeGenericMethod(actualType); + Debug.Assert(handler != null, nameof(handler) + " != null"); context.Result.SetHandler(async (messageContext, value) => { @@ -54,13 +53,5 @@ namespace Tapeti.Default return publisher.Publish(message, properties); } - - - private static async Task PublishGenericTaskResult(IMessageContext messageContext, object value) where T : class - { - var message = await (Task)value; - if (message != null) - await Reply(message, messageContext); - } } } diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs index afa3143..6c014f6 100644 --- a/Tapeti/Default/RequeueExceptionStrategy.cs +++ b/Tapeti/Default/RequeueExceptionStrategy.cs @@ -1,5 +1,6 @@ -using System; -using Tapeti.Config; +using Tapeti.Config; + +// ReSharper disable UnusedMember.Global namespace Tapeti.Default { diff --git a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs index d221a82..99bf6b9 100644 --- a/Tapeti/Default/TypeNameRoutingKeyStrategy.cs +++ b/Tapeti/Default/TypeNameRoutingKeyStrategy.cs @@ -2,7 +2,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Text.RegularExpressions; namespace Tapeti.Default diff --git a/Tapeti/HandlingResult.cs b/Tapeti/HandlingResult.cs index e1bb575..107c206 100644 --- a/Tapeti/HandlingResult.cs +++ b/Tapeti/HandlingResult.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +// ReSharper disable UnusedMember.Global namespace Tapeti { @@ -34,26 +30,14 @@ namespace Tapeti private HandlingResult data = Default; public ConsumeResponse ConsumeResponse { - get - { - return data.ConsumeResponse; - } - set - { - GetWritableData().ConsumeResponse = value; - } + get => data.ConsumeResponse; + set => GetWritableData().ConsumeResponse = value; } public MessageAction MessageAction { - get - { - return data.MessageAction; - } - set - { - GetWritableData().MessageAction = value; - } + get => data.MessageAction; + set => GetWritableData().MessageAction = value; } public HandlingResult ToHandlingResult() diff --git a/Tapeti/Helpers/ConnectionstringParser.cs b/Tapeti/Helpers/ConnectionstringParser.cs index db1d8c3..bbda0d9 100644 --- a/Tapeti/Helpers/ConnectionstringParser.cs +++ b/Tapeti/Helpers/ConnectionstringParser.cs @@ -1,18 +1,14 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Text; namespace Tapeti.Helpers { public class ConnectionStringParser { - readonly TapetiConnectionParams result = new TapetiConnectionParams(); + private readonly TapetiConnectionParams result = new TapetiConnectionParams(); - readonly string connectionstring; - int pos = -1; - char current = '\0'; + private readonly string connectionstring; + private int pos = -1; + private char current = '\0'; public static TapetiConnectionParams Parse(string connectionstring) { @@ -33,15 +29,11 @@ namespace Tapeti.Helpers { var key = ParseKey(); - if (current == '=') - { - var value = ParseValue(); - SetValue(key, value); - } - else - { - EnableKey(key); - } + if (current != '=') + return; + + var value = ParseValue(); + SetValue(key, value); } private string ParseKey() @@ -112,11 +104,6 @@ namespace Tapeti.Helpers return false; } - private void EnableKey(string key) - { - - } - private void SetValue(string key, string value) { switch (key.ToLowerInvariant()) { diff --git a/Tapeti/Helpers/MiddlewareHelper.cs b/Tapeti/Helpers/MiddlewareHelper.cs index f70e6bb..a40e2bd 100644 --- a/Tapeti/Helpers/MiddlewareHelper.cs +++ b/Tapeti/Helpers/MiddlewareHelper.cs @@ -15,18 +15,16 @@ namespace Tapeti.Helpers return; } - Action handleNext = null; - - handleNext = () => + void HandleNext() { handlerIndex--; if (handlerIndex >= 0) - handle(middleware[handlerIndex], handleNext); + handle(middleware[handlerIndex], HandleNext); else lastHandler(); - }; + } - handle(middleware[handlerIndex], handleNext); + handle(middleware[handlerIndex], HandleNext); } @@ -39,18 +37,16 @@ namespace Tapeti.Helpers return; } - Func handleNext = null; - - handleNext = async () => + async Task HandleNext() { handlerIndex--; if (handlerIndex >= 0) - await handle(middleware[handlerIndex], handleNext); + await handle(middleware[handlerIndex], HandleNext); else await lastHandler(); - }; + } - await handle(middleware[handlerIndex], handleNext); + await handle(middleware[handlerIndex], HandleNext); } } } diff --git a/Tapeti/Helpers/TaskTypeHelper.cs b/Tapeti/Helpers/TaskTypeHelper.cs index b5e1a77..416a7ba 100644 --- a/Tapeti/Helpers/TaskTypeHelper.cs +++ b/Tapeti/Helpers/TaskTypeHelper.cs @@ -34,8 +34,7 @@ namespace Tapeti.Helpers public static bool IsTypeOrTaskOf(this Type type, Func predicate, out bool isTaskOf) { - Type actualType; - return IsTypeOrTaskOf(type, predicate, out isTaskOf, out actualType); + return IsTypeOrTaskOf(type, predicate, out isTaskOf, out _); } public static bool IsTypeOrTaskOf(this Type type, Type compareTo, out bool isTaskOf) diff --git a/Tapeti/IConnection.cs b/Tapeti/IConnection.cs index b9671be..5993f0f 100644 --- a/Tapeti/IConnection.cs +++ b/Tapeti/IConnection.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; +// ReSharper disable UnusedMember.Global + namespace Tapeti { public interface IConnection : IDisposable diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs index 7525324..979f454 100644 --- a/Tapeti/IExceptionStrategy.cs +++ b/Tapeti/IExceptionStrategy.cs @@ -1,5 +1,4 @@ -using System; -using Tapeti.Config; +using Tapeti.Config; namespace Tapeti { diff --git a/Tapeti/ILogger.cs b/Tapeti/ILogger.cs index 1dc244d..ee0e0e2 100644 --- a/Tapeti/ILogger.cs +++ b/Tapeti/ILogger.cs @@ -1,5 +1,7 @@ using System; +// ReSharper disable UnusedMember.Global + namespace Tapeti { // This interface is deliberately specific and typed to allow for structured logging (e.g. Serilog) diff --git a/Tapeti/MessageController.cs b/Tapeti/MessageController.cs index a046171..8616de9 100644 --- a/Tapeti/MessageController.cs +++ b/Tapeti/MessageController.cs @@ -1,5 +1,7 @@ using Tapeti.Annotations; +// ReSharper disable UnusedMember.Global + namespace Tapeti { /// diff --git a/Tapeti/MessageFutureAction.cs b/Tapeti/MessageFutureAction.cs index 7cbd319..bc48049 100644 --- a/Tapeti/MessageFutureAction.cs +++ b/Tapeti/MessageFutureAction.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +// ReSharper disable UnusedMember.Global namespace Tapeti { diff --git a/Tapeti/TapetiAppSettingsConnectionParams.cs b/Tapeti/TapetiAppSettingsConnectionParams.cs index b9b69d0..e6312e5 100644 --- a/Tapeti/TapetiAppSettingsConnectionParams.cs +++ b/Tapeti/TapetiAppSettingsConnectionParams.cs @@ -18,19 +18,19 @@ namespace Tapeti public TapetiAppSettingsConnectionParams(string prefix = DefaultPrefix) { var keys = ConfigurationManager.AppSettings.AllKeys; - Action> getAppSetting = (key, setValue) => + + void GetAppSetting(string key, Action setValue) { - if (keys.Contains(prefix + key)) - setValue(ConfigurationManager.AppSettings[prefix + key]); - }; + if (keys.Contains(prefix + key)) setValue(ConfigurationManager.AppSettings[prefix + key]); + } - getAppSetting(KeyHostname, value => HostName = value); - getAppSetting(KeyPort, value => Port = int.Parse(value)); - getAppSetting(KeyVirtualHost, value => VirtualHost = value); - getAppSetting(KeyUsername, value => Username = value); - getAppSetting(KeyPassword, value => Password = value); - getAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value)); + GetAppSetting(KeyHostname, value => HostName = value); + GetAppSetting(KeyPort, value => Port = int.Parse(value)); + GetAppSetting(KeyVirtualHost, value => VirtualHost = value); + GetAppSetting(KeyUsername, value => Username = value); + GetAppSetting(KeyPassword, value => Password = value); + GetAppSetting(KeyPrefetchCount, value => PrefetchCount = ushort.Parse(value)); } } } diff --git a/Tapeti/TapetiConfig.cs b/Tapeti/TapetiConfig.cs index 117c4c9..3807190 100644 --- a/Tapeti/TapetiConfig.cs +++ b/Tapeti/TapetiConfig.cs @@ -22,7 +22,7 @@ namespace Tapeti public class TapetiConfig { private readonly Dictionary> staticRegistrations = new Dictionary>(); - private readonly Dictionary> dynamicRegistrations = new Dictionary>(); + private readonly Dictionary>> dynamicRegistrations = new Dictionary>>(); private readonly List bindingMiddleware = new List(); private readonly List messageMiddleware = new List(); @@ -49,20 +49,47 @@ namespace Tapeti var queues = new List(); queues.AddRange(staticRegistrations.Select(qb => new Queue(new QueueInfo { Dynamic = false, Name = qb.Key }, qb.Value))); - // Group all bindings with the same index into queues, this will - // ensure each message type is unique on their queue - var dynamicBindings = new List>(); - foreach (var bindings in dynamicRegistrations.Values) + + // We want to ensure each queue only has unique messages classes. This means we can requeue + // without the side-effect of calling other handlers for the same message class again as well. + // + // Since I had trouble deciphering this code after a year, here's an overview of how it achieves this grouping + // and how the bindingIndex is relevant: + // + // dynamicRegistrations: + // Key (prefix) + // "" + // Key (message class) Value (list of bindings) + // A binding1, binding2, binding3 + // B binding4 + // "prefix" + // A binding5, binding6 + // + // By combining all bindings with the same index, per prefix, the following queues will be registered: + // + // Prefix Bindings + // "" binding1 (message A), binding4 (message B) + // "" binding2 (message A) + // "" binding3 (message A) + // "prefix" binding5 (message A) + // "prefix" binding6 (message A) + // + foreach (var prefixGroup in dynamicRegistrations) { - while (dynamicBindings.Count < bindings.Count) - dynamicBindings.Add(new List()); + var dynamicBindings = new List>(); - for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) - dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); + foreach (var bindings in prefixGroup.Value.Values) + { + while (dynamicBindings.Count < bindings.Count) + dynamicBindings.Add(new List()); + + for (var bindingIndex = 0; bindingIndex < bindings.Count; bindingIndex++) + dynamicBindings[bindingIndex].Add(bindings[bindingIndex]); + } + + queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true, Name = GetDynamicQueueName(prefixGroup.Key) }, bl))); } - queues.AddRange(dynamicBindings.Select(bl => new Queue(new QueueInfo { Dynamic = true }, bl))); - var config = new Config(dependencyResolver, messageMiddleware, cleanupMiddleware, publishMiddleware, queues); (dependencyResolver as IDependencyContainer)?.RegisterDefaultSingleton(config); @@ -100,8 +127,7 @@ namespace Tapeti public TapetiConfig Use(ITapetiExtension extension) { - var container = dependencyResolver as IDependencyContainer; - if (container != null) + if (dependencyResolver is IDependencyContainer container) extension.RegisterDefaults(container); var middlewareBundle = extension.GetMiddleware(dependencyResolver); @@ -112,14 +138,14 @@ namespace Tapeti foreach (var middleware in middlewareBundle) { // ReSharper disable once CanBeReplacedWithTryCastAndCheckForNull - if (middleware is IBindingMiddleware) - Use((IBindingMiddleware)middleware); - else if (middleware is IMessageMiddleware) - Use((IMessageMiddleware)middleware); - else if (middleware is ICleanupMiddleware) - Use((ICleanupMiddleware)middleware); - else if (middleware is IPublishMiddleware) - Use((IPublishMiddleware)middleware); + if (middleware is IBindingMiddleware bindingExtension) + Use(bindingExtension); + else if (middleware is IMessageMiddleware messageExtension) + Use(messageExtension); + else if (middleware is ICleanupMiddleware cleanupExtension) + Use(cleanupExtension); + else if (middleware is IPublishMiddleware publishExtension) + Use(publishExtension); else throw new ArgumentException($"Unsupported middleware implementation: {(middleware == null ? "null" : middleware.GetType().Name)}"); } @@ -154,7 +180,7 @@ namespace Tapeti (dependencyResolver as IDependencyContainer)?.RegisterController(controller); foreach (var method in controller.GetMembers(BindingFlags.Public | BindingFlags.Instance) - .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && !(m as MethodInfo).IsSpecialName) + .Where(m => m.MemberType == MemberTypes.Method && m.DeclaringType != typeof(object) && (m as MethodInfo)?.IsSpecialName == false) .Select(m => (MethodInfo)m)) { var context = new BindingContext(method); @@ -317,10 +343,21 @@ namespace Tapeti protected void AddDynamicRegistration(IBindingContext context, Binding binding) { - if (dynamicRegistrations.ContainsKey(context.MessageClass)) - dynamicRegistrations[context.MessageClass].Add(binding); - else - dynamicRegistrations.Add(context.MessageClass, new List { binding }); + var prefix = binding.QueueInfo.Name ?? ""; + + if (!dynamicRegistrations.TryGetValue(prefix, out Dictionary> prefixRegistrations)) + { + prefixRegistrations = new Dictionary>(); + dynamicRegistrations.Add(prefix, prefixRegistrations); + } + + if (!prefixRegistrations.TryGetValue(context.MessageClass, out List bindings)) + { + bindings = new List(); + prefixRegistrations.Add(context.MessageClass, bindings); + } + + bindings.Add(binding); } @@ -333,7 +370,7 @@ namespace Tapeti throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on {member.Name}"); if (dynamicQueueAttribute != null) - return new QueueInfo { Dynamic = true }; + return new QueueInfo { Dynamic = true, Name = dynamicQueueAttribute.Prefix }; if (durableQueueAttribute != null) return new QueueInfo { Dynamic = false, Name = durableQueueAttribute.Name }; @@ -342,6 +379,15 @@ namespace Tapeti } + protected string GetDynamicQueueName(string prefix) + { + if (String.IsNullOrEmpty(prefix)) + return ""; + + return prefix + "." + Guid.NewGuid().ToString("N"); + } + + protected class QueueInfo { public bool? Dynamic { get; set; } @@ -376,8 +422,7 @@ namespace Tapeti public IBinding GetBinding(Delegate method) { - IBinding binding; - return bindingMethodLookup.TryGetValue(method.Method, out binding) ? binding : null; + return bindingMethodLookup.TryGetValue(method.Method, out var binding) ? binding : null; } } @@ -418,7 +463,7 @@ namespace Tapeti private QueueInfo queueInfo; public QueueInfo QueueInfo { - get { return queueInfo; } + get => queueInfo; set { QueueName = (value?.Dynamic).GetValueOrDefault() ? value?.Name : null; diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 98e8cda..0578a2e 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -4,6 +4,8 @@ using System.Threading.Tasks; using Tapeti.Config; using Tapeti.Connection; +// ReSharper disable UnusedMember.Global + namespace Tapeti { public class TapetiConnection : IDisposable diff --git a/Tapeti/TapetiConnectionParams.cs b/Tapeti/TapetiConnectionParams.cs index 31089b6..2c6c525 100644 --- a/Tapeti/TapetiConnectionParams.cs +++ b/Tapeti/TapetiConnectionParams.cs @@ -1,5 +1,7 @@ using System; +// ReSharper disable UnusedMember.Global + namespace Tapeti { public class TapetiConnectionParams diff --git a/Test/FlowEndController.cs b/Test/FlowEndController.cs index 679e542..5c2f450 100644 --- a/Test/FlowEndController.cs +++ b/Test/FlowEndController.cs @@ -3,6 +3,8 @@ using Tapeti.Annotations; using Tapeti.Flow; using Tapeti.Flow.Annotations; +// ReSharper disable UnusedMember.Global + namespace Test { [MessageController] diff --git a/Test/MarcoController.cs b/Test/MarcoController.cs index fe78c4b..da2ddab 100644 --- a/Test/MarcoController.cs +++ b/Test/MarcoController.cs @@ -1,31 +1,32 @@ using System; using System.ComponentModel.DataAnnotations; using System.Threading.Tasks; -using Tapeti; using Tapeti.Annotations; using Tapeti.Flow; using Tapeti.Flow.Annotations; +// ReSharper disable UnusedMember.Global + namespace Test { [MessageController] [DynamicQueue] public class MarcoController { - private readonly IPublisher publisher; + //private readonly IPublisher publisher; private readonly IFlowProvider flowProvider; - private readonly Visualizer visualizer; + //private readonly Visualizer visualizer; // Public properties are automatically stored and retrieved while in a flow public Guid StateTestGuid { get; set; } public int Phase; - public MarcoController(IPublisher publisher, IFlowProvider flowProvider, Visualizer visualizer) + public MarcoController(/*IPublisher publisher, */IFlowProvider flowProvider/*, Visualizer visualizer*/) { - this.publisher = publisher; + //this.publisher = publisher; this.flowProvider = flowProvider; - this.visualizer = visualizer; + //this.visualizer = visualizer; } @@ -68,16 +69,13 @@ namespace Test } - /** - * The Visualizer could've been injected through the constructor, which is - * the recommended way. Just testing the injection middleware here. - */ - public async Task Marco(MarcoMessage message, Visualizer myVisualizer) + [Start] + public IYieldPoint TestParallelRequest() { Console.WriteLine(">> Marco (yielding with request)"); - await myVisualizer.VisualizeMarco(); StateTestGuid = Guid.NewGuid(); + Console.WriteLine($"Starting parallel request with StateTestGuid {StateTestGuid}"); return flowProvider.YieldWithParallelRequest() .AddRequestSync(new PoloConfirmationRequestMessage @@ -113,7 +111,7 @@ namespace Test private IYieldPoint ContinuePoloConfirmation() { Console.WriteLine("> ConvergePoloConfirmation (ending flow)"); - return flowProvider.EndWithResponse(new PoloMessage()); + return flowProvider.End(); } @@ -134,6 +132,7 @@ namespace Test + [DynamicQueue("custom.prefix")] public void Polo(PoloMessage message) { Console.WriteLine(">> Polo"); diff --git a/Test/MarcoEmitter.cs b/Test/MarcoEmitter.cs index 66a4a75..b40ae95 100644 --- a/Test/MarcoEmitter.cs +++ b/Test/MarcoEmitter.cs @@ -1,18 +1,17 @@ -using System.Threading; -using System.Threading.Tasks; -using Tapeti; +using System.Threading.Tasks; namespace Test { public class MarcoEmitter { - private readonly IPublisher publisher; + //private readonly IPublisher publisher; - public MarcoEmitter(IPublisher publisher) + /*public MarcoEmitter(IPublisher publisher) { this.publisher = publisher; } + */ public async Task Run() @@ -45,6 +44,7 @@ namespace Test { await Task.Delay(1000); } + // ReSharper disable once FunctionNeverReturns } } } diff --git a/Test/MyLogger.cs b/Test/MyLogger.cs index 679493a..58d51b1 100644 --- a/Test/MyLogger.cs +++ b/Test/MyLogger.cs @@ -1,10 +1,8 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Tapeti; +// ReSharper disable UnusedMember.Global + namespace Test { public class MyLogger : ILogger diff --git a/Test/Program.cs b/Test/Program.cs index aa33708..caad2c9 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -1,11 +1,8 @@ using System; -using System.Threading.Tasks; using SimpleInjector; using Tapeti; using Tapeti.DataAnnotations; using Tapeti.Flow; -using Tapeti.Flow.SQL; -using Tapeti.Helpers; using Tapeti.SimpleInjector; using System.Threading; @@ -61,7 +58,8 @@ namespace Test connection.GetPublisher().Publish(new FlowEndController.PingMessage()); - container.GetInstance().Start(c => c.StartFlow, true); + //container.GetInstance().Start(c => c.StartFlow, true); + container.GetInstance().Start(c => c.TestParallelRequest); Thread.Sleep(1000); diff --git a/Test/Visualizer.cs b/Test/Visualizer.cs index cd9eb99..c99af85 100644 --- a/Test/Visualizer.cs +++ b/Test/Visualizer.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; +// ReSharper disable UnusedMember.Global + namespace Test { public class Visualizer diff --git a/appveyor.yml b/appveyor.yml index 7866075..9fbce10 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -20,6 +20,8 @@ after_build: - cmd: appveyor PushArtifact "X2Software.Tapeti.Flow.%GitVersion_NuGetVersion%.nupkg" - cmd: nuget pack Tapeti.SimpleInjector\Tapeti.SimpleInjector.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" - cmd: appveyor PushArtifact "X2Software.Tapeti.SimpleInjector.%GitVersion_NuGetVersion%.nupkg" + - cmd: nuget pack Tapeti.Serilog\Tapeti.Serilog.nuspec -version "%GitVersion_NuGetVersion%" -prop "target=%CONFIGURATION%" + - cmd: appveyor PushArtifact "X2Software.Tapeti.Serilog.%GitVersion_NuGetVersion%.nupkg" assembly_info: patch: false @@ -36,7 +38,7 @@ configuration: deploy: provider: NuGet api_key: - secure: pkaN6R8ocu0Q93uCK3DOCifgr1Q4tuH4ZJ4eiV9U5NmwE5qRM2xjUy4B9SkZCsWx + secure: iol1BCXq2OjgLnobj1/d4hhBvsTZgXN3zyFd8/wp2HnD/vzl+GNJJC7F39yODwEF skip_symbols: false artifact: /.*\.nupkg/ \ No newline at end of file