fix: LimitPushdown rule incorrectly removes some GlobalLimitExec #14245
Changes from 4 commits
```diff
@@ -30,7 +30,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
-
 /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
 /// the parent to the child if applicable.
 #[derive(Default, Debug)]
```
```diff
@@ -247,7 +246,15 @@ pub fn pushdown_limit_helper(
         }
     } else {
         // Add fetch or a `LimitExec`:
         global_state.satisfied = true;
+
+        // If the plan's children have limit, we shouldn't change the global state to true,
+        // because the children limit will be overridden if the global state is changed.
+        if pushdown_plan
+            .children()
+            .iter()
+            .any(|&child| extract_limit(child).is_some())
+        {
+            global_state.satisfied = false;
+        }
         pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
             if global_skip > 0 {
                 add_global_limit(plan_with_fetch, global_skip, Some(global_fetch))
```

**zhuqi-lucas (author):** Here is the original logic for setting `satisfied` back to `true`.

**xudong963:** If the children's limit is …

**zhuqi-lucas:** Good question @xudong963. Added an slt test for the case where the children's limit is >= the global limit; there the limit should still be pushed down, consistent with current behaviour. Thanks!

**xudong963:** I think the logic is strange: if we come to the `else` branch (line 248), it means …

**zhuqi-lucas:** Got it @xudong963. The previous logic always set it back to `true` when `global_state.satisfied == false`; this change keeps it `false` for some cases. I changed the logic to make it clearer: we only set it to `true` when the case above does not apply.
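The guarded state transition above can be sketched with a toy plan type (the `Plan` struct and `extract_limit` below are hypothetical stand-ins, not the actual DataFusion types):

```rust
// Toy stand-in for an ExecutionPlan node: each node may carry its own fetch limit.
#[derive(Debug)]
struct Plan {
    fetch: Option<usize>,
    children: Vec<Plan>,
}

// Hypothetical analogue of `extract_limit`: report a node's own limit, if any.
fn extract_limit(plan: &Plan) -> Option<usize> {
    plan.fetch
}

// Mirror of the fixed rule: tentatively mark the global limit as satisfied,
// but back off when any child already carries a limit that a later rewrite
// would otherwise override.
fn mark_satisfied(plan: &Plan) -> bool {
    let mut satisfied = true;
    if plan.children.iter().any(|c| extract_limit(c).is_some()) {
        satisfied = false;
    }
    satisfied
}

fn main() {
    let leaf = |fetch| Plan { fetch, children: vec![] };
    // A join-like node whose child subquery kept its own LIMIT 1:
    let with_child_limit = Plan {
        fetch: None,
        children: vec![leaf(Some(1)), leaf(None)],
    };
    let without_child_limit = Plan {
        fetch: None,
        children: vec![leaf(None), leaf(None)],
    };
    assert!(!mark_satisfied(&with_child_limit)); // child limit must survive
    assert!(mark_satisfied(&without_child_limit));
}
```

The guard does not block the parent's limit from being pushed down; it only prevents the rule from treating the limit as already handled when a child still carries a limit of its own.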
```diff
@@ -4225,7 +4225,7 @@ query IIIIB
 SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
 ----
 2 2 2 2 true
-3 3 2 2 true
+2 2 2 2 false
```

**xudong963:** Why this change? According to this PR, the limit is pushed down to both sides of the full join:

```diff
 query IIIIB
 SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT 2;
```
```diff
@@ -4247,8 +4247,10 @@ logical_plan
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=3, fetch=2
 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
-03)----MemoryExec: partitions=1, partition_sizes=[1]
-04)----MemoryExec: partitions=1, partition_sizes=[1]
+03)----GlobalLimitExec: skip=0, fetch=2
+04)------MemoryExec: partitions=1, partition_sizes=[1]
+05)----GlobalLimitExec: skip=0, fetch=2
+06)------MemoryExec: partitions=1, partition_sizes=[1]
```

**xudong963:** Why this change? According to this PR, the limit is pushed down to both sides of the full join, so the limit already applies on both sides.

**alamb:** I didn't think it was correct to push a limit below a Full join -- a FullJoin will create null values to match any missing rows 🤔 So even if you limited both sides you'll still get rows out of there that shouldn't be ...

**zhuqi-lucas:** Thank you @alamb for the review, this is a good point, and we don't change anything for the full-join limit in the current PR. The logical plan already pushes the limit down through full joins (see the example code below). The physical plan then applies the limit pushdown, but without this PR the child limit was overridden by the parent limit, so the limit did not show up in the physical plan before. I suggest we open a follow-up issue to discuss whether we need to fall back or change the full-join limit pushdown case. What's your opinion? Thanks a lot!

**alamb:** If the current datafusion code pushes limits down through FULL OUTER JOINs, I agree we should file a bug and fix it.

**zhuqi-lucas:** Hi @alamb:

```rust
/// Adds a limit to the inputs of a join, if possible
fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
    use JoinType::*;
    fn is_no_join_condition(join: &Join) -> bool {
        join.on.is_empty() && join.filter.is_none()
    }
    let (left_limit, right_limit) = if is_no_join_condition(&join) {
        match join.join_type {
            Left | Right | Full | Inner => (Some(limit), Some(limit)),
            LeftAnti | LeftSemi | LeftMark => (Some(limit), None),
            RightAnti | RightSemi => (None, Some(limit)),
        }
    } else {
        match join.join_type {
            Left => (Some(limit), None),
            Right => (None, Some(limit)),
            Full => (Some(limit), Some(limit)),
            _ => (None, None),
        }
    };
    if left_limit.is_none() && right_limit.is_none() {
        return Transformed::no(join);
    }
    if let Some(limit) = left_limit {
        join.left = make_arc_limit(0, limit, join.left);
    }
    if let Some(limit) = right_limit {
        join.right = make_arc_limit(0, limit, join.right);
    }
    Transformed::yes(join)
}
```

I think it's safe: we only want some limited result, and we don't care which rows the optimization returns. If we want the limited result to contain exactly the same rows as without the limit, I think we would need to remove this join pushdown optimization. Thanks a lot!

**alamb:** I think this line looks suspicious. I will work on trying to file a bug tomorrow. BTW I don't think this was introduced in your PR. I have also marked this PR as must be part of the 45 release.

**zhuqi-lucas:** Thanks a lot @alamb!

**alamb:** I am pretty sure this is a bug (not introduced by this PR). I am now reviewing the rest of this PR more carefully. Thank you for your patience @zhuqi-lucas.
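alamb's objection can be reproduced with a toy model (plain slices and `Option<i32>` standing in for SQL NULL; this is illustrative code, not DataFusion's implementation): limiting one input of a FULL OUTER JOIN can fabricate null-padded rows that the unlimited join would never emit.

```rust
// Toy full outer join on equality: matched pairs, plus null-padded
// rows for unmatched inputs on either side.
fn full_join(left: &[i32], right: &[i32]) -> Vec<(Option<i32>, Option<i32>)> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for &l in left {
        let mut matched = false;
        for (i, &r) in right.iter().enumerate() {
            if l == r {
                out.push((Some(l), Some(r)));
                matched = true;
                right_matched[i] = true;
            }
        }
        if !matched {
            out.push((Some(l), None)); // null-pad the unmatched left row
        }
    }
    for (i, &r) in right.iter().enumerate() {
        if !right_matched[i] {
            out.push((None, Some(r))); // null-pad the unmatched right row
        }
    }
    out
}

fn main() {
    let left = [1, 2, 3];
    let right = [3];

    // Correct plan: join first, then LIMIT. Every emitted row is a real join row.
    let full = full_join(&left, &right);
    assert!(full.contains(&(Some(3), Some(3))));
    assert!(!full.contains(&(None, Some(3)))); // 3 found its match, so no padding

    // "Pushed down" plan: LIMIT 2 applied to the left input before the join.
    let pushed = full_join(&left[..2], &right);
    // Row 3 never meets its match, so the join invents (NULL, 3),
    // a row the correct plan could never output.
    assert!(pushed.contains(&(None, Some(3))));
}
```

This is why limiting both sides is not enough: the join's null-padding depends on seeing all rows of the other side, so any input limit can change which rows get padded.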
```diff
 ## Test join.on.is_empty() && join.filter.is_some()
 query TT
```
```diff
@@ -4264,8 +4266,10 @@ logical_plan
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=2
 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
-03)----MemoryExec: partitions=1, partition_sizes=[1]
-04)----MemoryExec: partitions=1, partition_sizes=[1]
+03)----GlobalLimitExec: skip=0, fetch=2
+04)------MemoryExec: partitions=1, partition_sizes=[1]
+05)----GlobalLimitExec: skip=0, fetch=2
+06)------MemoryExec: partitions=1, partition_sizes=[1]
```

**alamb:** This plan is incorrect due to (not this PR)
```diff
 ## Test !join.on.is_empty() && join.filter.is_some()
 query TT
```
```diff
@@ -4281,8 +4285,10 @@ logical_plan
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=3, fetch=2
 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
-03)----MemoryExec: partitions=1, partition_sizes=[1]
-04)----MemoryExec: partitions=1, partition_sizes=[1]
+03)----GlobalLimitExec: skip=0, fetch=2
+04)------MemoryExec: partitions=1, partition_sizes=[1]
+05)----GlobalLimitExec: skip=0, fetch=2
+06)------MemoryExec: partitions=1, partition_sizes=[1]

 # Test Utf8View as Join Key
 # Issue: https://github.com/apache/datafusion/issues/12468
```
```diff
@@ -711,3 +711,37 @@ OFFSET 3 LIMIT 2;

 statement ok
 drop table ordered_table;
+
+# Test limit pushdown with subquery
+statement ok
+create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4);
+
+query IIII
+select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
+----
+1 2 1 2
+2 3 1 2
+3 4 1 2
```
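A toy cross join (plain vectors, not DataFusion code) shows why the subquery's `limit 1` must survive: applied before the join it yields the 3 rows above, while overriding it with the parent's `fetch=10` — the bug this PR fixes — would yield 9.

```rust
// Toy cross join over (a, b) rows, with an optional fetch applied to each
// input, mimicking a GlobalLimitExec sitting on each child of the join.
fn cross_join_with_fetch(
    left: &[(i32, i32)],
    left_fetch: Option<usize>,
    right: &[(i32, i32)],
    right_fetch: Option<usize>,
) -> Vec<((i32, i32), (i32, i32))> {
    let l = &left[..left_fetch.unwrap_or(left.len()).min(left.len())];
    let r = &right[..right_fetch.unwrap_or(right.len()).min(right.len())];
    l.iter()
        .flat_map(|&a| r.iter().map(move |&b| (a, b)))
        .collect()
}

fn main() {
    let t = [(1, 2), (2, 3), (3, 4)];
    // Correct plan: the subquery keeps its LIMIT 1 -> 3 rows, as in the slt output.
    assert_eq!(cross_join_with_fetch(&t, None, &t, Some(1)).len(), 3);
    // Buggy plan: the child's limit is overridden by the parent's fetch=10 -> 9 rows.
    assert_eq!(cross_join_with_fetch(&t, None, &t, Some(10)).len(), 9);
}
```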
```diff
+query TT
+explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10;
+----
+logical_plan
+01)Limit: skip=0, fetch=10
+02)--Cross Join:
+03)----SubqueryAlias: t1
+04)------Limit: skip=0, fetch=10
+05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10
+06)----Limit: skip=0, fetch=1
+07)------TableScan: testsubquerylimit projection=[a, b], fetch=1
+physical_plan
+01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b]
+02)--GlobalLimitExec: skip=0, fetch=10
+03)----CrossJoinExec
+04)------GlobalLimitExec: skip=0, fetch=1
+05)--------MemoryExec: partitions=1, partition_sizes=[1]
+06)------GlobalLimitExec: skip=0, fetch=10
+07)--------MemoryExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table testSubQueryLimit;
```

**Review comment:** This plan looks good to me -- the `Limit 1` is still here.
**Review comment:** Nit: Remove print info.