diff --git a/pkg/util/parallelize/parallelism.go b/pkg/util/parallelize/parallelism.go index ae1cc370..f70a9bc3 100644 --- a/pkg/util/parallelize/parallelism.go +++ b/pkg/util/parallelize/parallelism.go @@ -18,13 +18,28 @@ package parallelize import ( "context" + "math" "k8s.io/client-go/util/workqueue" ) const parallelism = 16 +// chunkSizeFor returns a chunk size for the given number of items to use for +// parallel work. The size aims to produce good CPU utilization. +// returns max(1, min(sqrt(n), n/Parallelism)) +func chunkSizeFor(n, parallelism int) int { + s := int(math.Sqrt(float64(n))) + + if r := n/parallelism + 1; s > r { + s = r + } else if s < 1 { + s = 1 + } + return s +} + // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { - workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece) + workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces, parallelism))) }