Skip to content

Commit

Permalink
use geometric mean 200
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Jan 29, 2025
1 parent eac7490 commit 062143b
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/daft-physical-plan/src/ops/shuffle_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,22 @@ pub struct ShuffleExchangeFactory {
}

impl ShuffleExchangeFactory {
const PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE: usize = 1_000_000;
const PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE: usize = 200;

/// Creates a `ShuffleExchangeFactory` that stores `input` as the plan whose
/// output will be shuffled.
pub fn new(input: PhysicalPlanRef) -> Self {
Self { input }
}

/// Decides whether the pre-shuffle-merge strategy should be used.
///
/// Returns `true` when the geometric mean of `input_num_partitions` and
/// `target_num_partitions` (the square root of their product, truncated to an
/// integer) is strictly greater than
/// `Self::PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE`.
fn should_use_pre_shuffle_merge(
    &self,
    input_num_partitions: usize,
    target_num_partitions: usize,
) -> bool {
    // A plain `*` would panic in overflow-checked builds and wrap around in
    // release builds; a wrapped product could land below the threshold and
    // select the wrong strategy. Saturating keeps the product "very large",
    // which is the correct reading for such inputs.
    let total_num_partitions = input_num_partitions.saturating_mul(target_num_partitions);
    // Truncation toward zero is intentional: the comparison below is strict.
    let geometric_mean = (total_num_partitions as f64).sqrt() as usize;
    geometric_mean > Self::PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE
}

fn get_shuffle_strategy(
&self,
clustering_spec: Arc<ClusteringSpec>,
Expand All @@ -135,10 +145,10 @@ impl ShuffleExchangeFactory {
}

Check warning on line 145 in src/daft-physical-plan/src/ops/shuffle_exchange.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-physical-plan/src/ops/shuffle_exchange.rs#L143-L145

Added lines #L143 - L145 were not covered by tests
}
Some(cfg) if cfg.shuffle_algorithm == "auto" => {
let intermediate_num_partitions = self.input.clustering_spec().num_partitions()
* clustering_spec.num_partitions();
if intermediate_num_partitions > Self::PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE
{
if self.should_use_pre_shuffle_merge(
self.input.clustering_spec().num_partitions(),
clustering_spec.num_partitions(),
) {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: cfg.pre_shuffle_merge_threshold,
Expand All @@ -150,10 +160,10 @@ impl ShuffleExchangeFactory {
}
}
None => {
let intermediate_num_partitions = self.input.clustering_spec().num_partitions()
* clustering_spec.num_partitions();
if intermediate_num_partitions > Self::PARTITION_THRESHOLD_TO_USE_PRE_SHUFFLE_MERGE
{
if self.should_use_pre_shuffle_merge(
self.input.clustering_spec().num_partitions(),
clustering_spec.num_partitions(),
) {
ShuffleExchangeStrategy::MapReduceWithPreShuffleMerge {
target_spec: clustering_spec,
pre_shuffle_merge_threshold: DaftExecutionConfig::default()
Expand Down

0 comments on commit 062143b

Please sign in to comment.