diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 58fddeb..6d67241 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -78,7 +78,8 @@ namespace Tapeti.Connection }; var exceptionContext = new ExceptionStrategyContext(emptyContext, dispatchException); - HandleException(exceptionContext); + await HandleException(exceptionContext); + return exceptionContext.ConsumeResult; } } @@ -132,7 +133,7 @@ namespace Tapeti.Connection catch (Exception invokeException) { var exceptionContext = new ExceptionStrategyContext(context, invokeException); - HandleException(exceptionContext); + await HandleException(exceptionContext); await binding.Cleanup(context, exceptionContext.ConsumeResult); return exceptionContext.ConsumeResult; @@ -140,7 +141,7 @@ namespace Tapeti.Connection } - private void HandleException(ExceptionStrategyContext exceptionContext) + private async Task HandleException(ExceptionStrategyContext exceptionContext) { if (cancellationToken.IsCancellationRequested && IgnoreExceptionDuringShutdown(exceptionContext.Exception)) { @@ -151,7 +152,7 @@ namespace Tapeti.Connection try { - exceptionStrategy.HandleException(exceptionContext); + await exceptionStrategy.HandleException(exceptionContext); } catch (Exception strategyException) { diff --git a/Tapeti/Default/NackExceptionStrategy.cs b/Tapeti/Default/NackExceptionStrategy.cs index 06510f2..e760e20 100644 --- a/Tapeti/Default/NackExceptionStrategy.cs +++ b/Tapeti/Default/NackExceptionStrategy.cs @@ -1,4 +1,5 @@ -using Tapeti.Config; +using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti.Default { @@ -9,9 +10,10 @@ namespace Tapeti.Default public class NackExceptionStrategy : IExceptionStrategy { /// - public void HandleException(IExceptionStrategyContext context) + public Task HandleException(IExceptionStrategyContext context) { context.SetConsumeResult(ConsumeResult.Error); + return Task.CompletedTask; } } } diff --git a/Tapeti/Default/RequeueExceptionStrategy.cs b/Tapeti/Default/RequeueExceptionStrategy.cs index 87fa8a2..f91922b 100644 --- a/Tapeti/Default/RequeueExceptionStrategy.cs +++ b/Tapeti/Default/RequeueExceptionStrategy.cs @@ -1,4 +1,5 @@ -using Tapeti.Config; +using System.Threading.Tasks; +using Tapeti.Config; // ReSharper disable UnusedMember.Global @@ -20,9 +21,10 @@ namespace Tapeti.Default public class RequeueExceptionStrategy : IExceptionStrategy { /// - public void HandleException(IExceptionStrategyContext context) + public Task HandleException(IExceptionStrategyContext context) { context.SetConsumeResult(ConsumeResult.Requeue); + return Task.CompletedTask; } } } diff --git a/Tapeti/IExceptionStrategy.cs b/Tapeti/IExceptionStrategy.cs index 5aeb5e1..b498cf1 100644 --- a/Tapeti/IExceptionStrategy.cs +++ b/Tapeti/IExceptionStrategy.cs @@ -1,4 +1,5 @@ -using Tapeti.Config; +using System.Threading.Tasks; +using Tapeti.Config; namespace Tapeti { @@ -12,6 +13,6 @@ namespace Tapeti /// /// The exception strategy context containing the necessary data including the message context and the thrown exception. /// Also proivdes methods for the exception strategy to indicate how the message should be handled. - void HandleException(IExceptionStrategyContext context); + Task HandleException(IExceptionStrategyContext context); } }