Skip to content

Commit

Permalink
Supports disabling parallel message sending and modifies the default …
Browse files Browse the repository at this point in the history
…behavior to serial sending. (#1480)
  • Loading branch information
yang-xiaodong committed Feb 20, 2024
1 parent 4d12dc3 commit a920d22
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public CapOptions()
FailedRetryInterval = 60;
FailedRetryCount = 50;
ConsumerThreadCount = 1;
EnablePublishParallelSend = false;
EnableConsumerPrefetch = false;
Extensions = new List<ICapOptionsExtension>();
Version = "v1";
Expand Down Expand Up @@ -96,6 +97,12 @@ public CapOptions()
/// </summary>
public bool EnableConsumerPrefetch { get; set; }

/// <summary>
/// If true, the message send task will be parallel execute by .net thread pool.
/// Default is false
/// </summary>
public bool EnablePublishParallelSend { get; set; }

/// <summary>
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as
/// <see cref="ConsumerThreadCount" /> value is.
Expand Down
17 changes: 13 additions & 4 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class Dispatcher : IDispatcher
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, long> _schedulerQueue;
private readonly bool _enablePrefetch;
private readonly bool _enableParallelSend;

private Channel<MediumMessage> _publishedChannel = default!;
private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> _receivedChannel = default!;
Expand All @@ -45,6 +46,7 @@ public Dispatcher(ILogger<Dispatcher> logger,
_schedulerQueue = new PriorityQueue<MediumMessage, long>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}

public async Task Start(CancellationToken stoppingToken)
Expand Down Expand Up @@ -209,12 +211,19 @@ private async ValueTask Sending()
try
{
var item = message;
_ = Task.Run(async () =>
if (_enableParallelSend)
{
_ = Task.Run(async () =>
{
var result = await _sender.SendAsync(item).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
});
}
else
{
var result = await _sender.SendAsync(item).ConfigureAwait(false);
if (!result.Succeeded)
_logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
});
if (!result.Succeeded) _logger.MessagePublishException(item.Origin.GetId(), result.ToString(), result.Exception);
}
}
catch (Exception ex)
{
Expand Down
17 changes: 13 additions & 4 deletions src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class DispatcherPerGroup : IDispatcher
private readonly IDataStorage _storage;
private readonly PriorityQueue<MediumMessage, DateTime> _schedulerQueue;
private readonly bool _enablePrefetch;
private readonly bool _enableParallelSend;

private Channel<MediumMessage> _publishedChannel = default!;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor?)>> _receivedChannels = default!;
Expand All @@ -47,6 +48,7 @@ public DispatcherPerGroup(ILogger<Dispatcher> logger,
_schedulerQueue = new PriorityQueue<MediumMessage, DateTime>();
_storage = storage;
_enablePrefetch = options.Value.EnableConsumerPrefetch;
_enableParallelSend = options.Value.EnablePublishParallelSend;
}

public async Task Start(CancellationToken stoppingToken)
Expand Down Expand Up @@ -214,12 +216,19 @@ private async ValueTask Sending()
while (_publishedChannel.Reader.TryRead(out var message))
try
{
_ = Task.Run(async () =>
if (_enableParallelSend)
{
_ = Task.Run(async () =>
{
var result = await _sender.SendAsync(message).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
});
}
else
{
var result = await _sender.SendAsync(message).ConfigureAwait(false);
if (!result.Succeeded)
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
});
if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);
}
}
catch (Exception ex)
{
Expand Down

0 comments on commit a920d22

Please sign in to comment.