From a920d22dafda049e97bf8abcbc6365fa7303ff2f Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 20 Feb 2024 09:46:41 +0800 Subject: [PATCH] Supports disabling parallel message sending and modifies the default behavior to serial sending. (#1480) --- src/DotNetCore.CAP/CAP.Options.cs | 7 +++++++ .../Processor/IDispatcher.Default.cs | 17 +++++++++++++---- .../Processor/IDispatcher.PerGroup.cs | 17 +++++++++++++---- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 1178ba087..32fbb2a03 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -23,6 +23,7 @@ public CapOptions() FailedRetryInterval = 60; FailedRetryCount = 50; ConsumerThreadCount = 1; + EnablePublishParallelSend = false; EnableConsumerPrefetch = false; Extensions = new List(); Version = "v1"; @@ -96,6 +97,12 @@ public CapOptions() /// public bool EnableConsumerPrefetch { get; set; } + /// + /// If true, the message send task will be parallel execute by .net thread pool. + /// Default is false + /// + public bool EnablePublishParallelSend { get; set; } + /// /// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as /// value is. diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 47bce901e..771ce9b8f 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -27,6 +27,7 @@ public class Dispatcher : IDispatcher private readonly IDataStorage _storage; private readonly PriorityQueue _schedulerQueue; private readonly bool _enablePrefetch; + private readonly bool _enableParallelSend; private Channel _publishedChannel = default!; private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> _receivedChannel = default!; @@ -45,6 +46,7 @@ public Dispatcher(ILogger logger, _schedulerQueue = new PriorityQueue(); _storage = storage; _enablePrefetch = options.Value.EnableConsumerPrefetch; + _enableParallelSend = options.Value.EnablePublishParallelSend; } public async Task Start(CancellationToken stoppingToken) @@ -209,12 +211,19 @@ private async ValueTask Sending() try { var item = message; - _ = Task.Run(async () => + if (_enableParallelSend) + { + _ = Task.Run(async () => + { + var result = await _sender.SendAsync(item).ConfigureAwait(false); + if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception); + }); + } + else { var result = await _sender.SendAsync(item).ConfigureAwait(false); - if (!result.Succeeded) - _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception); - }); + if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception); + } } catch (Exception ex) { diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs index 456292776..05be91aa7 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs @@ -28,6 +28,7 @@ internal class DispatcherPerGroup : IDispatcher private readonly IDataStorage _storage; private readonly PriorityQueue _schedulerQueue; private readonly bool _enablePrefetch; + private readonly bool _enableParallelSend; private Channel _publishedChannel = default!; private ConcurrentDictionary> _receivedChannels = default!; @@ -47,6 +48,7 @@ public DispatcherPerGroup(ILogger logger, _schedulerQueue = new PriorityQueue(); _storage = storage; _enablePrefetch = options.Value.EnableConsumerPrefetch; + _enableParallelSend = options.Value.EnablePublishParallelSend; } public async Task Start(CancellationToken stoppingToken) @@ -214,12 +216,19 @@ private async ValueTask Sending() while (_publishedChannel.Reader.TryRead(out var message)) try { - _ = Task.Run(async () => + if (_enableParallelSend) + { + _ = Task.Run(async () => + { + var result = await _sender.SendAsync(message).ConfigureAwait(false); + if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); + }); + } + else { var result = await _sender.SendAsync(message).ConfigureAwait(false); - if (!result.Succeeded) - _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); - }); + if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); + } } catch (Exception ex) {