Skip to content

Commit

Permalink
Allow configuration of batched sender parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
codyspeck committed Jan 11, 2024
1 parent 08eb736 commit 3bc975c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/Wolverine/Configuration/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ protected Endpoint(Uri uri, EndpointRole role)
/// </summary>
public int MessageBatchSize { get; set; } = 100;

/// <summary>
/// For endpoints that send messages in batches, this governs the maximum number
/// of concurrent outgoing batches
/// </summary>
public int MessageBatchMaxDegreeOfParallelism { get; set; } = 1;

/// <summary>
/// Mark whether or not the receiver for this listener should use
Expand Down
10 changes: 10 additions & 0 deletions src/Wolverine/Configuration/SubscriberConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public T MessageBatchSize(int batchSize)
add(e => e.MessageBatchSize = batchSize);
return this.As<T>();
}

/// <summary>
/// For endpoints that send messages in batches, this governs the maximum number
/// of concurrent outgoing batches
/// </summary>
public T MessageBatchMaxDegreeOfParallelism(int batchMaxDegreeOfParallelism)
{
add(e => e.MessageBatchMaxDegreeOfParallelism = batchMaxDegreeOfParallelism);
return this.As<T>();
}

public T DefaultSerializer(IMessageSerializer serializer)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Transports/Sending/BatchedSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public BatchedSender(Endpoint destination, ISenderProtocol protocol, Cancellatio

_sender = new ActionBlock<OutgoingMessageBatch>(SendBatchAsync, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
MaxDegreeOfParallelism = destination.MessageBatchMaxDegreeOfParallelism,
CancellationToken = _cancellation,
BoundedCapacity = DataflowBlockOptions.Unbounded
});
Expand Down

0 comments on commit 3bc975c

Please sign in to comment.