-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor SortPushdown using the standard top-down visitor and using EquivalenceProperties
#14821
base: main
Are you sure you want to change the base?
Conversation
…factor in sort_pushdown_helper to make it more understandable
@@ -2203,7 +2203,7 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { | |||
); | |||
|
|||
let expected = &[ | |||
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", | |||
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only test case change.
It's a no-op since we have only 1 partition here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @wiedld -- this looks like an improvment -- and thank you for all the comments. I don't fully understand where some of this code is coming from -- I will give it a more careful look in the morning
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
let fetch = requirements.data.fetch.or(sort_fetch); | ||
let parent_is_stricter = plan | ||
.equivalence_properties() | ||
.requirements_compatible(&parent_reqs, &child_reqs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where these requirements_compatible
calls come from. Is there an existing location?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The requirements_compatible (docs here) determine if the set of requirements is equal or stricter than the other set of requirements.
On current main, the handling of sort nodes includes the use of pushdown_requirement_to_children which itself uses the requirements_compatible. I simply removed some of the misdirection -- but I could add it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope this is great -- I just was trying to pattern match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @wiedld -- I went through this logic carefully and I think it makes sense and is much easier to understand now
I am in the process of running the planning benchmarks to make sure these changes don't cause a performance regression, but overall I think they look quite good to me.
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
So maybe there is a slight degredation but overall it looks fine to me Full results
|
FYI @xudong963 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you @wiedld. I just have 2 questions
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
There is a new test on main (added 4 hours ago) which is failing. |
Testing for the win. That is a sign this code isn't sufficiently tested 🤔 |
It's failing sort pushdown on this test case:
It tries to push down only the The reason this doesn't happen on main is because they (1) immediately see if they can push down the sort to it's child (without the recursive call to walk down to the child), and then (2) if it cannot be pushed down then re-use the same SortExec node (this branch here), rather than reconstructing from the equivalence properties' ordering (which has constants removed). |
What are the output sort properties of
So then I would expect that the sort properties should be satisfied and this pass should eliminate the SortExec 🤔 |
I think I think the SQL looks like SELECT count() over () But maybe not 🤔 |
5a4ef63
to
4775354
Compare
The equivalence properties' ordering methods, as well as the Do you need me to make that^^ added commit into a separate PR? |
) -> bool { | ||
const_exprs.iter().any(|const_expr| { | ||
const_expr.expr.eq(expr) | ||
&& const_expr.across_partitions() != AcrossPartitions::Heterogeneous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would find this code more explicit it it checked for == AcrossPartitions::Uniform
rather than != Heterogenious
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this (which is how other parts of the code check this):
index 1e1266b7a..e9e7eb2ca 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -223,7 +223,10 @@ pub fn uniform_const_exprs_contains(
) -> bool {
const_exprs.iter().any(|const_expr| {
const_expr.expr.eq(expr)
- && const_expr.across_partitions() != AcrossPartitions::Heterogeneous
+ && matches!(
+ const_expr.across_partitions(),
+ AcrossPartitions::Uniform { .. }
+ )
})
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The equivalence properties' ordering methods, as well as the
add_sort_above
util method, were checking if the field was a constant -- and not considering if it's a heterogeneous constant. This commit 4775354 makes the tests pass; is it the correct fix?
I am not 100% what the correct fix is :) We need to figure it out together. The proposed change in 4775354 seems reasonable to me and the fact that all the tests pass is a good one
Given all the tests pass I think this PR is ready to go.
Let us know what you think @berkaysynnada
) -> bool { | ||
const_exprs.iter().any(|const_expr| { | ||
const_expr.expr.eq(expr) | ||
&& const_expr.across_partitions() != AcrossPartitions::Heterogeneous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this (which is how other parts of the code check this):
index 1e1266b7a..e9e7eb2ca 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -223,7 +223,10 @@ pub fn uniform_const_exprs_contains(
) -> bool {
const_exprs.iter().any(|const_expr| {
const_expr.expr.eq(expr)
- && const_expr.across_partitions() != AcrossPartitions::Heterogeneous
+ && matches!(
+ const_expr.across_partitions(),
+ AcrossPartitions::Uniform { .. }
+ )
})
}
@@ -22,7 +22,9 @@ use std::slice::Iter; | |||
use std::sync::Arc; | |||
use std::{fmt, mem}; | |||
|
|||
use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; | |||
use crate::equivalence::class::{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking through the properties.rs file, it seems like it is getting huge -- I will file a follow on ticket about that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sort_pushdown changes seem good to me, but I don't agree the part where we filter the expressions being heterogeneously constant across partitions, while retaining the output ordering.
// Prune out constant expressions
output_ordering.retain(|sort_expr| {
!uniform_const_exprs_contains(constants, &sort_expr.expr)
});
I cannot follow from which point this fix arose, but we should fix the constantness check instead, where this function is called and returns and unexpected result. I'm saying this because output_ordering() API reflects the per-partition behavior.
For example, when a multi-partition operator returns an ordering from output_ordering() API, it means that its each partition is ordered according to that. But, we do not know how they will be once they are coalesced into 1 partition (it could still be ordered, or it could be broken)
BTW, I feel the need day by day more of expanding SortOptions with a new variant of "constant"
@berkaysynnada -- once I merged in the latest main, a (nice ❤️ ) test added by you a few hours earlier started failing. Specifically, it was failing due to the removal of heterogeneous constant fields from the ordering requirements. Your test case is not failing on main since the SortExec is kept intact, instead of selectively recreated (if needed) from the output ordering requirements. See here for exactly how this happens: #14821 (comment) As a result, I proposed a possible "fix" (more here: #14821 (comment)) based upon my limited understanding of how constants (and if they are heterogeneous across partitions) should be handled when calculating the output ordering. But I'm not sure this is the proper fix. 🤔 I'm going to convert this PR back to a draft, and make a reproducer test case isolating how |
…across partitions" This reverts commit 4775354.
… partition by (in window agg), and when aggregating on an unordered column
Thanks for giving me time to catch up. After digging more into the ordering calculation, and making a small PR with additional test cases (to demonstrate how I think its intended to work?), I now agree with @berkaysynnada. I'm going to undo the ordering calculation change (revert that single commit). Instead, I'm pushing up a temporary commit to show what test cases have a selective removal of the |
I took a look, but couldn't quite get the point you try to emphasize. Do you think that there is a problem in sort_pushdown or compute_properties of window operators?
I'll share my thoughts there
👍🏻 |
I'm still seeing the output_ordering() change which adds the heterogenous constant to the output_ordering, and I have said that doesn't seem correct to me. Did you mean to point out another part in that commit? |
I'm a bit confused about what are we trying to solve and how. Could you @wiedld tell me which issue or reproducer we are trying to solve in this collection of PR's please? I'd like to unblock you but unsure how to do it |
We are trying to fix the coalesce bug. In the process, I've been adding docs and making small improvements to make the code easier to understand.
I've already removed this change.
The goal is to make the enforce sorting optimizer easier to understand, with this PR focused on the |
@@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { | |||
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet", | |||
], | |||
expected_plan: vec![ | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]", | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test change is actually ok.
The window agg is unpartitioned, and unbounded. Therefore the count value is constant across all rows; and therefore can be removed from the SortExec
. Therefore we think this change is an improvement.
The reason this plan got better is because the pushdown_sorts
is now recreating the sort from the eq props (that removes the constants), rather than re-attaching the existing sort node (as does main).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went over this PR again carefully with @wiedld and we discussed the plan changes and they seem like improvements to me
@berkaysynnada I wonder if you would be interested sometime in a video call or something to discuss these passes and what we are doing. We would be happy to set one up and it might help us be more efficient in the bac and forths
EquivalenceProperties
Of course, we can. My username |
I reached out to find some time |
@berkaysynnada @wiedld and I had a brief meeting. The outcomes from my perspective is:
|
and I'll investigate the temporal bug in release 44 |
Which issue does this PR close?
No issue.
Rationale for this change
It's a minor refactor on the
EnforceSorting
subrulesort_pushdown
. I was having a hard time reasoning and debugging a few things. Switching the traversal to a standard visitor pattern (transform_down
) made it a lot easier for me.What changes are included in this PR?
Move
pushdown_sorts
to use the standard top-down traversal.Only requires a bit of tweaking in the
pushdown_sorts_helper
.Add copious code docs.
Are these changes tested?
Yes, with existing tests.
Are there any user-facing changes?
No.