Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SortPushdown using the standard top-down visitor and using EquivalenceProperties #14821

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2203,7 +2203,7 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> {
);

let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only test case change.
It's a no-op since we have only 1 partition here.

// Since this projection is trivial, increasing parallelism is not beneficial
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
Copy link
Contributor Author

@wiedld wiedld Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test change is actually ok.

The window aggregate is unpartitioned and unbounded, so the count value is constant across all rows and can therefore be removed from the SortExec. We consider this change an improvement.

This plan improved because pushdown_sorts now recreates the sort from the equivalence properties (which remove the constants) rather than re-attaching the existing sort node (as main does).

" WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
Expand All @@ -2259,7 +2259,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
"SortExec: expr=[non_nullable_col@1 DESC NULLS LAST, max@2 DESC NULLS LAST], preserve_partitioning=[false]",
"SortExec: expr=[non_nullable_col@1 DESC NULLS LAST], preserve_partitioning=[false]",
" WindowAggExec: wdw=[max: Ok(Field { name: \"max\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
Expand All @@ -2276,7 +2276,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
"SortExec: expr=[min@2 ASC NULLS LAST, non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
" WindowAggExec: wdw=[min: Ok(Field { name: \"min\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
Expand All @@ -2293,7 +2293,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
"SortExec: expr=[avg@2 DESC NULLS LAST, nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
"SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
" WindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
Expand Down
170 changes: 100 additions & 70 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use crate::utils::{
};

use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{
ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, HashSet, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -59,9 +57,9 @@ pub struct ParentRequirements {
pub type SortPushDown = PlanContext<ParentRequirements>;

/// Assigns the ordering requirement of the root node to the its children.
pub fn assign_initial_requirements(node: &mut SortPushDown) {
let reqs = node.plan.required_input_ordering();
for (child, requirement) in node.children.iter_mut().zip(reqs) {
pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
let reqs = sort_push_down.plan.required_input_ordering();
for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
child.data = ParentRequirements {
ordering_requirement: requirement,
// If the parent has a fetch value, assign it to the children
Expand All @@ -71,24 +69,26 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
}
}

pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result<SortPushDown> {
let mut new_node = pushdown_sorts_helper(sort_pushdown)?;
while new_node.tnr == TreeNodeRecursion::Stop {
new_node = pushdown_sorts_helper(new_node.data)?;
/// Pushes sort requirements down the plan tree using the standard
/// top-down [`TreeNode`] traversal, returning the rewritten plan context.
pub fn pushdown_sorts(sort_push_down: SortPushDown) -> Result<SortPushDown> {
    // Run the visitor top-down, then unwrap the transformed payload.
    let transformed = sort_push_down.transform_down(pushdown_sorts_helper)?;
    Ok(transformed.data)
}

/// Returns the stricter (smaller) of two optional fetch limits.
///
/// - Both `Some`: the minimum of the two limits applies.
/// - Exactly one `Some`: that limit is used as-is.
/// - Both `None`: there is no limit.
fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
    match (f1, f2) {
        // Both constrained: the smaller limit wins.
        (Some(a), Some(b)) => Some(a.min(b)),
        // At most one constrained: propagate whichever exists (or `None`).
        _ => f1.or(f2),
    }
}

fn pushdown_sorts_helper(
mut requirements: SortPushDown,
mut sort_push_down: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = &requirements.plan;
let parent_reqs = requirements
let plan = &sort_push_down.plan;
let parent_reqs = sort_push_down
.data
.ordering_requirement
.clone()
Expand All @@ -98,82 +98,112 @@ fn pushdown_sorts_helper(
.ordering_satisfy_requirement(&parent_reqs);

if is_sort(plan) {
let sort_fetch = plan.fetch();
let required_ordering = plan
let current_sort_fetch = plan.fetch();
let parent_req_fetch = sort_push_down.data.fetch;

let current_plan_reqs = plan
.output_ordering()
.cloned()
.map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
// It's possible current plan (`SortExec`) has a fetch value.
// And if both of them have fetch values, we should use the minimum one.
if let Some(fetch) = sort_fetch {
if let Some(requirement_fetch) = requirements.data.fetch {
requirements.data.fetch = Some(fetch.min(requirement_fetch));
}
let parent_is_stricter = plan
.equivalence_properties()
.requirements_compatible(&parent_reqs, &current_plan_reqs);
let child_is_stricter = plan
.equivalence_properties()
.requirements_compatible(&current_plan_reqs, &parent_reqs);

if !satisfy_parent && !parent_is_stricter {
// This new sort has different requirements than the ordering being pushed down.
// 1. add a `SortExec` here for the pushed down ordering (parent reqs).
// 2. continue sort pushdown, but with the new ordering of the new sort.

// remove current sort (which will be the new ordering to pushdown)
let new_reqs = current_plan_reqs;
sort_push_down = sort_push_down.children.swap_remove(0);
sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan

// add back sort exec matching parent
sort_push_down =
add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);

// If we have totally orthogonal sorts (2 different sorts in a row), the child sort
// gets immediately re-sorted.
// e.g. Sort col1 ASC
// Sort col1 DESC
//
// Remove this redundant sort by not pushing down.
let is_orthogonal_sort =
!satisfy_parent && !parent_is_stricter && !child_is_stricter;

// make pushdown requirements be the new ones.
if !is_orthogonal_sort || current_sort_fetch.is_some() {
sort_push_down.children[0].data = ParentRequirements {
ordering_requirement: Some(new_reqs),
fetch: current_sort_fetch,
};
}
let fetch = requirements.data.fetch.or(sort_fetch);
requirements = requirements.children.swap_remove(0);
requirements = add_sort_above(requirements, sort_reqs, fetch);
};
} else {
// Don't add a SortExec
// Do update what sort requirements to keep pushing down

// We can safely get the 0th index as we are dealing with a `SortExec`.
let mut child = requirements.children.swap_remove(0);
if let Some(adjusted) =
pushdown_requirement_to_children(&child.plan, &required_ordering)?
{
let fetch = sort_fetch.or_else(|| child.plan.fetch());
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
grand_child.data = ParentRequirements {
ordering_requirement: order,
fetch,
};
// remove current sort, and get the sort's child
sort_push_down = sort_push_down.children.swap_remove(0);
sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan

// set the stricter fetch
sort_push_down.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch);

// set the stricter ordering
if child_is_stricter {
sort_push_down.data.ordering_requirement = Some(current_plan_reqs);
} else {
sort_push_down.data.ordering_requirement = Some(parent_reqs);
}
// Can push down requirements
child.data = ParentRequirements {
ordering_requirement: Some(required_ordering),
fetch,
};

return Ok(Transformed {
data: child,
transformed: true,
tnr: TreeNodeRecursion::Stop,
});
} else {
// Can not push down requirements
requirements.children = vec![child];
assign_initial_requirements(&mut requirements);
// recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort)
return pushdown_sorts_helper(sort_push_down);
}
} else if satisfy_parent && parent_reqs.is_empty() {
// Nothing to do.
return Ok(Transformed::no(sort_push_down));
} else if satisfy_parent {
// For non-sort operators, immediately return if parent requirements are met:
// For non-sort operators which satisfy ordering:
let reqs = plan.required_input_ordering();
for (child, order) in requirements.children.iter_mut().zip(reqs) {
let parent_req_fetch = sort_push_down.data.fetch;

for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
}
} else if let Some(adjusted) = pushdown_requirement_to_children(plan, &parent_reqs)? {
// Can not satisfy the parent requirements, check whether we can push
// requirements down:
for (child, order) in requirements.children.iter_mut().zip(adjusted) {
// For operators that can take a sort pushdown.

// Continue pushdown, with updated requirements:
let parent_fetch = sort_push_down.data.fetch;
let current_fetch = plan.fetch();
for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(current_fetch, parent_fetch);
}
requirements.data.ordering_requirement = None;
sort_push_down.data.ordering_requirement = None;
} else {
// Can not push down requirements, add new `SortExec`:
let sort_reqs = requirements
let sort_reqs = sort_push_down
.data
.ordering_requirement
.clone()
.unwrap_or_default();
let fetch = requirements.data.fetch;
requirements = add_sort_above(requirements, sort_reqs, fetch);
assign_initial_requirements(&mut requirements);
let fetch = sort_push_down.data.fetch;
sort_push_down = add_sort_above(sort_push_down, sort_reqs, fetch);
assign_initial_requirements(&mut sort_push_down);
}
Ok(Transformed::yes(requirements))

Ok(Transformed::yes(sort_push_down))
}

/// Calculate the pushdown ordering requirements for children.
/// If sort cannot be pushed down, return None.
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: &LexRequirement,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
///
/// Note that this updates the plan in both the [`PlanContext.children`] and
/// the [`PlanContext.plan`]'s children. Therefore its not required to sync
/// the child plans with [`PlanContext::update_plan_from_children`].
pub fn add_sort_above<T: Clone + Default>(
node: PlanContext<T>,
sort_requirements: LexRequirement,
Expand Down