
Refactor SortPushdown using the standard top-down visitor and using EquivalenceProperties #14821

Open. Wants to merge 10 commits into base: main.
Conversation

@wiedld (Contributor) commented Feb 22, 2025

Which issue does this PR close?

No issue.

Rationale for this change

It's a minor refactor of the EnforceSorting subrule sort_pushdown. I was having a hard time reasoning about and debugging a few things, and switching the traversal to a standard visitor pattern (transform_down) made it a lot easier for me.
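The traversal style in question can be illustrated with a minimal, self-contained sketch of a top-down (pre-order) transform. The `Node` type and signatures here are illustrative stand-ins, not DataFusion's actual `TreeNode`/`transform_down` API:

```rust
// Hypothetical sketch of a top-down (pre-order) transform: rewrite the
// parent first, then recurse into the (possibly rewritten) children.
// Types and names are illustrative, not DataFusion's TreeNode API.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

fn transform_down(node: Node, f: &impl Fn(Node) -> Node) -> Node {
    let node = f(node); // visit the parent before its children
    Node {
        name: node.name,
        children: node
            .children
            .into_iter()
            .map(|c| transform_down(c, f))
            .collect(),
    }
}

fn main() {
    let plan = Node {
        name: "SortExec".into(),
        children: vec![Node { name: "DataSourceExec".into(), children: vec![] }],
    };
    // Example rewrite: uppercase each node name on the way down.
    let out = transform_down(plan, &|mut n| {
        n.name = n.name.to_uppercase();
        n
    });
    println!("{}", out.name); // SORTEXEC
}
```

In pushdown_sorts the closure would carry the pending sort requirement down rather than renaming nodes; the point here is only the pre-order visiting shape.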

What changes are included in this PR?

Move pushdown_sorts to use the standard top-down traversal.
Only requires a bit of tweaking in the pushdown_sorts_helper.
Add copious code docs.

Are these changes tested?

Yes, with existing tests.

Are there any user-facing changes?

No.

@github-actions bot added the optimizer (Optimizer rules) and core (Core DataFusion crate) labels on Feb 22, 2025
@@ -2203,7 +2203,7 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> {
);

let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
@wiedld (Contributor Author):

Only test case change.
It's a no-op since we have only 1 partition here.

@wiedld wiedld marked this pull request as ready for review February 22, 2025 07:43
@alamb mentioned this pull request Feb 24, 2025
@alamb (Contributor) left a comment:

Thanks @wiedld -- this looks like an improvement -- and thank you for all the comments. I don't fully understand where some of this code is coming from -- I will give it a more careful look in the morning

let fetch = requirements.data.fetch.or(sort_fetch);
let parent_is_stricter = plan
.equivalence_properties()
.requirements_compatible(&parent_reqs, &child_reqs);
Contributor:

I don't see where these requirements_compatible calls come from. Is there an existing location?

@wiedld (Contributor Author) Feb 26, 2025:

The requirements_compatible (docs here) determines whether one set of requirements is equal to or stricter than the other set of requirements.

On current main, the handling of sort nodes includes the use of pushdown_requirement_to_children which itself uses the requirements_compatible. I simply removed some of the misdirection -- but I could add it back.
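As a toy model of that check: one requirement set is "equal or stricter" than another when satisfying the first implies satisfying the second. The sketch below reduces this to a prefix test over plain strings, which is illustrative only; the real requirements_compatible also reasons through EquivalenceProperties (equivalence classes, constants, etc.):

```rust
// Toy "equal or stricter" check: `given` satisfies `required` if
// `required` is a prefix of `given`. Illustrative only; DataFusion's
// requirements_compatible consults EquivalenceProperties as well.
fn requirements_compatible(given: &[&str], required: &[&str]) -> bool {
    required.len() <= given.len() && given[..required.len()] == *required
}

fn main() {
    // Sorting by [a, b] is stricter than (so also satisfies) sorting by [a]...
    assert!(requirements_compatible(&["a ASC", "b ASC"], &["a ASC"]));
    // ...but it does not satisfy a requirement on [b] alone.
    assert!(!requirements_compatible(&["a ASC", "b ASC"], &["b ASC"]));
    println!("ok");
}
```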

Contributor:

Nope this is great -- I just was trying to pattern match

@alamb (Contributor) left a comment:

Thank you @wiedld -- I went through this logic carefully and I think it makes sense and is much easier to understand now

I am in the process of running the planning benchmarks to make sure these changes don't cause a performance regression, but overall I think they look quite good to me.

cc @mustafasrepo @berkaysynnada

@alamb (Contributor) commented Feb 25, 2025

++ critcmp main wiedld_refactor-sort-pushdown
group                                         main                                   wiedld_refactor-sort-pushdown
-----                                         ----                                   -----------------------------
logical_aggregate_with_join                   1.00  1513.2±12.84µs        ? ?/sec    1.01  1523.4±18.57µs        ? ?/sec
logical_select_all_from_1000                  1.00      4.7±0.04ms        ? ?/sec    1.00      4.6±0.04ms        ? ?/sec
logical_select_one_from_700                   1.00  1224.6±18.07µs        ? ?/sec    1.00  1221.0±12.28µs        ? ?/sec
...
physical_plan_clickbench_all                  1.00    264.8±3.74ms        ? ?/sec    1.02    270.4±6.25ms        ? ?/sec
physical_plan_tpcds_all                       1.00   1429.8±4.32ms        ? ?/sec    1.02  1457.8±22.28ms        ? ?/sec
physical_plan_tpch_all                        1.00     93.5±0.67ms        ? ?/sec    1.01     94.5±1.26ms        ? ?/sec
...
physical_select_all_from_1000                 1.01     43.5±0.16ms        ? ?/sec    1.00     42.9±0.29ms        ? ?/sec
physical_select_one_from_700                  1.01      3.5±0.03ms        ? ?/sec    1.00      3.5±0.05ms        ? ?/sec

So maybe there is a slight degradation but overall it looks fine to me

Full results

++ critcmp main wiedld_refactor-sort-pushdown
group                                         main                                   wiedld_refactor-sort-pushdown
-----                                         ----                                   -----------------------------
logical_aggregate_with_join                   1.00  1513.2±12.84µs        ? ?/sec    1.01  1523.4±18.57µs        ? ?/sec
logical_select_all_from_1000                  1.00      4.7±0.04ms        ? ?/sec    1.00      4.6±0.04ms        ? ?/sec
logical_select_one_from_700                   1.00  1224.6±18.07µs        ? ?/sec    1.00  1221.0±12.28µs        ? ?/sec
logical_trivial_join_high_numbered_columns    1.00  1163.5±12.74µs        ? ?/sec    1.03  1199.2±13.34µs        ? ?/sec
logical_trivial_join_low_numbered_columns     1.00  1185.5±19.11µs        ? ?/sec    1.00  1180.9±29.76µs        ? ?/sec
physical_intersection                         1.00      2.4±0.01ms        ? ?/sec    1.01      2.5±0.02ms        ? ?/sec
physical_join_consider_sort                   1.00      3.3±0.01ms        ? ?/sec    1.01      3.3±0.03ms        ? ?/sec
physical_join_distinct                        1.00  1136.3±16.08µs        ? ?/sec    1.01  1148.7±14.88µs        ? ?/sec
physical_many_self_joins                      1.00     17.3±0.12ms        ? ?/sec    1.01     17.5±0.21ms        ? ?/sec
physical_plan_clickbench_all                  1.00    264.8±3.74ms        ? ?/sec    1.02    270.4±6.25ms        ? ?/sec
physical_plan_clickbench_q1                   1.00      3.9±0.10ms        ? ?/sec    1.21      4.7±0.17ms        ? ?/sec
physical_plan_clickbench_q10                  1.00      5.1±0.16ms        ? ?/sec    1.08      5.4±0.34ms        ? ?/sec
physical_plan_clickbench_q11                  1.00      5.2±0.11ms        ? ?/sec    1.08      5.6±0.32ms        ? ?/sec
physical_plan_clickbench_q12                  1.00      5.4±0.17ms        ? ?/sec    1.04      5.6±0.32ms        ? ?/sec
physical_plan_clickbench_q13                  1.00      4.9±0.11ms        ? ?/sec    1.07      5.2±0.35ms        ? ?/sec
physical_plan_clickbench_q14                  1.00      5.2±0.15ms        ? ?/sec    1.03      5.4±0.28ms        ? ?/sec
physical_plan_clickbench_q15                  1.00      5.0±0.14ms        ? ?/sec    1.04      5.2±0.27ms        ? ?/sec
physical_plan_clickbench_q16                  1.00      4.4±0.09ms        ? ?/sec    1.05      4.6±0.19ms        ? ?/sec
physical_plan_clickbench_q17                  1.00      4.5±0.11ms        ? ?/sec    1.04      4.7±0.21ms        ? ?/sec
physical_plan_clickbench_q18                  1.00      4.2±0.11ms        ? ?/sec    1.03      4.4±0.21ms        ? ?/sec
physical_plan_clickbench_q19                  1.00      5.1±0.10ms        ? ?/sec    1.00      5.2±0.14ms        ? ?/sec
physical_plan_clickbench_q2                   1.00      4.2±0.09ms        ? ?/sec    1.01      4.2±0.12ms        ? ?/sec
physical_plan_clickbench_q20                  1.00      4.0±0.09ms        ? ?/sec    1.02      4.0±0.14ms        ? ?/sec
physical_plan_clickbench_q21                  1.00      4.2±0.11ms        ? ?/sec    1.00      4.2±0.07ms        ? ?/sec
physical_plan_clickbench_q22                  1.00      5.3±0.14ms        ? ?/sec    1.00      5.2±0.09ms        ? ?/sec
physical_plan_clickbench_q23                  1.05      6.0±0.14ms        ? ?/sec    1.00      5.7±0.09ms        ? ?/sec
physical_plan_clickbench_q24                  1.02      6.6±0.16ms        ? ?/sec    1.00      6.5±0.13ms        ? ?/sec
physical_plan_clickbench_q25                  1.00      4.6±0.13ms        ? ?/sec    1.11      5.2±0.27ms        ? ?/sec
physical_plan_clickbench_q26                  1.00      4.3±0.14ms        ? ?/sec    1.02      4.4±0.13ms        ? ?/sec
physical_plan_clickbench_q27                  1.00      4.7±0.14ms        ? ?/sec    1.02      4.7±0.11ms        ? ?/sec
physical_plan_clickbench_q28                  1.00      5.5±0.16ms        ? ?/sec    1.02      5.6±0.16ms        ? ?/sec
physical_plan_clickbench_q29                  1.00      6.7±0.25ms        ? ?/sec    1.00      6.7±0.18ms        ? ?/sec
physical_plan_clickbench_q3                   1.00      4.2±0.14ms        ? ?/sec    1.01      4.2±0.13ms        ? ?/sec
physical_plan_clickbench_q30                  1.00     20.8±0.28ms        ? ?/sec    1.01     20.9±0.54ms        ? ?/sec
physical_plan_clickbench_q31                  1.01      5.6±0.18ms        ? ?/sec    1.00      5.5±0.09ms        ? ?/sec
physical_plan_clickbench_q32                  1.02      5.6±0.13ms        ? ?/sec    1.00      5.5±0.09ms        ? ?/sec
physical_plan_clickbench_q33                  1.01      5.0±0.08ms        ? ?/sec    1.00      5.0±0.09ms        ? ?/sec
physical_plan_clickbench_q34                  1.01      4.6±0.09ms        ? ?/sec    1.00      4.5±0.08ms        ? ?/sec
physical_plan_clickbench_q35                  1.01      4.7±0.11ms        ? ?/sec    1.00      4.7±0.09ms        ? ?/sec
physical_plan_clickbench_q36                  1.04      6.1±0.31ms        ? ?/sec    1.00      5.9±0.13ms        ? ?/sec
physical_plan_clickbench_q37                  1.00      6.0±0.12ms        ? ?/sec    1.03      6.1±0.20ms        ? ?/sec
physical_plan_clickbench_q38                  1.00      6.0±0.14ms        ? ?/sec    1.08      6.4±0.30ms        ? ?/sec
physical_plan_clickbench_q39                  1.01      5.5±0.10ms        ? ?/sec    1.00      5.4±0.10ms        ? ?/sec
physical_plan_clickbench_q4                   1.01      4.1±0.14ms        ? ?/sec    1.00      4.1±0.09ms        ? ?/sec
physical_plan_clickbench_q40                  1.02      6.2±0.17ms        ? ?/sec    1.00      6.1±0.11ms        ? ?/sec
physical_plan_clickbench_q41                  1.01      5.9±0.14ms        ? ?/sec    1.00      5.8±0.11ms        ? ?/sec
physical_plan_clickbench_q42                  1.01      5.7±0.15ms        ? ?/sec    1.00      5.6±0.16ms        ? ?/sec
physical_plan_clickbench_q43                  1.00      5.8±0.13ms        ? ?/sec    1.02      6.0±0.19ms        ? ?/sec
physical_plan_clickbench_q44                  1.00      4.1±0.09ms        ? ?/sec    1.07      4.4±0.18ms        ? ?/sec
physical_plan_clickbench_q45                  1.00      4.1±0.09ms        ? ?/sec    1.04      4.3±0.11ms        ? ?/sec
physical_plan_clickbench_q46                  1.00      4.8±0.10ms        ? ?/sec    1.03      4.9±0.09ms        ? ?/sec
physical_plan_clickbench_q47                  1.00      5.6±0.15ms        ? ?/sec    1.03      5.7±0.19ms        ? ?/sec
physical_plan_clickbench_q48                  1.00      6.2±0.16ms        ? ?/sec    1.03      6.4±0.25ms        ? ?/sec
physical_plan_clickbench_q49                  1.00      6.4±0.14ms        ? ?/sec    1.06      6.8±0.23ms        ? ?/sec
physical_plan_clickbench_q5                   1.03      4.4±0.11ms        ? ?/sec    1.00      4.3±0.10ms        ? ?/sec
physical_plan_clickbench_q6                   1.00      4.4±0.13ms        ? ?/sec    1.04      4.6±0.17ms        ? ?/sec
physical_plan_clickbench_q7                   1.00      5.0±0.13ms        ? ?/sec    1.08      5.3±0.24ms        ? ?/sec
physical_plan_clickbench_q8                   1.00      4.6±0.12ms        ? ?/sec    1.10      5.1±0.19ms        ? ?/sec
physical_plan_clickbench_q9                   1.00      5.0±0.14ms        ? ?/sec    1.02      5.1±0.26ms        ? ?/sec
physical_plan_tpcds_all                       1.00   1429.8±4.32ms        ? ?/sec    1.02  1457.8±22.28ms        ? ?/sec
physical_plan_tpch_all                        1.00     93.5±0.67ms        ? ?/sec    1.01     94.5±1.26ms        ? ?/sec
physical_plan_tpch_q1                         1.00      3.3±0.02ms        ? ?/sec    1.00      3.3±0.02ms        ? ?/sec
physical_plan_tpch_q10                        1.00      4.6±0.04ms        ? ?/sec    1.00      4.6±0.04ms        ? ?/sec
physical_plan_tpch_q11                        1.00      4.1±0.03ms        ? ?/sec    1.01      4.2±0.05ms        ? ?/sec
physical_plan_tpch_q12                        1.00      3.2±0.02ms        ? ?/sec    1.00      3.2±0.02ms        ? ?/sec
physical_plan_tpch_q13                        1.00      2.5±0.02ms        ? ?/sec    1.02      2.6±0.03ms        ? ?/sec
physical_plan_tpch_q14                        1.00      2.9±0.03ms        ? ?/sec    1.01      3.0±0.04ms        ? ?/sec
physical_plan_tpch_q16                        1.00      4.0±0.03ms        ? ?/sec    1.01      4.0±0.05ms        ? ?/sec
physical_plan_tpch_q17                        1.00      3.9±0.03ms        ? ?/sec    1.00      3.9±0.04ms        ? ?/sec
physical_plan_tpch_q18                        1.00      4.3±0.05ms        ? ?/sec    1.01      4.3±0.07ms        ? ?/sec
physical_plan_tpch_q19                        1.00      6.0±0.03ms        ? ?/sec    1.01      6.1±0.05ms        ? ?/sec
physical_plan_tpch_q2                         1.00      7.8±0.05ms        ? ?/sec    1.00      7.8±0.08ms        ? ?/sec
physical_plan_tpch_q20                        1.00      4.9±0.03ms        ? ?/sec    1.01      4.9±0.04ms        ? ?/sec
physical_plan_tpch_q21                        1.00      6.3±0.05ms        ? ?/sec    1.00      6.3±0.12ms        ? ?/sec
physical_plan_tpch_q22                        1.00      3.7±0.02ms        ? ?/sec    1.01      3.8±0.03ms        ? ?/sec
physical_plan_tpch_q3                         1.00      3.4±0.03ms        ? ?/sec    1.00      3.4±0.03ms        ? ?/sec
physical_plan_tpch_q4                         1.00      2.6±0.02ms        ? ?/sec    1.01      2.7±0.06ms        ? ?/sec
physical_plan_tpch_q5                         1.00      4.6±0.04ms        ? ?/sec    1.00      4.6±0.04ms        ? ?/sec
physical_plan_tpch_q6                         1.00  1916.1±13.26µs        ? ?/sec    1.01  1926.0±24.44µs        ? ?/sec
physical_plan_tpch_q7                         1.00      6.0±0.04ms        ? ?/sec    1.01      6.0±0.06ms        ? ?/sec
physical_plan_tpch_q8                         1.00      7.1±0.05ms        ? ?/sec    1.00      7.1±0.08ms        ? ?/sec
physical_plan_tpch_q9                         1.00      5.6±0.03ms        ? ?/sec    1.00      5.6±0.06ms        ? ?/sec
physical_select_aggregates_from_200           1.00     32.8±0.15ms        ? ?/sec    1.01     33.1±0.24ms        ? ?/sec
physical_select_all_from_1000                 1.01     43.5±0.16ms        ? ?/sec    1.00     42.9±0.29ms        ? ?/sec
physical_select_one_from_700                  1.01      3.5±0.03ms        ? ?/sec    1.00      3.5±0.05ms        ? ?/sec
physical_sorted_union_orderby                 1.00    119.7±0.46ms        ? ?/sec    1.00    120.1±0.54ms        ? ?/sec
physical_theta_join_consider_sort             1.00      3.7±0.02ms        ? ?/sec    1.01      3.7±0.04ms        ? ?/sec
physical_unnest_to_join                       1.00      3.3±0.02ms        ? ?/sec    1.01      3.4±0.03ms        ? ?/sec
with_param_values_many_columns                1.00    161.8±0.89µs        ? ?/sec    1.01    163.0±0.86µs        ? ?/sec

@alamb (Contributor) commented Feb 25, 2025

FYI @xudong963

@berkaysynnada (Contributor) left a comment:

LGTM, thank you @wiedld. I just have 2 questions

@wiedld (Contributor Author) commented Feb 26, 2025

There is a new test on main (added 4 hours ago) which is failing.
Converting to draft while I debug.

@wiedld wiedld marked this pull request as draft February 26, 2025 18:40
@alamb (Contributor) commented Feb 26, 2025

There is a new test on main (added 4 hours ago) which is failing. Converting to draft while I debug.

Testing for the win. That is a sign this code isn't sufficiently tested 🤔

@wiedld (Contributor Author) commented Feb 26, 2025

There is a new test on main (added 4 hours ago) which is failing. Converting to draft while I debug.

It's failing sort pushdown on this test case:

"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
"  WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
"    DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",

It tries to push down only the non_nullable_col@1 ASC NULLS LAST as the current_plan_reqs, and it removes the second count column since it's a constant (as determined when the SortExec calculates its properties here).

The reason this doesn't happen on main is that main (1) immediately checks whether the sort can be pushed down to its child (without the recursive call to walk down to the child), and then (2) if it cannot be pushed down, re-uses the same SortExec node (this branch here), rather than reconstructing it from the equivalence properties' ordering (which has constants removed).
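A rough sketch of that control-flow difference (illustrative types only, not DataFusion's code): main reuses the original sort node verbatim when pushdown fails, so no expressions are ever dropped from it:

```rust
// Sketch of main's behavior as described above (hypothetical types):
// (1) first check whether the child already satisfies the sort requirement;
// (2) if not, reuse the original Sort node unchanged rather than rebuilding
//     it from derived properties (which may have pruned constant columns).
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Sort { exprs: Vec<String>, child: Box<Plan> },
    Leaf { satisfies: Vec<String> },
}

fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { exprs, child } => {
            if let Plan::Leaf { satisfies } = child.as_ref() {
                if *satisfies == exprs {
                    // (1) child already provides the ordering: drop the sort
                    return (*child).clone();
                }
            }
            // (2) cannot push down: keep the original node intact
            Plan::Sort { exprs, child }
        }
        leaf => leaf,
    }
}

fn main() {
    let leaf = Plan::Leaf { satisfies: vec!["a".into()] };
    let kept = push_down(Plan::Sort {
        exprs: vec!["a".into(), "count".into()],
        child: Box::new(leaf),
    });
    // The sort is reused as-is, still carrying the "count" expression.
    assert!(matches!(kept, Plan::Sort { exprs, .. } if exprs.len() == 2));
    println!("ok");
}
```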

@alamb (Contributor) commented Feb 26, 2025

"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
"  WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]",
"    DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",

The reason this doesn't happen on main is that main (1) immediately checks whether the sort can be pushed down to its child (without the recursive call to walk down to the child), and then (2) if it cannot be pushed down, re-uses the same SortExec node (this branch here), rather than reconstructing it from the equivalence properties' ordering (which has constants removed).

What are the output sort properties of WindowAggExec? It seems like it should report that it is compatible with the SortExec

WindowAggExec would produce a sort output that is nullable_col and constants={count}

So then I would expect that the sort properties should be satisfied and this pass should eliminate the SortExec 🤔

@alamb (Contributor) commented Feb 26, 2025

I think count() is constant in this case because there is only one partition perhaps 🤔

I think the SQL looks like

SELECT count() over ()

But maybe not 🤔

@github-actions bot added the physical-expr (Changes to the physical-expr crates) label on Feb 26, 2025
@wiedld force-pushed the wiedld/refactor-sort-pushdown branch from 5a4ef63 to 4775354 on February 26, 2025 23:38
@wiedld (Contributor Author) commented Feb 27, 2025

The equivalence properties' ordering methods, as well as the add_sort_above util method, were checking if the field was a constant -- and not considering if it's a heterogeneous constant. This commit 4775354 makes the tests pass; is it the correct fix?

Do you need me to split that^^ added commit into a separate PR?

@wiedld wiedld marked this pull request as ready for review February 27, 2025 00:17
) -> bool {
const_exprs.iter().any(|const_expr| {
const_expr.expr.eq(expr)
&& const_expr.across_partitions() != AcrossPartitions::Heterogeneous
Contributor:

I would find this code more explicit if it checked for == AcrossPartitions::Uniform rather than != Heterogeneous

Contributor:

Something like this (which is how other parts of the code check this):

index 1e1266b7a..e9e7eb2ca 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -223,7 +223,10 @@ pub fn uniform_const_exprs_contains(
 ) -> bool {
     const_exprs.iter().any(|const_expr| {
         const_expr.expr.eq(expr)
-            && const_expr.across_partitions() != AcrossPartitions::Heterogeneous
+            && matches!(
+                const_expr.across_partitions(),
+                AcrossPartitions::Uniform { .. }
+            )
     })
 }

@alamb (Contributor) left a comment:

The equivalence properties' ordering methods, as well as the add_sort_above util method, were checking if the field was a constant -- and not considering if it's a heterogeneous constant. This commit 4775354 makes the tests pass; is it the correct fix?

I am not 100% sure what the correct fix is :) We need to figure it out together. The proposed change in 4775354 seems reasonable to me, and the fact that all the tests pass is a good sign

Given all the tests pass I think this PR is ready to go.

Let us know what you think @berkaysynnada


@@ -22,7 +22,9 @@ use std::slice::Iter;
use std::sync::Arc;
use std::{fmt, mem};

use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
use crate::equivalence::class::{
Contributor:

Looking through the properties.rs file, it seems like it is getting huge -- I will file a follow-on ticket about that

@berkaysynnada (Contributor) left a comment:

The sort_pushdown changes seem good to me, but I don't agree with the part where we filter out the expressions that are heterogeneously constant across partitions while retaining the output ordering.

         // Prune out constant expressions
         output_ordering.retain(|sort_expr| {
             !uniform_const_exprs_contains(constants, &sort_expr.expr)
         });

I cannot follow from which point this fix arose, but we should fix the constantness check instead, at the point where this function is called and returns an unexpected result. I'm saying this because the output_ordering() API reflects per-partition behavior.

For example, when a multi-partition operator returns an ordering from the output_ordering() API, it means that each of its partitions is ordered accordingly. But we do not know how the rows will be ordered once they are coalesced into 1 partition (the ordering could survive, or it could be broken)

BTW, day by day I feel a growing need to expand SortOptions with a new "constant" variant
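The per-partition point can be made concrete with a small sketch (the values are made up): a column can be constant within every partition yet heterogeneous across partitions, so dropping it from a per-partition output_ordering is safe, while dropping it after partitions are coalesced is not:

```rust
// Sketch: per-partition constant vs. globally constant (illustrative data).
fn is_constant(values: &[i32]) -> bool {
    values.windows(2).all(|w| w[0] == w[1])
}

fn main() {
    // A "heterogeneous constant": constant inside each partition,
    // but with a different value per partition.
    let partitions = vec![vec![1, 1, 1], vec![2, 2, 2]];
    assert!(partitions.iter().all(|p| is_constant(p)));

    // Coalescing the partitions breaks the constantness (and hence any
    // ordering that relied on dropping this column).
    let coalesced: Vec<i32> = partitions.into_iter().flatten().collect();
    assert!(!is_constant(&coalesced));
    println!("ok");
}
```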

@wiedld (Contributor Author) commented Feb 27, 2025

but I don't agree with the part where we filter the expressions being heterogeneously constant across partitions, while retaining the output ordering.

I cannot follow from which point this fix arose, but we should fix the constantness check instead, where this function is called and returns and unexpected result. I'm saying this because output_ordering() API reflects the per-partition behavior.

@berkaysynnada -- once I merged in the latest main, a (nice ❤️ ) test added by you a few hours earlier started failing. Specifically, it was failing due to the removal of heterogeneous constant fields from the ordering requirements. Your test case is not failing on main since the SortExec is kept intact, instead of selectively recreated (if needed) from the output ordering requirements. See here for exactly how this happens: #14821 (comment)

As a result, I proposed a possible "fix" (more here: #14821 (comment)) based upon my limited understanding of how constants (and if they are heterogeneous across partitions) should be handled when calculating the output ordering. But I'm not sure this is the proper fix. 🤔

I'm going to convert this PR back to a draft, and make a reproducer test case isolating how output_ordering() calculates based on heterogeneous constants. Then we can decide that behavior first (before re-considering this PR).

… partition by (in window agg), and when aggregating on an unordered column
@github-actions bot removed the physical-expr (Changes to the physical-expr crates) label on Feb 28, 2025
@wiedld (Contributor Author) commented Feb 28, 2025

Thanks for giving me time to catch up.

After digging more into the ordering calculation, and making a small PR with additional test cases (to demonstrate how I think it's intended to work?), I now agree with @berkaysynnada. I'm going to undo the ordering calculation change (revert that single commit).

Instead, I'm pushing up a temporary commit to show what test cases have a selective removal of the order by count. I'm not sure why the count field is only a constant when we have Plain(unbounded) + no partition_by + on unordered column. Keeping as a draft, and I'll dig more tmrw (starting with the windows agg exec constants handling).

@berkaysynnada (Contributor):
@berkaysynnada -- once I merged in the latest main, a (nice ❤️ ) test added by you a few hours earlier started failing. Specifically, it was failing due to the removal of heterogeneous constant fields from the ordering requirements. Your test case is not failing on main since the SortExec is kept intact, instead of selectively recreated (if needed) from the output ordering requirements. See here for exactly how this happens: #14821 (comment)

I took a look, but couldn't quite get the point you try to emphasize. Do you think that there is a problem in sort_pushdown or compute_properties of window operators?

As a result, I proposed a possible "fix" (more here: #14821 (comment)) based upon my limited understanding of how constants (and if they are heterogeneous across partitions) should be handled when calculating the output ordering. But I'm not sure this is the proper fix. 🤔

I'll share my thoughts there

I'm going to convert this PR back to a draft, and make a reproducer test case isolating how output_ordering() calculates based on heterogeneous constants. Then we can decide that behavior first (before re-considering this PR).

👍🏻

@berkaysynnada (Contributor):
The equivalence properties' ordering methods, as well as the add_sort_above util method, were checking if the field was a constant -- and not considering if it's a heterogeneous constant. This commit 4775354 makes the tests pass; is it the correct fix?

Do you need me to make that^^ added commit into a separate PR?

I'm still seeing the output_ordering() change which adds the heterogeneous constant to the output_ordering, and I have said that doesn't seem correct to me. Did you mean to point out another part in that commit?

@berkaysynnada (Contributor):
I'm a bit confused about what we are trying to solve and how. Could you @wiedld tell me which issue or reproducer we are trying to solve in this collection of PRs please? I'd like to unblock you but am unsure how to do it

@wiedld (Contributor Author) commented Feb 28, 2025

I'm a bit confused about what we are trying to solve and how. Could you @wiedld tell me which issue or reproducer we are trying to solve in this collection of PRs please? I'd like to unblock you but am unsure how to do it

We are trying to fix the coalesce bug. In the process, I've been adding docs and making small improvements to make the code easier to understand.

I'm still seeing the output_ordering() change which adds the heterogenous constant to the output_ordering, and I have said that doesn't seem correct to me.

I've already removed this change.

What is the goal of this PR?

The goal is to make the enforce sorting optimizer easier to understand, with this PR focused on the pushdown_sorts subrule.

@@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet",
],
expected_plan: vec![
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
@wiedld (Contributor Author) Feb 28, 2025:

This test change is actually ok.

The window agg is unpartitioned and unbounded, so the count value is constant across all rows and can therefore be removed from the SortExec. We think this change is an improvement.

The reason this plan got better is because the pushdown_sorts is now recreating the sort from the eq props (that removes the constants), rather than re-attaching the existing sort node (as does main).

@wiedld wiedld marked this pull request as ready for review February 28, 2025 18:22
@alamb (Contributor) left a comment:

I went over this PR again carefully with @wiedld and we discussed the plan changes and they seem like improvements to me

@berkaysynnada I wonder if you would be interested sometime in a video call or something to discuss these passes and what we are doing. We would be happy to set one up, and it might help us be more efficient in the back and forth

@alamb alamb changed the title Refactor SortPushdown using the standard top-down visitor. Refactor SortPushdown using the standard top-down visitor and using EquivalenceProperties Feb 28, 2025
@berkaysynnada (Contributor):

I went over this PR again carefully with @wiedld and we discussed the plan changes and they seem like improvements to me

@berkaysynnada I wonder if you would be interested sometime in a video call or something to discuss these passes and what we are doing. We would be happy to set one up, and it might help us be more efficient in the back and forth

Of course we can. My username is berkaysynnada on Discord; feel free to reach me.

@alamb (Contributor) commented Feb 28, 2025

Of course we can. My username is berkaysynnada on Discord; feel free to reach me.

I reached out to find some time

@alamb (Contributor) commented Mar 4, 2025

@berkaysynnada @wiedld and I had a brief meeting. The outcomes from my perspective are:

  1. @berkaysynnada plans to review this PR and test it on the synnada fork to ensure it still works
  2. In parallel, @wiedld and I will work on increasing the test coverage of EnforceDistribution to ensure it covers our InfluxDB Iox plans

@berkaysynnada (Contributor):

@berkaysynnada @wiedld and I had a brief meeting. The outcomes from my perspective are:

  1. @berkaysynnada plans to review this PR and test it on the synnada fork to ensure it still works
  2. In parallel, @wiedld and I will work on increasing the test coverage of EnforceDistribution to ensure it covers our InfluxDB Iox plans

and I'll investigate the temporal bug in release 44

Labels: core (Core DataFusion crate), optimizer (Optimizer rules)

3 participants