From 3bc975cfd6563f92e39d6e8a33a813d36b03151b Mon Sep 17 00:00:00 2001 From: Cody Peck Date: Wed, 10 Jan 2024 20:15:30 -0600 Subject: [PATCH] Allow configuration of batched sender parallelism --- src/Wolverine/Configuration/Endpoint.cs | 5 +++++ src/Wolverine/Configuration/SubscriberConfiguration.cs | 10 ++++++++++ src/Wolverine/Transports/Sending/BatchedSender.cs | 2 +- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 02cad683a..5e7a3dff5 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -116,6 +116,11 @@ protected Endpoint(Uri uri, EndpointRole role) /// public int MessageBatchSize { get; set; } = 100; + /// + /// For endpoints that send messages in batches, this governs the maximum number + /// of concurrent outgoing batches + /// + public int MessageBatchMaxDegreeOfParallelism { get; set; } = 1; /// /// Mark whether or not the receiver for this listener should use diff --git a/src/Wolverine/Configuration/SubscriberConfiguration.cs b/src/Wolverine/Configuration/SubscriberConfiguration.cs index 7a64a5478..2f4f2b317 100644 --- a/src/Wolverine/Configuration/SubscriberConfiguration.cs +++ b/src/Wolverine/Configuration/SubscriberConfiguration.cs @@ -68,6 +68,16 @@ public T MessageBatchSize(int batchSize) add(e => e.MessageBatchSize = batchSize); return this.As(); } + + /// + /// For endpoints that send messages in batches, this governs the maximum number + /// of concurrent outgoing batches + /// + public T MessageBatchMaxDegreeOfParallelism(int batchMaxDegreeOfParallelism) + { + add(e => e.MessageBatchMaxDegreeOfParallelism = batchMaxDegreeOfParallelism); + return this.As(); + } public T DefaultSerializer(IMessageSerializer serializer) { diff --git a/src/Wolverine/Transports/Sending/BatchedSender.cs b/src/Wolverine/Transports/Sending/BatchedSender.cs index 8c14efb18..41bd4a7a1 100644 --- a/src/Wolverine/Transports/Sending/BatchedSender.cs +++ b/src/Wolverine/Transports/Sending/BatchedSender.cs @@ -29,7 +29,7 @@ public BatchedSender(Endpoint destination, ISenderProtocol protocol, Cancellatio _sender = new ActionBlock(SendBatchAsync, new ExecutionDataflowBlockOptions { - MaxDegreeOfParallelism = 1, + MaxDegreeOfParallelism = destination.MessageBatchMaxDegreeOfParallelism, CancellationToken = _cancellation, BoundedCapacity = DataflowBlockOptions.Unbounded });