Skip to content

Commit

Permalink
fix: have orderings include constants which are heterogeneous across p…
Browse files Browse the repository at this point in the history
…artitions
  • Loading branch information
wiedld committed Feb 26, 2025
1 parent dffeaac commit 4775354
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
11 changes: 11 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ pub fn const_exprs_contains(
.any(|const_expr| const_expr.expr.eq(expr))
}

/// Returns `true` when `const_exprs` holds an entry whose expression equals
/// `expr` and whose across-partitions status is anything other than
/// [`AcrossPartitions::Heterogeneous`].
///
/// An entry that matches `expr` but is marked heterogeneous does not count.
pub fn uniform_const_exprs_contains(
    const_exprs: &[ConstExpr],
    expr: &Arc<dyn PhysicalExpr>,
) -> bool {
    const_exprs
        .iter()
        // Narrow down to the entries that refer to `expr` first ...
        .filter(|candidate| candidate.expr.eq(expr))
        // ... then accept any of them that is not heterogeneous.
        .any(|candidate| {
            candidate.across_partitions() != AcrossPartitions::Heterogeneous
        })
}

/// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known
/// to have the same value for all tuples in a relation. These are generated by
/// equality predicates (e.g. `a = b`), typically equi-join conditions and
Expand Down
19 changes: 13 additions & 6 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::slice::Iter;
use std::sync::Arc;
use std::{fmt, mem};

use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::class::{
const_exprs_contains, uniform_const_exprs_contains, AcrossPartitions,
};
use crate::equivalence::{
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
};
Expand Down Expand Up @@ -203,8 +205,9 @@ impl EquivalenceProperties {
let constants = self.constants();
let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
// Prune out constant expressions, keeping those that are heterogeneous across partitions
output_ordering
.retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
output_ordering.retain(|sort_expr| {
!uniform_const_exprs_contains(constants, &sort_expr.expr)
});
(!output_ordering.is_empty()).then_some(output_ordering)
}

Expand Down Expand Up @@ -438,7 +441,7 @@ impl EquivalenceProperties {
let filtered_exprs = LexOrdering::new(
sort_exprs
.into_iter()
.filter(|expr| !self.is_expr_constant(&expr.expr))
.filter(|expr| !self.is_expr_constant_across_partitions(&expr.expr))
.collect(),
);

Expand Down Expand Up @@ -4091,7 +4094,9 @@ mod tests {
// Setup constant columns
let col_a = col("a", &schema)?;
let col_b = col("b", &schema)?;
eq_properties = eq_properties.with_constants([ConstExpr::from(&col_a)]);
let const_val = AcrossPartitions::Uniform(Some(ScalarValue::Int32(Some(1))));
eq_properties = eq_properties
.with_constants([ConstExpr::from(&col_a).with_across_partitions(const_val)]);

let sort_exprs = LexOrdering::new(vec![
PhysicalSortExpr {
Expand Down Expand Up @@ -4272,7 +4277,9 @@ mod tests {
let asc = SortOptions::default();

// Constants: c is constant
eq_properties = eq_properties.with_constants([ConstExpr::from(&col_c)]);
let const_val = AcrossPartitions::Uniform(Some(ScalarValue::Int32(Some(1))));
eq_properties = eq_properties
.with_constants([ConstExpr::from(&col_c).with_across_partitions(const_val)]);

// Equality: b = d
eq_properties.add_equal_conditions(&col_b, &col_d)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub fn add_sort_above<T: Clone + Default>(
!node
.plan
.equivalence_properties()
.is_expr_constant(&sort_expr.expr)
.is_expr_constant_across_partitions(&sort_expr.expr)
});
let mut new_sort = SortExec::new(sort_expr, Arc::clone(&node.plan)).with_fetch(fetch);
if node.plan.output_partitioning().partition_count() > 1 {
Expand Down

0 comments on commit 4775354

Please sign in to comment.