Commit 69a3c4f

chore
Weijun-H committed Feb 6, 2025
1 parent 1cb85dd commit 69a3c4f
Showing 2 changed files with 34 additions and 15 deletions.
18 changes: 14 additions & 4 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1071,10 +1071,20 @@ fn replace_round_robin_repartition_with_on_demand(
     if let Some(repartition) = context.plan.as_any().downcast_ref::<RepartitionExec>() {
         if let Partitioning::RoundRobinBatch(n) = repartition.partitioning() {
             let child_plan = Arc::clone(&context.children[0].plan);
-            context.plan = Arc::new(OnDemandRepartitionExec::try_new(
-                child_plan,
-                Partitioning::OnDemand(*n),
-            )?);
+            context.plan = if repartition.preserve_order() {
+                Arc::new(
+                    OnDemandRepartitionExec::try_new(
+                        child_plan,
+                        Partitioning::OnDemand(*n),
+                    )?
+                    .with_preserve_order(),
+                )
+            } else {
+                Arc::new(OnDemandRepartitionExec::try_new(
+                    child_plan,
+                    Partitioning::OnDemand(*n),
+                )?)
+            };
             return Ok(context);
         }
     }
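Note on the hunk above: the old code always rebuilt the node as a plain OnDemandRepartitionExec, silently dropping the preserve_order flag of the RepartitionExec it replaced; the new branch propagates it. A minimal sketch of an equivalent formulation, assuming only the try_new / with_preserve_order / preserve_order signatures visible in this diff and reusing the hunk's local variables (child_plan, repartition, n, context):

```rust
// Sketch, not the committed code: build the node once, then opt into
// order preservation only when the original repartition required it.
let exec =
    OnDemandRepartitionExec::try_new(child_plan, Partitioning::OnDemand(*n))?;
context.plan = if repartition.preserve_order() {
    Arc::new(exec.with_preserve_order())
} else {
    Arc::new(exec)
};
```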
31 changes: 20 additions & 11 deletions datafusion/physical-plan/src/repartition/on_demand_repartition.rs
@@ -118,7 +118,7 @@ impl OnDemandRepartitionExec {
     }

     /// Get preserve_order flag of the RepartitionExecutor
-    /// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec`
+    /// `true` means `SortPreservingRepartitionExec`, `false` means `OnDemandRepartitionExec`
     pub fn preserve_order(&self) -> bool {
         self.base.preserve_order
     }
@@ -129,7 +129,7 @@ impl OnDemandRepartitionExec {
     /// operator can take advantage of it.
     ///
     /// If the input is not ordered, or has only one partition, this is a no op,
-    /// and the node remains a `RepartitionExec`.
+    /// and the node remains an `OnDemandRepartitionExec`.
     pub fn with_preserve_order(mut self) -> Self {
         self.base = self.base.with_preserve_order();
         self
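A hedged usage sketch of the two accessors documented above (it assumes the items defined in this file are in scope, and that `input` is a placeholder for some ordered, multi-partition `Arc<dyn ExecutionPlan>`):

```rust
// Sketch: opt into order preservation, then read the flag back.
let node = OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(4))?
    .with_preserve_order();
// Per the doc comment above, with_preserve_order is a no-op for unordered
// or single-partition inputs, so the flag may legitimately still be false.
println!("preserve_order = {}", node.preserve_order());
```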
@@ -239,7 +239,6 @@ impl ExecutionPlan for OnDemandRepartitionExec {

         let stream = futures::stream::once(async move {
             let num_input_partitions = input.output_partitioning().partition_count();
-
             let input_captured = Arc::clone(&input);
             let metrics_captured = metrics.clone();
             let name_captured = name.clone();
@@ -483,8 +482,8 @@ impl OnDemandRepartitionExec {
                 partition,
                 buffer_tx,
                 Arc::clone(&context),
-                metrics.fetch_time,
-                metrics.send_buffer_time,
+                metrics.fetch_time.clone(),
+                metrics.send_buffer_time.clone(),
             ));

             // While there are still outputs to send to, keep pulling inputs
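Why `.clone()` here: the metric handles are moved into the spawned input task, while the caller may still need its own copies. A minimal, runnable sketch of the presumed sharing semantics; the `TimeHandle` type below is a hypothetical stand-in that mirrors, but is not, DataFusion's `metrics::Time`, which is assumed to be an Arc-backed counter so that clones aggregate into the same value:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for an Arc-backed metric handle: cloning copies
// the Arc, not the counter, so all clones update one shared value.
#[derive(Clone, Default)]
struct TimeHandle(Arc<AtomicU64>);

impl TimeHandle {
    fn add_nanos(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn nanos(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let fetch_time = TimeHandle::default();
    let task_copy = fetch_time.clone(); // what gets moved into the task
    std::thread::spawn(move || task_copy.add_nanos(42))
        .join()
        .unwrap();
    assert_eq!(fetch_time.nanos(), 42); // the original sees the update
}
```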
@@ -621,10 +620,11 @@ impl Stream for OnDemandPerPartitionStream {
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         if !self.is_requested && !self.sender.is_closed() {
-            self.sender.try_send(self.partition).map_err(|_| {
+            self.sender.send_blocking(self.partition).map_err(|e| {
                 internal_datafusion_err!(
-                    "Error sending partition number to the receiver for partition {}",
-                    self.partition
+                    "Error sending partition number to the receiver for partition {}: {}",
+                    self.partition,
+                    e
                 )
             })?;
             self.is_requested = true;
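The substantive change in this hunk and the next: `try_send` fails immediately when a bounded channel is full, which can surface a spurious error under backpressure, whereas `send_blocking` waits for capacity; the error message now also includes the underlying send error. A minimal sketch of that difference, assuming the async-channel crate's bounded sender, whose send_blocking / try_send / is_closed methods appear to match the calls in these hunks:

```rust
fn main() {
    // Capacity-1 channel: an eager second send must fail.
    let (tx, rx) = async_channel::bounded::<usize>(1);

    tx.send_blocking(0).expect("fits in the buffer");
    assert!(tx.try_send(1).is_err()); // full: try_send errors out immediately

    rx.recv_blocking().unwrap(); // free a slot...
    tx.send_blocking(1).expect("...and send_blocking succeeds");

    drop(rx); // once the receiver is gone, sends fail with a real error
    assert!(tx.send_blocking(2).is_err());
}
```

Blocking inside poll_next can park an executor thread, which is presumably acceptable here because each stream sends only a single partition index per request cycle.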
@@ -693,10 +693,11 @@ impl Stream for OnDemandRepartitionStream {
         loop {
             // Send partition number to input partitions
             if !self.is_requested && !self.sender.is_closed() {
-                self.sender.try_send(self.partition).map_err(|_| {
+                self.sender.send_blocking(self.partition).map_err(|e| {
                     internal_datafusion_err!(
-                        "Error sending partition number to the receiver for partition {}",
-                        self.partition
+                        "Error sending partition number to the receiver for partition {}: {}",
+                        self.partition,
+                        e
                     )
                 })?;
                 self.is_requested = true;
@@ -891,24 +892,32 @@ mod tests {
             "| 1 |",
             "| 1 |",
             "| 1 |",
+            "| 1 |",
             "| 2 |",
             "| 2 |",
             "| 2 |",
+            "| 2 |",
             "| 3 |",
             "| 3 |",
             "| 3 |",
+            "| 3 |",
             "| 4 |",
             "| 4 |",
             "| 4 |",
+            "| 4 |",
             "| 5 |",
             "| 5 |",
             "| 5 |",
+            "| 5 |",
             "| 6 |",
             "| 6 |",
             "| 6 |",
+            "| 6 |",
             "| 7 |",
             "| 7 |",
             "| 7 |",
+            "| 7 |",
             "| 8 |",
             "| 8 |",
             "| 8 |",
+            "| 8 |",
