1
0
mirror of synced 2025-02-01 04:43:07 +01:00

Merge branch 'release/3.2' into develop

This commit is contained in:
Mark van Renswoude 2025-01-27 10:47:32 +01:00
commit c0674fcadc
41 changed files with 662 additions and 276 deletions

9
.readthedocs.yaml Normal file
View File

@ -0,0 +1,9 @@
version 2
build
os ubuntu-22.04
tools
python 3.12
sphinx
configuration docs/conf.py

View File

@ -68,3 +68,6 @@ Master build (stable release)
Latest build Latest build
[![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti) [![Build status](https://ci.appveyor.com/api/projects/status/cyuo0vm7admy0d9x?svg=true)](https://ci.appveyor.com/project/MvRens/tapeti)
## Contributing
By contributing to Tapeti's main repository (https://github.com/MvRens/Tapeti) you agree to dedicate your code-base contributions to the public domain under the Unlicense license.

View File

@ -44,14 +44,14 @@ namespace Tapeti.Flow.SQL
{ {
return await SqlRetryHelper.Execute(async () => return await SqlRetryHelper.Execute(async () =>
{ {
using (var connection = await GetConnection()) using var connection = await GetConnection().ConfigureAwait(false);
{
var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection); var flowQuery = new SqlCommand($"select FlowID, CreationTime, StateJson from {tableName}", connection);
var flowReader = await flowQuery.ExecuteReaderAsync(); var flowReader = await flowQuery.ExecuteReaderAsync().ConfigureAwait(false);
var result = new List<FlowRecord<T>>(); var result = new List<FlowRecord<T>>();
while (await flowReader.ReadAsync()) while (await flowReader.ReadAsync().ConfigureAwait(false))
{ {
var flowID = flowReader.GetGuid(0); var flowID = flowReader.GetGuid(0);
var creationTime = flowReader.GetDateTime(1); var creationTime = flowReader.GetDateTime(1);
@ -63,8 +63,7 @@ namespace Tapeti.Flow.SQL
} }
return result; return result;
} }).ConfigureAwait(false);
});
} }
/// <inheritdoc /> /// <inheritdoc />
@ -72,8 +71,8 @@ namespace Tapeti.Flow.SQL
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {
using (var connection = await GetConnection()) using var connection = await GetConnection().ConfigureAwait(false);
{
var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" + var query = new SqlCommand($"insert into {tableName} (FlowID, StateJson, CreationTime)" +
"values (@FlowID, @StateJson, @CreationTime)", "values (@FlowID, @StateJson, @CreationTime)",
connection); connection);
@ -86,9 +85,8 @@ namespace Tapeti.Flow.SQL
stateJsonParam.Value = JsonConvert.SerializeObject(state); stateJsonParam.Value = JsonConvert.SerializeObject(state);
creationTimeParam.Value = timestamp; creationTimeParam.Value = timestamp;
await query.ExecuteNonQueryAsync(); await query.ExecuteNonQueryAsync().ConfigureAwait(false);
} }).ConfigureAwait(false);
});
} }
/// <inheritdoc /> /// <inheritdoc />
@ -96,8 +94,8 @@ namespace Tapeti.Flow.SQL
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {
using (var connection = await GetConnection()) using var connection = await GetConnection().ConfigureAwait(false);
{
var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection); var query = new SqlCommand($"update {tableName} set StateJson = @StateJson where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
@ -106,9 +104,8 @@ namespace Tapeti.Flow.SQL
flowIDParam.Value = flowID; flowIDParam.Value = flowID;
stateJsonParam.Value = JsonConvert.SerializeObject(state); stateJsonParam.Value = JsonConvert.SerializeObject(state);
await query.ExecuteNonQueryAsync(); await query.ExecuteNonQueryAsync().ConfigureAwait(false);
} }).ConfigureAwait(false);
});
} }
/// <inheritdoc /> /// <inheritdoc />
@ -116,23 +113,22 @@ namespace Tapeti.Flow.SQL
{ {
await SqlRetryHelper.Execute(async () => await SqlRetryHelper.Execute(async () =>
{ {
using (var connection = await GetConnection()) using var connection = await GetConnection().ConfigureAwait(false);
{
var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection); var query = new SqlCommand($"delete from {tableName} where FlowID = @FlowID", connection);
var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier); var flowIDParam = query.Parameters.Add("@FlowID", SqlDbType.UniqueIdentifier);
flowIDParam.Value = flowID; flowIDParam.Value = flowID;
await query.ExecuteNonQueryAsync(); await query.ExecuteNonQueryAsync().ConfigureAwait(false);
} }).ConfigureAwait(false);
});
} }
private async Task<SqlConnection> GetConnection() private async Task<SqlConnection> GetConnection()
{ {
var connection = new SqlConnection(connectionString); var connection = new SqlConnection(connectionString);
await connection.OpenAsync(); await connection.OpenAsync().ConfigureAwait(false);
return connection; return connection;
} }

View File

@ -27,14 +27,14 @@ namespace Tapeti.Flow.SQL
{ {
try try
{ {
await callback(); await callback().ConfigureAwait(false);
break; break;
} }
catch (SqlException e) catch (SqlException e)
{ {
if (SqlExceptionHelper.IsTransientError(e)) if (SqlExceptionHelper.IsTransientError(e))
{ {
await Task.Delay(ExponentialBackoff[retryAttempt]); await Task.Delay(ExponentialBackoff[retryAttempt]).ConfigureAwait(false);
if (retryAttempt < ExponentialBackoff.Length - 1) if (retryAttempt < ExponentialBackoff.Length - 1)
retryAttempt++; retryAttempt++;
} }
@ -51,8 +51,8 @@ namespace Tapeti.Flow.SQL
await Execute(async () => await Execute(async () =>
{ {
returnValue = await callback(); returnValue = await callback().ConfigureAwait(false);
}); }).ConfigureAwait(false);
return returnValue!; return returnValue!;
} }

View File

@ -16,7 +16,7 @@ namespace Tapeti.Flow.Default
public async Task Execute(FlowContext context) public async Task Execute(FlowContext context)
{ {
await onExecute(context); await onExecute(context).ConfigureAwait(false);
} }
} }
} }

View File

@ -61,8 +61,8 @@ namespace Tapeti.Flow.Default
if (value == null) if (value == null)
throw new InvalidOperationException("Return value should be a Task, not null"); throw new InvalidOperationException("Return value should be a Task, not null");
await (Task)value; await ((Task)value).ConfigureAwait(false);
await HandleParallelResponse(messageContext); await HandleParallelResponse(messageContext).ConfigureAwait(false);
}); });
} }
else if (context.Result.Info.ParameterType == typeof(ValueTask)) else if (context.Result.Info.ParameterType == typeof(ValueTask))
@ -73,8 +73,8 @@ namespace Tapeti.Flow.Default
// ValueTask is a struct and should never be null // ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask, not null"); throw new UnreachableException("Return value should be a ValueTask, not null");
await (ValueTask)value; await ((ValueTask)value).ConfigureAwait(false);
await HandleParallelResponse(messageContext); await HandleParallelResponse(messageContext).ConfigureAwait(false);
}); });
} }
else if (context.Result.Info.ParameterType == typeof(void)) else if (context.Result.Info.ParameterType == typeof(void))
@ -116,8 +116,8 @@ namespace Tapeti.Flow.Default
if (value == null) if (value == null)
throw new InvalidOperationException("Return value should be a Task<IYieldPoint>, not null"); throw new InvalidOperationException("Return value should be a Task<IYieldPoint>, not null");
var yieldPoint = await (Task<IYieldPoint>)value; var yieldPoint = await ((Task<IYieldPoint>)value).ConfigureAwait(false);
await HandleYieldPoint(messageContext, yieldPoint); await HandleYieldPoint(messageContext, yieldPoint).ConfigureAwait(false);
}); });
break; break;
@ -128,8 +128,8 @@ namespace Tapeti.Flow.Default
// ValueTask is a struct and should never be null // ValueTask is a struct and should never be null
throw new UnreachableException("Return value should be a ValueTask<IYieldPoint>, not null"); throw new UnreachableException("Return value should be a ValueTask<IYieldPoint>, not null");
var yieldPoint = await (ValueTask<IYieldPoint>)value; var yieldPoint = await ((ValueTask<IYieldPoint>)value).ConfigureAwait(false);
await HandleYieldPoint(messageContext, yieldPoint); await HandleYieldPoint(messageContext, yieldPoint).ConfigureAwait(false);
}); });
break; break;
@ -148,7 +148,10 @@ namespace Tapeti.Flow.Default
private static ValueTask HandleParallelResponse(IMessageContext context) private static ValueTask HandleParallelResponse(IMessageContext context)
{ {
if (context.TryGet<FlowMessageContextPayload>(out var flowPayload) && flowPayload.FlowIsConverging) if (!context.TryGet<FlowMessageContextPayload>(out var flowPayload))
return default;
if (flowPayload.FlowIsConverging)
return default; return default;
var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = context.Config.DependencyResolver.Resolve<IFlowHandler>();
@ -156,7 +159,7 @@ namespace Tapeti.Flow.Default
{ {
// IFlowParallelRequest.AddRequest will store the flow immediately // IFlowParallelRequest.AddRequest will store the flow immediately
if (!flowPayload.FlowContext.IsStoredOrDeleted()) if (!flowPayload.FlowContext.IsStoredOrDeleted())
await flowContext.Store(context.Binding.QueueType == QueueType.Durable); await flowContext.Store(context.Binding.QueueType == QueueType.Durable).ConfigureAwait(false);
})); }));
} }

View File

@ -16,14 +16,14 @@ namespace Tapeti.Flow.Default
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload)) if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return; return;
var flowContext = await EnrichWithFlowContext(context); var flowContext = await EnrichWithFlowContext(context).ConfigureAwait(false);
if (flowContext?.ContinuationMetadata == null) if (flowContext?.ContinuationMetadata == null)
return; return;
if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method)) if (flowContext.ContinuationMetadata.MethodName != MethodSerializer.Serialize(controllerPayload.Binding.Method))
return; return;
await next(); await next().ConfigureAwait(false);
} }
@ -44,22 +44,22 @@ namespace Tapeti.Flow.Default
// Remove Continuation now because the IYieldPoint result handler will store the new state // Remove Continuation now because the IYieldPoint result handler will store the new state
flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID); flowContext.FlowState.Continuations.Remove(flowContext.ContinuationID);
await next(); await next().ConfigureAwait(false);
if (flowPayload.FlowIsConverging) if (flowPayload.FlowIsConverging)
{ {
var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Converge(new FlowHandlerContext(context)); await flowHandler.Converge(new FlowHandlerContext(context)).ConfigureAwait(false);
} }
} }
else else
await next(); await next().ConfigureAwait(false);
} }
public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next) public async ValueTask Cleanup(IMessageContext context, ConsumeResult consumeResult, Func<ValueTask> next)
{ {
await next(); await next().ConfigureAwait(false);
if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload)) if (!context.TryGet<ControllerMessageContextPayload>(out var controllerPayload))
return; return;
@ -78,7 +78,7 @@ namespace Tapeti.Flow.Default
if (!flowContext.IsStoredOrDeleted()) if (!flowContext.IsStoredOrDeleted())
// The exception strategy can set the consume result to Success. Instead, check if the yield point // The exception strategy can set the consume result to Success. Instead, check if the yield point
// was handled. The flow provider ensures we only end up here in case of an exception. // was handled. The flow provider ensures we only end up here in case of an exception.
await flowContext.FlowStateLock.DeleteFlowState(); await flowContext.FlowStateLock.DeleteFlowState().ConfigureAwait(false);
flowContext.FlowStateLock.Dispose(); flowContext.FlowStateLock.Dispose();
} }
@ -100,13 +100,13 @@ namespace Tapeti.Flow.Default
var flowStore = context.Config.DependencyResolver.Resolve<IFlowStore>(); var flowStore = context.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = await flowStore.FindFlowID(continuationID); var flowID = await flowStore.FindFlowID(continuationID).ConfigureAwait(false);
if (!flowID.HasValue) if (!flowID.HasValue)
return null; return null;
var flowStateLock = await flowStore.LockFlowState(flowID.Value); var flowStateLock = await flowStore.LockFlowState(flowID.Value).ConfigureAwait(false);
var flowState = await flowStateLock.GetFlowState(); var flowState = await flowStateLock.GetFlowState().ConfigureAwait(false);
if (flowState == null) if (flowState == null)
return null; return null;

View File

@ -45,6 +45,20 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc /> /// <inheritdoc />
public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class public IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{ {
@ -52,6 +66,13 @@ namespace Tapeti.Flow.Default
return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo)); return new DelegateYieldPoint(context => SendRequest(context, message, responseHandlerInfo));
} }
/// <inheritdoc />
public IYieldPoint YieldWithRequestDirectSync<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class
{
var responseHandlerInfo = GetResponseHandlerInfo(config, message, responseHandler);
return new DelegateYieldPoint(context => SendRequestDirect(context, message, queueName, responseHandlerInfo));
}
/// <inheritdoc /> /// <inheritdoc />
public IFlowParallelRequestBuilder YieldWithParallelRequest() public IFlowParallelRequestBuilder YieldWithParallelRequest()
{ {
@ -72,11 +93,11 @@ namespace Tapeti.Flow.Default
internal async Task<MessageProperties> PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo, internal async Task<MessageProperties> PrepareRequest(FlowContext context, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string? convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
if (!context.HasFlowStateAndLock) if (!context.HasFlowStateAndLock)
{ {
await CreateNewFlowState(context); await CreateNewFlowState(context).ConfigureAwait(false);
Debug.Assert(context.FlowState != null, "context.FlowState != null"); Debug.Assert(context.FlowState != null, "context.FlowState != null");
} }
@ -101,12 +122,22 @@ namespace Tapeti.Flow.Default
internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo, internal async Task SendRequest(FlowContext context, object message, ResponseHandlerInfo responseHandlerInfo,
string convergeMethodName = null, bool convergeMethodTaskSync = false) string? convergeMethodName = null, bool convergeMethodTaskSync = false)
{
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync).ConfigureAwait(false);
await context.Store(responseHandlerInfo.IsDurableQueue).ConfigureAwait(false);
await publisher.Publish(message, properties, true).ConfigureAwait(false);
}
internal async Task SendRequestDirect(FlowContext context, object message, string queueName, ResponseHandlerInfo responseHandlerInfo,
string? convergeMethodName = null, bool convergeMethodTaskSync = false)
{ {
var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync); var properties = await PrepareRequest(context, responseHandlerInfo, convergeMethodName, convergeMethodTaskSync);
await context.Store(responseHandlerInfo.IsDurableQueue); await context.Store(responseHandlerInfo.IsDurableQueue);
await publisher.Publish(message, properties, true); await publisher.PublishDirect(message, queueName, properties, true);
} }
@ -129,17 +160,17 @@ namespace Tapeti.Flow.Default
// TODO disallow if replyto is not specified? // TODO disallow if replyto is not specified?
if (reply.ReplyTo != null) if (reply.ReplyTo != null)
await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory); await publisher.PublishDirect(message, reply.ReplyTo, properties, reply.Mandatory).ConfigureAwait(false);
else else
await publisher.Publish(message, properties, reply.Mandatory); await publisher.Publish(message, properties, reply.Mandatory).ConfigureAwait(false);
await context.Delete(); await context.Delete().ConfigureAwait(false);
} }
internal static async Task EndFlow(FlowContext context) internal static async Task EndFlow(FlowContext context)
{ {
await context.Delete(); await context.Delete().ConfigureAwait(false);
if (context is { HasFlowStateAndLock: true, FlowState.Metadata.Reply: { } }) if (context is { HasFlowStateAndLock: true, FlowState.Metadata.Reply: { } })
throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}"); throw new YieldPointException($"Flow must end with a response message of type {context.FlowState.Metadata.Reply.ResponseTypeName}");
@ -194,7 +225,7 @@ namespace Tapeti.Flow.Default
var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>(); var flowStore = flowContext.HandlerContext.Config.DependencyResolver.Resolve<IFlowStore>();
var flowID = Guid.NewGuid(); var flowID = Guid.NewGuid();
var flowStateLock = await flowStore.LockFlowState(flowID); var flowStateLock = await flowStore.LockFlowState(flowID).ConfigureAwait(false);
if (flowStateLock == null) if (flowStateLock == null)
throw new InvalidOperationException("Unable to lock a new flow"); throw new InvalidOperationException("Unable to lock a new flow");
@ -232,7 +263,7 @@ namespace Tapeti.Flow.Default
try try
{ {
await executableYieldPoint.Execute(flowContext); await executableYieldPoint.Execute(flowContext).ConfigureAwait(false);
} }
catch (YieldPointException e) catch (YieldPointException e)
{ {
@ -272,7 +303,7 @@ namespace Tapeti.Flow.Default
if (flowContext.ContinuationMetadata.ConvergeMethodName == null) if (flowContext.ContinuationMetadata.ConvergeMethodName == null)
throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata"); throw new InvalidOperationException("Missing ConvergeMethodName in FlowContext ContinuationMetadata");
await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync); await Converge(flowContext, flowContext.ContinuationMetadata.ConvergeMethodName, flowContext.ContinuationMetadata.ConvergeMethodSync).ConfigureAwait(false);
})); }));
} }
@ -305,13 +336,13 @@ namespace Tapeti.Flow.Default
if (yieldPointTask == null) if (yieldPointTask == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
yieldPoint = await (Task<IYieldPoint>)yieldPointTask; yieldPoint = await ((Task<IYieldPoint>)yieldPointTask).ConfigureAwait(false);
} }
if (yieldPoint == null) if (yieldPoint == null)
throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}"); throw new YieldPointException($"Yield point is required in controller {controllerPayload.Controller.GetType().Name} for converge method {convergeMethodName}");
await Execute(flowContext.HandlerContext, yieldPoint); await Execute(flowContext.HandlerContext, yieldPoint).ConfigureAwait(false);
} }
@ -424,13 +455,13 @@ namespace Tapeti.Flow.Default
context, context,
requestInfo.ResponseHandlerInfo, requestInfo.ResponseHandlerInfo,
convergeMethod.Method.Name, convergeMethod.Method.Name,
convergeMethodSync); convergeMethodSync).ConfigureAwait(false);
preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties)); preparedRequests.Add(new PreparedRequest(requestInfo.Message, properties));
} }
await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)); await context.Store(requests.Any(i => i.ResponseHandlerInfo.IsDurableQueue)).ConfigureAwait(false);
await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))); await Task.WhenAll(preparedRequests.Select(r => publisher.Publish(r.Message, r.Properties, true))).ConfigureAwait(false);
}); });
} }
} }

View File

@ -25,25 +25,25 @@ namespace Tapeti.Flow.Default
/// <inheritdoc /> /// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class public async Task Start<TController>(Expression<Func<TController, Func<IYieldPoint>>> methodSelector) where TController : class
{ {
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty<object?>()); await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), Array.Empty<object?>()).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class public async Task Start<TController>(Expression<Func<TController, Func<Task<IYieldPoint>>>> methodSelector) where TController : class
{ {
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, Array.Empty<object?>()); await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, Array.Empty<object?>()).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, IYieldPoint>>> methodSelector, TParameter parameter) where TController : class
{ {
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter}); await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => Task.FromResult((IYieldPoint)value), new object?[] {parameter}).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class public async Task Start<TController, TParameter>(Expression<Func<TController, Func<TParameter, Task<IYieldPoint>>>> methodSelector, TParameter parameter) where TController : class
{ {
await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object?[] {parameter}); await CallControllerMethod<TController>(GetExpressionMethod(methodSelector), value => (Task<IYieldPoint>)value, new object?[] {parameter}).ConfigureAwait(false);
} }
@ -54,12 +54,12 @@ namespace Tapeti.Flow.Default
if (result == null) if (result == null)
throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task<IYieldPoint>, got null"); throw new InvalidOperationException($"Method {method.Name} must return an IYieldPoint or Task<IYieldPoint>, got null");
var yieldPoint = await getYieldPointResult(result); var yieldPoint = await getYieldPointResult(result).ConfigureAwait(false);
var context = new FlowHandlerContext(config, controller, method); var context = new FlowHandlerContext(config, controller, method);
var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>(); var flowHandler = config.DependencyResolver.Resolve<IFlowHandler>();
await flowHandler.Execute(context, yieldPoint); await flowHandler.Execute(context, yieldPoint).ConfigureAwait(false);
} }

View File

@ -64,7 +64,7 @@ namespace Tapeti.Flow.Default
validatedMethods = new HashSet<string>(); validatedMethods = new HashSet<string>();
try try
{ {
foreach (var flowStateRecord in await repository.GetStates<FlowState>()) foreach (var flowStateRecord in await repository.GetStates<FlowState>().ConfigureAwait(false))
{ {
flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true)); flowStates.TryAdd(flowStateRecord.FlowID, new CachedFlowState(flowStateRecord.FlowState, flowStateRecord.CreationTime, true));
@ -134,7 +134,7 @@ namespace Tapeti.Flow.Default
inUse = true; inUse = true;
var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID)); var flowStatelock = new FlowStateLock(this, flowID, await locks.GetLock(flowID).ConfigureAwait(false));
return flowStatelock; return flowStatelock;
} }
@ -215,18 +215,18 @@ namespace Tapeti.Flow.Default
// Storing the flowstate in the underlying repository // Storing the flowstate in the underlying repository
if (isNew) if (isNew)
{ {
await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime); await owner.repository.CreateState(FlowID, cachedFlowState.FlowState, cachedFlowState.CreationTime).ConfigureAwait(false);
} }
else else
{ {
await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState); await owner.repository.UpdateState(FlowID, cachedFlowState.FlowState).ConfigureAwait(false);
} }
} }
else if (wasPersistent) else if (wasPersistent)
{ {
// We transitioned from a durable queue to a dynamic queue, // We transitioned from a durable queue to a dynamic queue,
// remove the persistent state but keep the in-memory version // remove the persistent state but keep the in-memory version
await owner.repository.DeleteState(FlowID); await owner.repository.DeleteState(FlowID).ConfigureAwait(false);
} }
} }
@ -244,7 +244,7 @@ namespace Tapeti.Flow.Default
cachedFlowState = null; cachedFlowState = null;
if (removedFlowState is { IsPersistent: true }) if (removedFlowState is { IsPersistent: true })
await owner.repository.DeleteState(FlowID); await owner.repository.DeleteState(FlowID).ConfigureAwait(false);
} }
} }
} }

View File

@ -35,6 +35,34 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class; IYieldPoint YieldWithRequest<TRequest, TResponse>(TRequest message, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, Task<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for asynchronous response handlers.
/// </summary>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
IYieldPoint YieldWithRequestDirect<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, ValueTask<IYieldPoint>> responseHandler) where TRequest : class where TResponse : class;
/// <summary> /// <summary>
/// Publish a request message and continue the flow when the response arrives. /// Publish a request message and continue the flow when the response arrives.
/// The request message must be marked with the [Request] attribute, and the /// The request message must be marked with the [Request] attribute, and the
@ -54,6 +82,27 @@ namespace Tapeti.Flow
IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class; IYieldPoint YieldWithRequestSync<TRequest, TResponse>(TRequest message, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary>
/// Publish a request message directly to a queue and continue the flow when the response arrives.
/// The exchange and routing key are not used.
/// The request message must be marked with the [Request] attribute, and the
/// Response type must match. Used for synchronous response handlers.
/// </summary>
/// <remarks>
/// The reason why this requires the extra 'Sync' in the name: one does not simply overload methods
/// with Task vs non-Task Funcs. "Ambiguous call". Apparantly this is because a return type
/// of a method is not part of its signature,according to:
/// http://stackoverflow.com/questions/18715979/ambiguity-with-action-and-func-parameter
/// </remarks>
/// <param name="message"></param>
/// <param name="queueName"></param>
/// <param name="responseHandler"></param>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <returns></returns>
IYieldPoint YieldWithRequestDirectSync<TRequest, TResponse>(TRequest message, string queueName, Func<TResponse, IYieldPoint> responseHandler) where TRequest : class where TResponse : class;
/// <summary> /// <summary>
/// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder /// Create a request builder to publish one or more requests messages. Call Yield on the resulting builder
/// to acquire an IYieldPoint. /// to acquire an IYieldPoint.

View File

@ -1,4 +1,6 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Serilog.Core; using Serilog.Core;
using Serilog.Events; using Serilog.Events;
@ -10,16 +12,20 @@ namespace Tapeti.Serilog.Default
public class DiagnosticContext : IDiagnosticContext public class DiagnosticContext : IDiagnosticContext
{ {
private readonly global::Serilog.ILogger logger; private readonly global::Serilog.ILogger logger;
private readonly Stopwatch stopwatch;
private readonly List<LogEventProperty> properties = new(); private readonly List<LogEventProperty> properties = new();
private int resetCount;
/// <summary> /// <summary>
/// Creates a new instance of a DiagnosticContext /// Creates a new instance of a DiagnosticContext
/// </summary> /// </summary>
/// <param name="logger">The Serilog ILogger which will be enriched</param> /// <param name="logger">The Serilog ILogger which will be enriched</param>
public DiagnosticContext(global::Serilog.ILogger logger) /// <param name="stopwatch">The Stopwatch instance that monitors the run time of the message handler</param>
public DiagnosticContext(global::Serilog.ILogger logger, Stopwatch stopwatch)
{ {
this.logger = logger; this.logger = logger;
this.stopwatch = stopwatch;
} }
@ -31,6 +37,17 @@ namespace Tapeti.Serilog.Default
} }
/// <inheritdoc />
public void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset")
{
var newResetCount = Interlocked.Increment(ref resetCount);
if (addToContext)
Set(propertyNamePrefix + newResetCount, stopwatch.ElapsedMilliseconds);
stopwatch.Restart();
}
/// <summary> /// <summary>
/// Returns a Serilog ILogger which is enriched with the properties set by this DiagnosticContext /// Returns a Serilog ILogger which is enriched with the properties set by this DiagnosticContext
/// </summary> /// </summary>

View File

@ -5,8 +5,7 @@
/// MessageHandlerLogging middleware. /// MessageHandlerLogging middleware.
/// </summary> /// </summary>
/// <remarks> /// <remarks>
/// This is a one-to-one copy of the IDiagnosticContext in Serilog.Extensions.Hosting which /// Similar to IDiagnosticContext in Serilog.Extensions.Hosting but slightly extended.
/// saves a reference to that package while allowing similar usage within Tapeti message handlers.
/// </remarks> /// </remarks>
public interface IDiagnosticContext public interface IDiagnosticContext
{ {
@ -19,5 +18,14 @@
/// <param name="destructureObjects">If true, the value will be serialized as structured /// <param name="destructureObjects">If true, the value will be serialized as structured
/// data if possible; if false, the object will be recorded as a scalar or simple array.</param> /// data if possible; if false, the object will be recorded as a scalar or simple array.</param>
void Set(string propertyName, object value, bool destructureObjects = false); void Set(string propertyName, object value, bool destructureObjects = false);
/// <summary>
/// Resets the timer which is used to monitor how long a message handler takes to complete.
/// Useful for example when a message handler is throttled by a rate limiter in the message
/// handler method and you want to measure only the time taken after it is allowed to start.
/// </summary>
/// <param name="addToContext">If true, the time taken until this reset is added to this diagnostic context as an incrementally named property for logging purposes. The value will be the time in milliseconds.</param>
/// <param name="propertyNamePrefix">The prefix for the property name(s) when addToContext is true. The number of times ResetStopwatch is called will be appended (stopwatchReset1, stopwatchReset2, etc).</param>
void ResetStopwatch(bool addToContext = true, string propertyNamePrefix = "stopwatchReset");
} }
} }

View File

@ -33,13 +33,13 @@ namespace Tapeti.Serilog.Middleware
{ {
var logger = context.Config.DependencyResolver.Resolve<global::Serilog.ILogger>(); var logger = context.Config.DependencyResolver.Resolve<global::Serilog.ILogger>();
var diagnosticContext = new DiagnosticContext(logger); var stopwatch = new Stopwatch();
var diagnosticContext = new DiagnosticContext(logger, stopwatch);
context.Store(new DiagnosticContextPayload(diagnosticContext)); context.Store(new DiagnosticContextPayload(diagnosticContext));
var stopwatch = new Stopwatch();
stopwatch.Start(); stopwatch.Start();
await next(); await next().ConfigureAwait(false);
stopwatch.Stop(); stopwatch.Stop();

View File

@ -32,7 +32,7 @@ namespace Tapeti.Transient
/// <inheritdoc /> /// <inheritdoc />
public async ValueTask Apply(IBindingTarget target) public async ValueTask Apply(IBindingTarget target)
{ {
QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null); QueueName = await target.BindDynamicDirect(dynamicQueuePrefix, null).ConfigureAwait(false);
router.TransientResponseQueueName = QueueName; router.TransientResponseQueueName = QueueName;
} }

View File

@ -23,7 +23,7 @@ namespace Tapeti.Transient
/// <inheritdoc /> /// <inheritdoc />
public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class public async Task<TResponse> RequestResponse<TRequest, TResponse>(TRequest request) where TRequest : class where TResponse : class
{ {
return (TResponse)await router.RequestResponse(publisher, request); return (TResponse)await router.RequestResponse(publisher, request).ConfigureAwait(false);
} }
} }
} }

View File

@ -71,7 +71,7 @@ namespace Tapeti.Transient
Persistent = false Persistent = false
}; };
await ((IInternalPublisher)publisher).Publish(request, properties, false); await ((IInternalPublisher)publisher).Publish(request, properties, false).ConfigureAwait(false);
} }
catch (Exception) catch (Exception)
{ {
@ -84,7 +84,7 @@ namespace Tapeti.Transient
await using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1)) await using (new Timer(TimeoutResponse, tcs, defaultTimeoutMs, -1))
{ {
return await tcs.Task; return await tcs.Task.ConfigureAwait(false);
} }
} }

View File

@ -8,7 +8,9 @@
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SQL/@EntryIndexedValue">SQL</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=UTF/@EntryIndexedValue">UTF</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=UTF/@EntryIndexedValue">UTF</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String> <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=4a98fdf6_002D7d98_002D4f5a_002Dafeb_002Dea44ad98c70c/@EntryIndexedValue">&lt;Policy&gt;&lt;Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"&gt;&lt;ElementKinds&gt;&lt;Kind Name="FIELD" /&gt;&lt;Kind Name="READONLY_FIELD" /&gt;&lt;/ElementKinds&gt;&lt;/Descriptor&gt;&lt;Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /&gt;&lt;/Policy&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean> <s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean> <s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean> <s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary> <s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EPredefinedNamingRulesToUserRulesUpgrade/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

View File

@ -17,7 +17,10 @@ namespace Tapeti.Config
/// <summary> /// <summary>
/// Various Tapeti features which can be turned on or off. /// Various Tapeti features which can be turned on or off.
/// </summary> /// </summary>
ITapetiConfigFeatues Features { get; } /// <remarks>
/// Calling this method will freeze the feature set if <see cref="ITapetiConfigBuilder.DelayFeatures"/> is used.
/// </remarks>
ITapetiConfigFeatures GetFeatures();
/// <summary> /// <summary>
/// Provides access to the different kinds of registered middleware. /// Provides access to the different kinds of registered middleware.
@ -34,7 +37,7 @@ namespace Tapeti.Config
/// <summary> /// <summary>
/// Various Tapeti features which can be turned on or off. /// Various Tapeti features which can be turned on or off.
/// </summary> /// </summary>
public interface ITapetiConfigFeatues public interface ITapetiConfigFeatures
{ {
/// <summary> /// <summary>
/// Determines whether 'publisher confirms' are used. This RabbitMQ features allows Tapeti to /// Determines whether 'publisher confirms' are used. This RabbitMQ features allows Tapeti to

View File

@ -53,6 +53,40 @@ namespace Tapeti.Config
void RegisterBinding(IBinding binding); void RegisterBinding(IBinding binding);
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.DisablePublisherConfirms"/>
ITapetiConfigBuilder DisablePublisherConfirms();
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.SetPublisherConfirms"/>
ITapetiConfigBuilder SetPublisherConfirms(bool enabled);
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.EnableDeclareDurableQueues"/>
ITapetiConfigBuilder EnableDeclareDurableQueues();
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.SetDeclareDurableQueues"/>
ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled);
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.DisableVerifyDurableQueues"/>
ITapetiConfigBuilder DisableVerifyDurableQueues();
/// <inheritdoc cref="ITapetiConfigFeaturesBuilder.SetVerifyDurableQueues"/>
ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled);
/// <summary>
/// Allows the core features to be determine on-demand when first required by the connection instead
/// of before <see cref="TapetiConnection"/> is constructed.
/// </summary>
/// <param name="onBuild">Called when the feature set is required. From that moment on the feature set is frozen.</param>
ITapetiConfigBuilder DelayFeatures(Action<ITapetiConfigFeaturesBuilder> onBuild);
}
/// <summary>
/// Configures Tapeti core features. Every method returns the builder instance for method chaining.
/// </summary>
public interface ITapetiConfigFeaturesBuilder
{
/// <summary> /// <summary>
/// Disables 'publisher confirms'. This RabbitMQ features allows Tapeti to be notified if a message /// Disables 'publisher confirms'. This RabbitMQ features allows Tapeti to be notified if a message
/// has no route, and guarantees delivery for request-response style messages and those marked with /// has no route, and guarantees delivery for request-response style messages and those marked with
@ -62,7 +96,7 @@ namespace Tapeti.Config
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
/// result in never-ending flows. Only disable if you can accept those consequences. /// result in never-ending flows. Only disable if you can accept those consequences.
/// </summary> /// </summary>
ITapetiConfigBuilder DisablePublisherConfirms(); ITapetiConfigFeaturesBuilder DisablePublisherConfirms();
/// <summary> /// <summary>
@ -74,7 +108,7 @@ namespace Tapeti.Config
/// and disables Tapeti.Flow from verifying if a request/response can be routed. This may /// and disables Tapeti.Flow from verifying if a request/response can be routed. This may
/// result in never-ending flows. Only disable if you can accept those consequences. /// result in never-ending flows. Only disable if you can accept those consequences.
/// </summary> /// </summary>
ITapetiConfigBuilder SetPublisherConfirms(bool enabled); ITapetiConfigFeaturesBuilder SetPublisherConfirms(bool enabled);
/// <summary> /// <summary>
@ -84,7 +118,7 @@ namespace Tapeti.Config
/// Note that access to the RabbitMQ Management plugin's REST API is required for this /// Note that access to the RabbitMQ Management plugin's REST API is required for this
/// feature to work, since AMQP does not provide a way to query existing bindings. /// feature to work, since AMQP does not provide a way to query existing bindings.
/// </remarks> /// </remarks>
ITapetiConfigBuilder EnableDeclareDurableQueues(); ITapetiConfigFeaturesBuilder EnableDeclareDurableQueues();
/// <summary> /// <summary>
/// Configures the automatic creation of durable queues and updating of their bindings. /// Configures the automatic creation of durable queues and updating of their bindings.
@ -93,7 +127,7 @@ namespace Tapeti.Config
/// Note that access to the RabbitMQ Management plugin's REST API is required for this /// Note that access to the RabbitMQ Management plugin's REST API is required for this
/// feature to work, since AMQP does not provide a way to query existing bindings. /// feature to work, since AMQP does not provide a way to query existing bindings.
/// </remarks> /// </remarks>
ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled); ITapetiConfigFeaturesBuilder SetDeclareDurableQueues(bool enabled);
/// <summary> /// <summary>
@ -102,7 +136,7 @@ namespace Tapeti.Config
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying. /// while verifying.
/// </summary> /// </summary>
ITapetiConfigBuilder DisableVerifyDurableQueues(); ITapetiConfigFeaturesBuilder DisableVerifyDurableQueues();
/// <summary> /// <summary>
@ -111,7 +145,7 @@ namespace Tapeti.Config
/// exchange, which do not correspond to Tapeti's configuration, as these will cause an error /// exchange, which do not correspond to Tapeti's configuration, as these will cause an error
/// while verifying. /// while verifying.
/// </summary> /// </summary>
ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled); ITapetiConfigFeaturesBuilder SetVerifyDurableQueues(bool enabled);
} }

View File

@ -17,30 +17,35 @@ namespace Tapeti.Connection
/// <summary> /// <summary>
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer /// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
/// </summary> /// </summary>
internal class TapetiBasicConsumer : DefaultBasicConsumer internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
{ {
private readonly IConsumer consumer; private readonly IConsumer consumer;
private readonly IMessageHandlerTracker messageHandlerTracker;
private readonly long connectionReference; private readonly long connectionReference;
private readonly ResponseFunc onRespond; private readonly ResponseFunc onRespond;
/// <inheritdoc /> /// <inheritdoc />
public TapetiBasicConsumer(IConsumer consumer, long connectionReference, ResponseFunc onRespond) public TapetiBasicConsumer(IConsumer consumer, IMessageHandlerTracker messageHandlerTracker, long connectionReference, ResponseFunc onRespond)
{ {
this.consumer = consumer; this.consumer = consumer;
this.messageHandlerTracker = messageHandlerTracker;
this.connectionReference = connectionReference; this.connectionReference = connectionReference;
this.onRespond = onRespond; this.onRespond = onRespond;
} }
/// <inheritdoc /> /// <inheritdoc />
public override void HandleBasicDeliver(string consumerTag, public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag, ulong deliveryTag,
bool redelivered, bool redelivered,
string exchange, string exchange,
string routingKey, string routingKey,
IBasicProperties properties, IBasicProperties properties,
ReadOnlyMemory<byte> body) ReadOnlyMemory<byte> body)
{
messageHandlerTracker.Enter();
try
{ {
// RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing // RabbitMQ.Client 6+ re-uses the body memory. Unfortunately Newtonsoft.Json does not support deserializing
// from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support // from Span/ReadOnlyMemory yet so we still need to use ToArray and allocate heap memory for it. When support
@ -50,20 +55,20 @@ namespace Tapeti.Connection
// See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761 // See also: https://github.com/JamesNK/Newtonsoft.Json/issues/1761
var bodyArray = body.ToArray(); var bodyArray = body.ToArray();
// Changing to AsyncDefaultBasicConsumer does not mean HandleBasicDeliver runs in parallel, the Task.Run would
// still be necessary, which is why TapetiBasicConsumer is a DefaultBasicConsumer.
Task.Run(async () =>
{
try try
{ {
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray); var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), bodyArray).ConfigureAwait(false);
await onRespond(connectionReference, deliveryTag, response); await onRespond(connectionReference, deliveryTag, response).ConfigureAwait(false);
} }
catch catch
{ {
await onRespond(connectionReference, deliveryTag, ConsumeResult.Error); await onRespond(connectionReference, deliveryTag, ConsumeResult.Error).ConfigureAwait(false);
}
}
finally
{
messageHandlerTracker.Exit();
} }
});
} }
} }
} }

View File

@ -45,7 +45,7 @@ namespace Tapeti.Connection
if (capturedTaskQueue == null) if (capturedTaskQueue == null)
return; return;
await capturedTaskQueue.Add(() => { }); await capturedTaskQueue.Add(() => { }).ConfigureAwait(false);
capturedTaskQueue.Dispose(); capturedTaskQueue.Dispose();
} }
@ -74,7 +74,7 @@ namespace Tapeti.Connection
{ {
return GetTaskQueue().Add(async () => return GetTaskQueue().Add(async () =>
{ {
await operation(modelProvider); await operation(modelProvider).ConfigureAwait(false);
}); });
} }

View File

@ -33,6 +33,7 @@ namespace Tapeti.Connection
private const int ReconnectDelay = 5000; private const int ReconnectDelay = 5000;
private const int MandatoryReturnTimeout = 300000; private const int MandatoryReturnTimeout = 300000;
private const int MinimumConnectedReconnectDelay = 1000; private const int MinimumConnectedReconnectDelay = 1000;
private const int CloseMessageHandlersTimeout = 30000;
private readonly TapetiConnectionParams connectionParams; private readonly TapetiConnectionParams connectionParams;
@ -49,6 +50,7 @@ namespace Tapeti.Connection
private readonly TapetiChannel consumeChannel; private readonly TapetiChannel consumeChannel;
private readonly TapetiChannel publishChannel; private readonly TapetiChannel publishChannel;
private readonly HttpClient managementClient; private readonly HttpClient managementClient;
private readonly MessageHandlerTracker messageHandlerTracker = new();
// These fields must be locked using connectionLock // These fields must be locked using connectionLock
private readonly object connectionLock = new(); private readonly object connectionLock = new();
@ -135,7 +137,7 @@ namespace Tapeti.Connection
DeclareExchange(channel, exchange); DeclareExchange(channel, exchange);
// The delivery tag is lost after a reconnect, register under the new tag // The delivery tag is lost after a reconnect, register under the new tag
if (config.Features.PublisherConfirms) if (config.GetFeatures().PublisherConfirms)
{ {
lastDeliveryTag++; lastDeliveryTag++;
@ -175,7 +177,7 @@ namespace Tapeti.Connection
var delayCancellationTokenSource = new CancellationTokenSource(); var delayCancellationTokenSource = new CancellationTokenSource();
var signalledTask = await Task.WhenAny( var signalledTask = await Task.WhenAny(
publishResultTask, publishResultTask,
Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)); Task.Delay(MandatoryReturnTimeout, delayCancellationTokenSource.Token)).ConfigureAwait(false);
if (signalledTask != publishResultTask) if (signalledTask != publishResultTask)
throw new TimeoutException( throw new TimeoutException(
@ -201,7 +203,7 @@ namespace Tapeti.Connection
throw new NoRouteException( throw new NoRouteException(
$"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}"); $"Mandatory message with exchange '{exchange}' and routing key '{routingKey}' could not be delivered, reply code: {replyCode}");
} }
}); }).ConfigureAwait(false);
} }
@ -224,9 +226,9 @@ namespace Tapeti.Connection
return; return;
capturedConnectionReference = Interlocked.Read(ref connectionReference); capturedConnectionReference = Interlocked.Read(ref connectionReference);
var basicConsumer = new TapetiBasicConsumer(consumer, capturedConnectionReference, Respond); var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond);
consumerTag = channel.BasicConsume(queueName, false, basicConsumer); consumerTag = channel.BasicConsume(queueName, false, basicConsumer);
}); }).ConfigureAwait(false);
return consumerTag == null return consumerTag == null
? null ? null
@ -257,7 +259,7 @@ namespace Tapeti.Connection
return; return;
channel.BasicCancel(consumerTag.ConsumerTag); channel.BasicCancel(consumerTag.ConsumerTag);
}); }).ConfigureAwait(false);
} }
@ -291,13 +293,13 @@ namespace Tapeti.Connection
default: default:
throw new ArgumentOutOfRangeException(nameof(result), result, null); throw new ArgumentOutOfRangeException(nameof(result), result, null);
} }
}); }).ConfigureAwait(false);
} }
private async Task<bool> GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments? arguments) private async Task<bool> GetDurableQueueDeclareRequired(string queueName, IRabbitMQArguments? arguments)
{ {
var existingQueue = await GetQueueInfo(queueName); var existingQueue = await GetQueueInfo(queueName).ConfigureAwait(false);
if (existingQueue == null) if (existingQueue == null)
return true; return true;
@ -342,9 +344,9 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken) public async Task DurableQueueDeclare(string queueName, IEnumerable<QueueBinding> bindings, IRabbitMQArguments? arguments, CancellationToken cancellationToken)
{ {
var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments); var declareRequired = await GetDurableQueueDeclareRequired(queueName, arguments).ConfigureAwait(false);
var existingBindings = (await GetQueueBindings(queueName)).ToList(); var existingBindings = (await GetQueueBindings(queueName).ConfigureAwait(false)).ToList();
var currentBindings = bindings.ToList(); var currentBindings = bindings.ToList();
var bindingLogger = logger as IBindingLogger; var bindingLogger = logger as IBindingLogger;
@ -371,7 +373,7 @@ namespace Tapeti.Connection
bindingLogger?.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); bindingLogger?.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey); channel.QueueUnbind(queueName, deletedBinding.Exchange, deletedBinding.RoutingKey);
} }
}); }).ConfigureAwait(false);
} }
@ -386,7 +388,7 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken) public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? arguments, CancellationToken cancellationToken)
{ {
if (!await GetDurableQueueDeclareRequired(queueName, arguments)) if (!await GetDurableQueueDeclareRequired(queueName, arguments).ConfigureAwait(false))
return; return;
await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel =>
@ -396,7 +398,7 @@ namespace Tapeti.Connection
(logger as IBindingLogger)?.QueueDeclare(queueName, true, true); (logger as IBindingLogger)?.QueueDeclare(queueName, true, true);
channel.QueueDeclarePassive(queueName); channel.QueueDeclarePassive(queueName);
}); }).ConfigureAwait(false);
} }
@ -413,7 +415,7 @@ namespace Tapeti.Connection
return; return;
deletedMessages = channel.QueueDelete(queueName); deletedMessages = channel.QueueDelete(queueName);
}); }).ConfigureAwait(false);
deletedQueues.Add(queueName); deletedQueues.Add(queueName);
(logger as IBindingLogger)?.QueueObsolete(queueName, true, deletedMessages); (logger as IBindingLogger)?.QueueObsolete(queueName, true, deletedMessages);
@ -434,7 +436,7 @@ namespace Tapeti.Connection
// Get queue information from the Management API, since the AMQP operations will // Get queue information from the Management API, since the AMQP operations will
// throw an error if the queue does not exist or still contains messages and resets // throw an error if the queue does not exist or still contains messages and resets
// the connection. The resulting reconnect will cause subscribers to reset. // the connection. The resulting reconnect will cause subscribers to reset.
var queueInfo = await GetQueueInfo(queueName); var queueInfo = await GetQueueInfo(queueName).ConfigureAwait(false);
if (queueInfo == null) if (queueInfo == null)
{ {
deletedQueues.Add(queueName); deletedQueues.Add(queueName);
@ -467,7 +469,7 @@ namespace Tapeti.Connection
else else
{ {
// Remove all bindings instead // Remove all bindings instead
var existingBindings = (await GetQueueBindings(queueName)).ToList(); var existingBindings = (await GetQueueBindings(queueName).ConfigureAwait(false)).ToList();
if (existingBindings.Count > 0) if (existingBindings.Count > 0)
{ {
@ -481,7 +483,7 @@ namespace Tapeti.Connection
(logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages); (logger as IBindingLogger)?.QueueObsolete(queueName, false, queueInfo.Messages);
} }
} while (retry); } while (retry);
}); }).ConfigureAwait(false);
} }
@ -507,7 +509,7 @@ namespace Tapeti.Connection
queueName = channel.QueueDeclare(arguments: GetDeclareArguments(arguments)).QueueName; queueName = channel.QueueDeclare(arguments: GetDeclareArguments(arguments)).QueueName;
bindingLogger?.QueueDeclare(queueName, false, false); bindingLogger?.QueueDeclare(queueName, false, false);
} }
}); }).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
if (queueName == null) if (queueName == null)
@ -527,7 +529,7 @@ namespace Tapeti.Connection
DeclareExchange(channel, binding.Exchange); DeclareExchange(channel, binding.Exchange);
(logger as IBindingLogger)?.QueueBind(queueName, false, binding.Exchange, binding.RoutingKey); (logger as IBindingLogger)?.QueueBind(queueName, false, binding.Exchange, binding.RoutingKey);
channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey); channel.QueueBind(queueName, binding.Exchange, binding.RoutingKey);
}); }).ConfigureAwait(false);
} }
@ -551,8 +553,8 @@ namespace Tapeti.Connection
} }
// Empty the queue // Empty the queue
await consumeChannel.Reset(); await consumeChannel.Reset().ConfigureAwait(false);
await publishChannel.Reset(); await publishChannel.Reset().ConfigureAwait(false);
// No need to close the channels as the connection will be closed // No need to close the channels as the connection will be closed
capturedConsumeModel?.Dispose(); capturedConsumeModel?.Dispose();
@ -570,6 +572,9 @@ namespace Tapeti.Connection
capturedConnection.Dispose(); capturedConnection.Dispose();
} }
} }
// Wait for message handlers to finish
await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout);
} }
@ -619,9 +624,9 @@ namespace Tapeti.Connection
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync(); var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
return JsonConvert.DeserializeObject<ManagementQueueInfo>(content); return JsonConvert.DeserializeObject<ManagementQueueInfo>(content);
}); }).ConfigureAwait(false);
} }
@ -659,7 +664,7 @@ namespace Tapeti.Connection
{ {
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync(); var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
var bindings = JsonConvert.DeserializeObject<IEnumerable<ManagementBinding>>(content); var bindings = JsonConvert.DeserializeObject<IEnumerable<ManagementBinding>>(content);
// Filter out the binding to an empty source, which is always present for direct-to-queue routing // Filter out the binding to an empty source, which is always present for direct-to-queue routing
@ -667,7 +672,7 @@ namespace Tapeti.Connection
.Where(binding => !string.IsNullOrEmpty(binding.Source) && !string.IsNullOrEmpty(binding.RoutingKey)) .Where(binding => !string.IsNullOrEmpty(binding.Source) && !string.IsNullOrEmpty(binding.RoutingKey))
.Select(binding => new QueueBinding(binding.Source!, binding.RoutingKey!)) .Select(binding => new QueueBinding(binding.Source!, binding.RoutingKey!))
?? Enumerable.Empty<QueueBinding>(); ?? Enumerable.Empty<QueueBinding>();
}); }).ConfigureAwait(false);
} }
@ -702,8 +707,8 @@ namespace Tapeti.Connection
{ {
try try
{ {
var response = await managementClient.SendAsync(request); var response = await managementClient.SendAsync(request).ConfigureAwait(false);
return await handleResponse(response); return await handleResponse(response).ConfigureAwait(false);
} }
catch (TimeoutException) catch (TimeoutException)
{ {
@ -717,7 +722,7 @@ namespace Tapeti.Connection
throw; throw;
} }
await Task.Delay(ExponentialBackoff[retryDelayIndex]); await Task.Delay(ExponentialBackoff[retryDelayIndex]).ConfigureAwait(false);
if (retryDelayIndex < ExponentialBackoff.Length - 1) if (retryDelayIndex < ExponentialBackoff.Length - 1)
retryDelayIndex++; retryDelayIndex++;
@ -777,9 +782,13 @@ namespace Tapeti.Connection
Password = connectionParams.Password, Password = connectionParams.Password,
AutomaticRecoveryEnabled = false, AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false, TopologyRecoveryEnabled = false,
RequestedHeartbeat = TimeSpan.FromSeconds(30) RequestedHeartbeat = TimeSpan.FromSeconds(30),
DispatchConsumersAsync = true
}; };
if (connectionParams.ConsumerDispatchConcurrency > 0)
connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency;
if (connectionParams.ClientProperties != null) if (connectionParams.ClientProperties != null)
foreach (var pair in connectionParams.ClientProperties) foreach (var pair in connectionParams.ClientProperties)
{ {
@ -844,7 +853,7 @@ namespace Tapeti.Connection
} }
if (config.Features.PublisherConfirms) if (config.GetFeatures().PublisherConfirms)
{ {
lastDeliveryTag = 0; lastDeliveryTag = 0;

View File

@ -60,7 +60,7 @@ namespace Tapeti.Connection
Exchange = exchange, Exchange = exchange,
RoutingKey = routingKey, RoutingKey = routingKey,
Properties = properties Properties = properties
}); }).ConfigureAwait(false);
} }
catch (Exception dispatchException) catch (Exception dispatchException)
{ {
@ -78,7 +78,7 @@ namespace Tapeti.Connection
}; };
var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException);
await HandleException(exceptionContext); await HandleException(exceptionContext).ConfigureAwait(false);
return exceptionContext.ConsumeResult; return exceptionContext.ConsumeResult;
} }
@ -93,7 +93,7 @@ namespace Tapeti.Connection
foreach (var binding in bindings.Where(binding => binding.Accept(messageType))) foreach (var binding in bindings.Where(binding => binding.Accept(messageType)))
{ {
var consumeResult = await InvokeUsingBinding(message, messageContextData, binding); var consumeResult = await InvokeUsingBinding(message, messageContextData, binding).ConfigureAwait(false);
validMessageType = true; validMessageType = true;
if (consumeResult != ConsumeResult.Success) if (consumeResult != ConsumeResult.Success)
@ -125,18 +125,18 @@ namespace Tapeti.Connection
try try
{ {
await MiddlewareHelper.GoAsync(config.Middleware.Message, await MiddlewareHelper.GoAsync(config.Middleware.Message,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false),
async () => { await binding.Invoke(context); }); async () => { await binding.Invoke(context).ConfigureAwait(false); });
await binding.Cleanup(context, ConsumeResult.Success); await binding.Cleanup(context, ConsumeResult.Success).ConfigureAwait(false);
return ConsumeResult.Success; return ConsumeResult.Success;
} }
catch (Exception invokeException) catch (Exception invokeException)
{ {
var exceptionContext = new ExceptionStrategyContext(context, invokeException); var exceptionContext = new ExceptionStrategyContext(context, invokeException);
await HandleException(exceptionContext); await HandleException(exceptionContext).ConfigureAwait(false);
await binding.Cleanup(context, exceptionContext.ConsumeResult); await binding.Cleanup(context, exceptionContext.ConsumeResult).ConfigureAwait(false);
return exceptionContext.ConsumeResult; return exceptionContext.ConsumeResult;
} }
} }
@ -153,7 +153,7 @@ namespace Tapeti.Connection
try try
{ {
await exceptionStrategy.HandleException(exceptionContext); await exceptionStrategy.HandleException(exceptionContext).ConfigureAwait(false);
} }
catch (Exception strategyException) catch (Exception strategyException)
{ {

View File

@ -34,21 +34,21 @@ namespace Tapeti.Connection
/// <inheritdoc /> /// <inheritdoc />
public async Task Publish(object message) public async Task Publish(object message)
{ {
await Publish(message, null, IsMandatory(message)); await Publish(message, null, IsMandatory(message)).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Action<TResponse>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
{ {
await PublishRequest(message, responseMethodSelector.Body); await PublishRequest(message, responseMethodSelector.Body).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class public async Task PublishRequest<TController, TRequest, TResponse>(TRequest message, Expression<Func<TController, Func<TResponse, Task>>> responseMethodSelector) where TController : class where TRequest : class where TResponse : class
{ {
await PublishRequest(message, responseMethodSelector.Body); await PublishRequest(message, responseMethodSelector.Body).ConfigureAwait(false);
} }
@ -87,14 +87,14 @@ namespace Tapeti.Connection
ReplyTo = binding.QueueName ReplyTo = binding.QueueName
}; };
await Publish(message, properties, IsMandatory(message)); await Publish(message, properties, IsMandatory(message)).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task SendToQueue(string queueName, object message) public async Task SendToQueue(string queueName, object message)
{ {
await PublishDirect(message, queueName, null, IsMandatory(message)); await PublishDirect(message, queueName, null, IsMandatory(message)).ConfigureAwait(false);
} }
@ -105,14 +105,14 @@ namespace Tapeti.Connection
var exchange = exchangeStrategy.GetExchange(messageClass); var exchange = exchangeStrategy.GetExchange(messageClass);
var routingKey = routingKeyStrategy.GetRoutingKey(messageClass); var routingKey = routingKeyStrategy.GetRoutingKey(messageClass);
await Publish(message, properties, exchange, routingKey, mandatory); await Publish(message, properties, exchange, routingKey, mandatory).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory) public async Task PublishDirect(object message, string queueName, IMessageProperties? properties, bool mandatory)
{ {
await Publish(message, properties, null, queueName, mandatory); await Publish(message, properties, null, queueName, mandatory).ConfigureAwait(false);
} }
@ -136,12 +136,12 @@ namespace Tapeti.Connection
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
config.Middleware.Publish, config.Middleware.Publish,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false),
async () => async () =>
{ {
var body = messageSerializer.Serialize(message, writableProperties); var body = messageSerializer.Serialize(message, writableProperties);
await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory); await clientFactory().Publish(body, writableProperties, exchange, routingKey, mandatory).ConfigureAwait(false);
}); }).ConfigureAwait(false);
} }

View File

@ -29,7 +29,7 @@ namespace Tapeti.Connection
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
if (consuming) if (consuming)
await Stop(); await Stop().ConfigureAwait(false);
} }
@ -48,7 +48,7 @@ namespace Tapeti.Connection
public async Task ApplyBindings() public async Task ApplyBindings()
{ {
initializeCancellationTokenSource = new CancellationTokenSource(); initializeCancellationTokenSource = new CancellationTokenSource();
await ApplyBindings(initializeCancellationTokenSource.Token); await ApplyBindings(initializeCancellationTokenSource.Token).ConfigureAwait(false);
} }
@ -81,10 +81,10 @@ namespace Tapeti.Connection
Task.Run(async () => Task.Run(async () =>
{ {
await ApplyBindings(cancellationToken); await ApplyBindings(cancellationToken).ConfigureAwait(false);
if (consuming && !cancellationToken.IsCancellationRequested) if (consuming && !cancellationToken.IsCancellationRequested)
await ConsumeQueues(cancellationToken); await ConsumeQueues(cancellationToken).ConfigureAwait(false);
}, CancellationToken.None); }, CancellationToken.None);
} }
@ -98,7 +98,7 @@ namespace Tapeti.Connection
consuming = true; consuming = true;
initializeCancellationTokenSource = new CancellationTokenSource(); initializeCancellationTokenSource = new CancellationTokenSource();
await ConsumeQueues(initializeCancellationTokenSource.Token); await ConsumeQueues(initializeCancellationTokenSource.Token).ConfigureAwait(false);
} }
@ -111,7 +111,7 @@ namespace Tapeti.Connection
initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource?.Cancel();
initializeCancellationTokenSource = null; initializeCancellationTokenSource = null;
await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))); await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))).ConfigureAwait(false);
consumerTags.Clear(); consumerTags.Clear();
consuming = false; consuming = false;
@ -125,17 +125,17 @@ namespace Tapeti.Connection
CustomBindingTarget bindingTarget; CustomBindingTarget bindingTarget;
if (config.Features.DeclareDurableQueues) if (config.GetFeatures().DeclareDurableQueues)
bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); bindingTarget = new DeclareDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else if (config.Features.VerifyDurableQueues) else if (config.GetFeatures().VerifyDurableQueues)
bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); bindingTarget = new PassiveDurableQueuesBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
else else
bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken); bindingTarget = new NoVerifyBindingTarget(clientFactory, routingKeyStrategy, exchangeStrategy, cancellationToken);
foreach (var binding in config.Bindings) foreach (var binding in config.Bindings)
await binding.Apply(bindingTarget); await binding.Apply(bindingTarget).ConfigureAwait(false);
await bindingTarget.Apply(); await bindingTarget.Apply().ConfigureAwait(false);
} }
@ -155,8 +155,8 @@ namespace Tapeti.Connection
var queueName = group.Key; var queueName = group.Key;
var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); var consumer = new TapetiConsumer(cancellationToken, config, queueName, group);
return await clientFactory().Consume(queueName, consumer, cancellationToken); return await clientFactory().Consume(queueName, consumer, cancellationToken).ConfigureAwait(false);
}))) })).ConfigureAwait(false))
.Where(t => t?.ConsumerTag != null) .Where(t => t?.ConsumerTag != null)
.Cast<TapetiConsumerTag>()); .Cast<TapetiConsumerTag>());
} }
@ -201,14 +201,14 @@ namespace Tapeti.Connection
public async ValueTask<string> BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) public async ValueTask<string> BindDynamic(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments)
{ {
var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments).ConfigureAwait(false);
if (!result.IsNewMessageClass) if (!result.IsNewMessageClass)
return result.QueueName; return result.QueueName;
var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass); var routingKey = RoutingKeyStrategy.GetRoutingKey(messageClass);
var exchange = ExchangeStrategy.GetExchange(messageClass); var exchange = ExchangeStrategy.GetExchange(messageClass);
await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey), CancellationToken); await ClientFactory().DynamicQueueBind(result.QueueName, new QueueBinding(exchange, routingKey), CancellationToken).ConfigureAwait(false);
return result.QueueName; return result.QueueName;
} }
@ -216,7 +216,7 @@ namespace Tapeti.Connection
public async ValueTask<string> BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments) public async ValueTask<string> BindDynamicDirect(Type messageClass, string? queuePrefix, IRabbitMQArguments? arguments)
{ {
var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments); var result = await DeclareDynamicQueue(messageClass, queuePrefix, arguments).ConfigureAwait(false);
return result.QueueName; return result.QueueName;
} }
@ -225,7 +225,7 @@ namespace Tapeti.Connection
{ {
// If we don't know the routing key, always create a new queue to ensure there is no overlap. // If we don't know the routing key, always create a new queue to ensure there is no overlap.
// Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either. // Keep it out of the dynamicQueues dictionary, so it can't be re-used later on either.
return await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); return await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken).ConfigureAwait(false);
} }
@ -267,7 +267,7 @@ namespace Tapeti.Connection
} }
// Declare a new queue // Declare a new queue
var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken); var queueName = await ClientFactory().DynamicQueueDeclare(queuePrefix, arguments, CancellationToken).ConfigureAwait(false);
var queueInfo = new DynamicQueueInfo var queueInfo = new DynamicQueueInfo
{ {
QueueName = queueName, QueueName = queueName,
@ -363,8 +363,8 @@ namespace Tapeti.Connection
public override async Task Apply() public override async Task Apply()
{ {
var client = ClientFactory(); var client = ClientFactory();
await DeclareQueues(client); await DeclareQueues(client).ConfigureAwait(false);
await DeleteObsoleteQueues(client); await DeleteObsoleteQueues(client).ConfigureAwait(false);
} }
@ -380,8 +380,8 @@ namespace Tapeti.Connection
return new QueueBinding(exchange, routingKey); return new QueueBinding(exchange, routingKey);
}); });
await client.DurableQueueDeclare(queue.Key, bindings, queue.Value.Arguments, CancellationToken); await client.DurableQueueDeclare(queue.Key, bindings, queue.Value.Arguments, CancellationToken).ConfigureAwait(false);
})); })).ConfigureAwait(false);
} }
@ -389,8 +389,8 @@ namespace Tapeti.Connection
{ {
await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue => await Task.WhenAll(obsoleteDurableQueues.Except(durableQueues.Keys).Select(async queue =>
{ {
await client.DurableQueueDelete(queue, true, CancellationToken); await client.DurableQueueDelete(queue, true, CancellationToken).ConfigureAwait(false);
})); })).ConfigureAwait(false);
} }
} }
@ -407,12 +407,12 @@ namespace Tapeti.Connection
public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments) public override async ValueTask BindDurable(Type messageClass, string queueName, IRabbitMQArguments? arguments)
{ {
await VerifyDurableQueue(queueName, arguments); await VerifyDurableQueue(queueName, arguments).ConfigureAwait(false);
} }
public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments) public override async ValueTask BindDurableDirect(string queueName, IRabbitMQArguments? arguments)
{ {
await VerifyDurableQueue(queueName, arguments); await VerifyDurableQueue(queueName, arguments).ConfigureAwait(false);
} }
public override ValueTask BindDurableObsolete(string queueName) public override ValueTask BindDurableObsolete(string queueName)
@ -426,7 +426,7 @@ namespace Tapeti.Connection
if (!durableQueues.Add(queueName)) if (!durableQueues.Add(queueName))
return; return;
await ClientFactory().DurableQueueVerify(queueName, arguments, CancellationToken); await ClientFactory().DurableQueueVerify(queueName, arguments, CancellationToken).ConfigureAwait(false);
} }
} }

View File

@ -117,10 +117,10 @@ namespace Tapeti.Default
{ {
case BindingTargetMode.Default: case BindingTargetMode.Default:
if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic)
QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); QueueName = await target.BindDynamic(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false);
else else
{ {
await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); await target.BindDurable(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false);
QueueName = bindingInfo.QueueInfo.Name; QueueName = bindingInfo.QueueInfo.Name;
} }
@ -128,10 +128,10 @@ namespace Tapeti.Default
case BindingTargetMode.Direct: case BindingTargetMode.Direct:
if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic) if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Dynamic)
QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments); QueueName = await target.BindDynamicDirect(bindingInfo.MessageClass, bindingInfo.QueueInfo.Name, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false);
else else
{ {
await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments); await target.BindDurableDirect(bindingInfo.QueueInfo.Name!, bindingInfo.QueueInfo.QueueArguments).ConfigureAwait(false);
QueueName = bindingInfo.QueueInfo.Name; QueueName = bindingInfo.QueueInfo.Name;
} }
@ -143,7 +143,7 @@ namespace Tapeti.Default
} }
else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable) else if (bindingInfo.QueueInfo.QueueType == Config.QueueType.Durable)
{ {
await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!); await target.BindDurableObsolete(bindingInfo.QueueInfo.Name!).ConfigureAwait(false);
QueueName = bindingInfo.QueueInfo.Name; QueueName = bindingInfo.QueueInfo.Name;
} }
} }
@ -165,14 +165,14 @@ namespace Tapeti.Default
var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType); var controller = Method.IsStatic ? null : dependencyResolver.Resolve(bindingInfo.ControllerType);
context.Store(new ControllerMessageContextPayload(controller, (IControllerMethodBinding)context.Binding)); context.Store(new ControllerMessageContextPayload(controller, (IControllerMethodBinding)context.Binding));
if (!await FilterAllowed(context)) if (!await FilterAllowed(context).ConfigureAwait(false))
return; return;
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.MessageMiddleware, bindingInfo.MessageMiddleware,
async (handler, next) => await handler.Handle(context, next), async (handler, next) => await handler.Handle(context, next).ConfigureAwait(false),
async () => await messageHandler(context)); async () => await messageHandler(context).ConfigureAwait(false)).ConfigureAwait(false);
} }
@ -181,8 +181,8 @@ namespace Tapeti.Default
{ {
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.CleanupMiddleware, bindingInfo.CleanupMiddleware,
async (handler, next) => await handler.Cleanup(context, consumeResult, next), async (handler, next) => await handler.Cleanup(context, consumeResult, next).ConfigureAwait(false),
() => default); () => default).ConfigureAwait(false);
} }
@ -191,12 +191,12 @@ namespace Tapeti.Default
var allowed = false; var allowed = false;
await MiddlewareHelper.GoAsync( await MiddlewareHelper.GoAsync(
bindingInfo.FilterMiddleware, bindingInfo.FilterMiddleware,
async (handler, next) => await handler.Filter(context, next), async (handler, next) => await handler.Filter(context, next).ConfigureAwait(false),
() => () =>
{ {
allowed = true; allowed = true;
return default; return default;
}); }).ConfigureAwait(false);
return allowed; return allowed;
} }

View File

@ -79,7 +79,7 @@ namespace Tapeti.Default
switch (payload) switch (payload)
{ {
case IAsyncDisposable asyncDisposable: case IAsyncDisposable asyncDisposable:
await asyncDisposable.DisposeAsync(); await asyncDisposable.DisposeAsync().ConfigureAwait(false);
break; break;
case IDisposable disposable: case IDisposable disposable:
@ -151,7 +151,7 @@ namespace Tapeti.Default
foreach (var item in items.Values) foreach (var item in items.Values)
{ {
if (item is IAsyncDisposable asyncDisposable) if (item is IAsyncDisposable asyncDisposable)
await asyncDisposable.DisposeAsync(); await asyncDisposable.DisposeAsync().ConfigureAwait(false);
} }
} }
} }

View File

@ -0,0 +1,40 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Tapeti.Helpers;
namespace Tapeti.Default
{
/// <inheritdoc />
public class MessageHandlerTracker : IMessageHandlerTracker
{
private volatile int runningCount;
private readonly ManualResetEventSlim idleEvent = new(true);
/// <inheritdoc />
public void Enter()
{
if (Interlocked.Increment(ref runningCount) == 1)
idleEvent.Reset();
}
/// <inheritdoc />
public void Exit()
{
if (Interlocked.Decrement(ref runningCount) == 0)
idleEvent.Set();
}
/// <summary>
/// Waits for the amount of currently running message handlers to reach zero.
/// </summary>
/// <param name="timeoutMilliseconds">The timeout after which an OperationCanceledException is thrown.</param>
public Task WaitForIdle(int timeoutMilliseconds)
{
return idleEvent.WaitHandle.WaitOneAsync(CancellationToken.None, timeoutMilliseconds);
}
}
}

View File

@ -75,15 +75,15 @@ namespace Tapeti.Default
private static async ValueTask PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class private static async ValueTask PublishGenericTaskResult<T>(IMessageContext messageContext, object value) where T : class
{ {
var message = await (Task<T>)value; var message = await ((Task<T>)value).ConfigureAwait(false);
await Reply(message, messageContext); await Reply(message, messageContext).ConfigureAwait(false);
} }
private static async ValueTask PublishGenericValueTaskResult<T>(IMessageContext messageContext, object value) where T : class private static async ValueTask PublishGenericValueTaskResult<T>(IMessageContext messageContext, object value) where T : class
{ {
var message = await (ValueTask<T>)value; var message = await ((ValueTask<T>)value).ConfigureAwait(false);
await Reply(message, messageContext); await Reply(message, messageContext).ConfigureAwait(false);
} }
@ -99,9 +99,9 @@ namespace Tapeti.Default
}; };
if (!string.IsNullOrEmpty(messageContext.Properties.ReplyTo)) if (!string.IsNullOrEmpty(messageContext.Properties.ReplyTo))
await publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)); await publisher.PublishDirect(message, messageContext.Properties.ReplyTo, properties, messageContext.Properties.Persistent.GetValueOrDefault(true)).ConfigureAwait(false);
else else
await publisher.Publish(message, properties, false); await publisher.Publish(message, properties, false).ConfigureAwait(false);
} }
} }
} }

View File

@ -31,7 +31,7 @@ namespace Tapeti.Default
return; return;
} }
await next(); await next().ConfigureAwait(false);
} }
} }
} }

View File

@ -126,6 +126,7 @@ namespace Tapeti.Helpers
case "password": result.Password = value; break; case "password": result.Password = value; break;
case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break; case "prefetchcount": result.PrefetchCount = ushort.Parse(value); break;
case "managementport": result.ManagementPort = int.Parse(value); break; case "managementport": result.ManagementPort = int.Parse(value); break;
case "consumerDispatchConcurrency": result.ConsumerDispatchConcurrency = int.Parse(value); break;
} }
} }
} }

View File

@ -50,7 +50,7 @@ namespace Tapeti.Helpers
var handlerIndex = middleware?.Count - 1 ?? -1; var handlerIndex = middleware?.Count - 1 ?? -1;
if (middleware == null || handlerIndex == -1) if (middleware == null || handlerIndex == -1)
{ {
await lastHandler(); await lastHandler().ConfigureAwait(false);
return; return;
} }
@ -58,12 +58,12 @@ namespace Tapeti.Helpers
{ {
handlerIndex--; handlerIndex--;
if (handlerIndex >= 0) if (handlerIndex >= 0)
await handle(middleware[handlerIndex], HandleNext); await handle(middleware[handlerIndex], HandleNext).ConfigureAwait(false);
else else
await lastHandler(); await lastHandler().ConfigureAwait(false);
} }
await handle(middleware[handlerIndex], HandleNext); await handle(middleware[handlerIndex], HandleNext).ConfigureAwait(false);
} }
} }
} }

View File

@ -0,0 +1,52 @@
using System.Threading.Tasks;
using System.Threading;
using System;
namespace Tapeti.Helpers
{
/// <summary>
/// Provides a WaitOneAsync method for <see cref="WaitHandle"/>.
/// </summary>
public static class WaitHandleExtensions
{
/// <summary>
/// Provides a way to wait for a WaitHandle asynchronously.
/// </summary>
/// <remarks>
/// Credit: <see href="https://stackoverflow.com/a/68632819"/>
/// </remarks>
public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
if (waitHandle == null)
throw new ArgumentNullException(nameof(waitHandle));
var tcs = new TaskCompletionSource<bool>();
var ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
var timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;
var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
(_, timedOut) =>
{
if (timedOut)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(true);
}
},
null, timeout, true);
var task = tcs.Task;
_ = task.ContinueWith(_ =>
{
rwh.Unregister(null);
return ctr.Unregister();
}, CancellationToken.None);
return task;
}
}
}

View File

@ -84,6 +84,12 @@ namespace Tapeti
ISubscriber SubscribeSync(bool startConsuming = true); ISubscriber SubscribeSync(bool startConsuming = true);
/// <summary>
/// Stops the current subscriber.
/// </summary>
Task Unsubscribe();
/// <summary> /// <summary>
/// Returns an IPublisher implementation for the current connection. /// Returns an IPublisher implementation for the current connection.
/// </summary> /// </summary>

View File

@ -0,0 +1,18 @@
namespace Tapeti
{
/// <summary>
/// Tracks the number of currently running message handlers.
/// </summary>
public interface IMessageHandlerTracker
{
/// <summary>
/// Registers the start of a message handler.
/// </summary>
void Enter();
/// <summary>
/// Registers the end of a message handler.
/// </summary>
void Exit();
}
}

View File

@ -22,7 +22,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2022.*" /> <PackageReference Include="JetBrains.Annotations" Version="2022.*" />
<PackageReference Include="Newtonsoft.Json" Version="13.*" /> <PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="RabbitMQ.Client" Version="[6.5]" /> <PackageReference Include="RabbitMQ.Client" Version="[6.8.1]" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
@ -34,6 +34,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" /> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Tapeti.Annotations" Version="3.*-*" /> <PackageReference Include="Tapeti.Annotations" Version="3.*" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -135,7 +135,7 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder DisablePublisherConfirms() public ITapetiConfigBuilder DisablePublisherConfirms()
{ {
GetConfig().SetPublisherConfirms(false); GetConfig().GetFeaturesBuilder().DisablePublisherConfirms();
return this; return this;
} }
@ -143,7 +143,7 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder SetPublisherConfirms(bool enabled) public ITapetiConfigBuilder SetPublisherConfirms(bool enabled)
{ {
GetConfig().SetPublisherConfirms(enabled); GetConfig().GetFeaturesBuilder().SetPublisherConfirms(enabled);
return this; return this;
} }
@ -151,7 +151,7 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder EnableDeclareDurableQueues() public ITapetiConfigBuilder EnableDeclareDurableQueues()
{ {
GetConfig().SetDeclareDurableQueues(true); GetConfig().GetFeaturesBuilder().EnableDeclareDurableQueues();
return this; return this;
} }
@ -159,7 +159,7 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled) public ITapetiConfigBuilder SetDeclareDurableQueues(bool enabled)
{ {
GetConfig().SetDeclareDurableQueues(enabled); GetConfig().GetFeaturesBuilder().SetDeclareDurableQueues(enabled);
return this; return this;
} }
@ -167,7 +167,7 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder DisableVerifyDurableQueues() public ITapetiConfigBuilder DisableVerifyDurableQueues()
{ {
GetConfig().SetVerifyDurableQueues(false); GetConfig().GetFeaturesBuilder().DisablePublisherConfirms();
return this; return this;
} }
@ -175,7 +175,15 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled) public ITapetiConfigBuilder SetVerifyDurableQueues(bool enabled)
{ {
GetConfig().SetVerifyDurableQueues(enabled); GetConfig().GetFeaturesBuilder().SetVerifyDurableQueues(enabled);
return this;
}
/// <inheritdoc />
public ITapetiConfigBuilder DelayFeatures(Action<ITapetiConfigFeaturesBuilder> onBuild)
{
GetConfig().GetFeaturesBuilder().DelayFeatures(onBuild);
return this; return this;
} }
@ -221,12 +229,13 @@ namespace Tapeti
/// <inheritdoc /> /// <inheritdoc />
internal class Config : ITapetiConfig internal class Config : ITapetiConfig
{ {
private readonly ConfigFeatures features = new(); private ConfigFeaturesBuilder? featuresBuilder = new();
private ITapetiConfigFeatures? features;
private readonly ConfigMiddleware middleware = new(); private readonly ConfigMiddleware middleware = new();
private readonly ConfigBindings bindings = new(); private readonly ConfigBindings bindings = new();
public IDependencyResolver DependencyResolver { get; } public IDependencyResolver DependencyResolver { get; }
public ITapetiConfigFeatues Features => features;
public ITapetiConfigMiddleware Middleware => middleware; public ITapetiConfigMiddleware Middleware => middleware;
public ITapetiConfigBindings Bindings => bindings; public ITapetiConfigBindings Bindings => bindings;
@ -237,6 +246,17 @@ namespace Tapeti
} }
public ITapetiConfigFeatures GetFeatures()
{
if (features != null)
return features;
features = featuresBuilder!.Build();
featuresBuilder = null;
return features;
}
public void Lock() public void Lock()
{ {
bindings.Lock(); bindings.Lock();
@ -260,24 +280,17 @@ namespace Tapeti
} }
public void SetPublisherConfirms(bool enabled) internal ConfigFeaturesBuilder GetFeaturesBuilder()
{ {
features.PublisherConfirms = enabled; if (featuresBuilder == null)
} throw new InvalidOperationException("Tapeti features are already frozen");
public void SetDeclareDurableQueues(bool enabled) return featuresBuilder;
{
features.DeclareDurableQueues = enabled;
}
public void SetVerifyDurableQueues(bool enabled)
{
features.VerifyDurableQueues = enabled;
} }
} }
internal class ConfigFeatures : ITapetiConfigFeatues internal class ConfigFeatures : ITapetiConfigFeatures
{ {
public bool PublisherConfirms { get; internal set; } = true; public bool PublisherConfirms { get; internal set; } = true;
public bool DeclareDurableQueues { get; internal set; } public bool DeclareDurableQueues { get; internal set; }
@ -285,6 +298,66 @@ namespace Tapeti
} }
internal class ConfigFeaturesBuilder : ITapetiConfigFeaturesBuilder
{
private bool publisherConfirms = true;
private bool declareDurableQueues;
private bool verifyDurableQueues = true;
private Action<ITapetiConfigFeaturesBuilder>? onBuild;
public ITapetiConfigFeaturesBuilder DisablePublisherConfirms()
{
return SetPublisherConfirms(false);
}
public ITapetiConfigFeaturesBuilder SetPublisherConfirms(bool enabled)
{
publisherConfirms = enabled;
return this;
}
public ITapetiConfigFeaturesBuilder EnableDeclareDurableQueues()
{
return SetDeclareDurableQueues(true);
}
public ITapetiConfigFeaturesBuilder SetDeclareDurableQueues(bool enabled)
{
declareDurableQueues = enabled;
return this;
}
public ITapetiConfigFeaturesBuilder DisableVerifyDurableQueues()
{
return SetVerifyDurableQueues(false);
}
public ITapetiConfigFeaturesBuilder SetVerifyDurableQueues(bool enabled)
{
verifyDurableQueues = enabled;
return this;
}
// ReSharper disable once ParameterHidesMember
public void DelayFeatures(Action<ITapetiConfigFeaturesBuilder> onBuild)
{
this.onBuild = onBuild;
}
public ITapetiConfigFeatures Build()
{
onBuild?.Invoke(this);
return new ConfigFeatures
{
DeclareDurableQueues = declareDurableQueues,
PublisherConfirms = publisherConfirms,
VerifyDurableQueues = verifyDurableQueues
};
}
}
internal class ConfigMiddleware : ITapetiConfigMiddleware internal class ConfigMiddleware : ITapetiConfigMiddleware
{ {
private readonly List<IMessageMiddleware> messageMiddleware = new(); private readonly List<IMessageMiddleware> messageMiddleware = new();

View File

@ -63,11 +63,11 @@ namespace Tapeti
if (subscriber == null) if (subscriber == null)
{ {
subscriber = new TapetiSubscriber(() => client.Value, config); subscriber = new TapetiSubscriber(() => client.Value, config);
await subscriber.ApplyBindings(); await subscriber.ApplyBindings().ConfigureAwait(false);
} }
if (startConsuming) if (startConsuming)
await subscriber.Resume(); await subscriber.Resume().ConfigureAwait(false);
return subscriber; return subscriber;
} }
@ -80,6 +80,13 @@ namespace Tapeti
} }
/// <inheritdoc />
public Task Unsubscribe()
{
return subscriber?.Stop() ?? Task.CompletedTask;
}
/// <inheritdoc /> /// <inheritdoc />
public IPublisher GetPublisher() public IPublisher GetPublisher()
{ {
@ -91,28 +98,36 @@ namespace Tapeti
public async Task Close() public async Task Close()
{ {
if (client.IsValueCreated) if (client.IsValueCreated)
await client.Value.Close(); await client.Value.Close().ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
public void Dispose() public void Dispose()
{ {
if (!disposed) GC.SuppressFinalize(this);
DisposeAsync().GetAwaiter().GetResult();
if (disposed)
return;
var disposeAsyncTask = DisposeAsync();
if (!disposeAsyncTask.IsCompleted)
disposeAsyncTask.AsTask().GetAwaiter().GetResult();
} }
/// <inheritdoc /> /// <inheritdoc />
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
GC.SuppressFinalize(this);
if (disposed) if (disposed)
return; return;
if (subscriber != null) if (subscriber != null)
await subscriber.DisposeAsync(); await subscriber.DisposeAsync().ConfigureAwait(false);
await Close(); await Close().ConfigureAwait(false);
disposed = true; disposed = true;
} }

View File

@ -50,6 +50,17 @@ namespace Tapeti
/// </summary> /// </summary>
public int ManagementPort { get; set; } = 15672; public int ManagementPort { get; set; } = 15672;
/// <summary>
/// The maximum number of consumers which are run concurrently.
/// </summary>
/// <remarks>
/// The number of consumers is usually roughly equal to the number of queues consumed.
/// Do not set too high to avoid overloading the thread pool.
/// The RabbitMQ Client library defaults to 1. Due to older Tapeti versions implementing concurrency
/// effectively limited by the PrefetchCount, this will default to Environment.ProcessorCount instead.
/// </remarks>
public int ConsumerDispatchConcurrency { get; set; }
/// <summary> /// <summary>
/// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface. /// Key-value pair of properties that are set on the connection. These will be visible in the RabbitMQ Management interface.
/// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly /// Note that you can either set a new dictionary entirely, to allow for inline declaration, or use this property directly
@ -69,6 +80,7 @@ namespace Tapeti
/// </summary> /// </summary>
public TapetiConnectionParams() public TapetiConnectionParams()
{ {
ConsumerDispatchConcurrency = Environment.ProcessorCount;
} }
/// <summary> /// <summary>
@ -77,7 +89,7 @@ namespace Tapeti
/// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname/"))</example> /// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname/"))</example>
/// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost"))</example> /// <example>new TapetiConnectionParams(new Uri("amqp://username:password@hostname:5672/virtualHost"))</example>
/// <param name="uri"></param> /// <param name="uri"></param>
public TapetiConnectionParams(Uri uri) public TapetiConnectionParams(Uri uri) : this()
{ {
HostName = uri.Host; HostName = uri.Host;
VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath; VirtualHost = string.IsNullOrEmpty(uri.AbsolutePath) ? "/" : uri.AbsolutePath;

View File

@ -44,7 +44,7 @@ configuration:
deploy: deploy:
- provider: NuGet - provider: NuGet
api_key: api_key:
secure: HJ6sQ5J8aQUCalJSppNpuEydKri1AhSLSOXDwM63xKwiTvA462KQnqmBB7gljHA3 secure: yR7Sj3XoMgWBEj2roujkdErQYgGo22X//FqpCcE4AHQ4i/EyFjqETv1hxC06GCtg
skip_symbols: false skip_symbols: false
artifact: /.*(\.|\.s)nupkg/ artifact: /.*(\.|\.s)nupkg/