From 9781cc14f0bce202edfc2865cd9fd04442db38ad Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 23 Jan 2025 13:20:13 +0800 Subject: [PATCH 1/7] fix: LimitPushdown rule incorrectly removes some GlobalLimitExec --- datafusion/core/tests/dataframe/mod.rs | 34 ++++++++++++++++--- .../physical-optimizer/src/limit_pushdown.rs | 11 ++++-- datafusion/sqllogictest/test_files/limit.slt | 34 +++++++++++++++++++ 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 954c46ab27e4..ab44523fe99d 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> { let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; - let df_explain = ctx.sql("explain SELECT a FROM data").await?; - let explain_result = df_explain.collect().await?; - - println!("explain_result {:?}", explain_result); - assert_batches_eq!( &[ "+---+---+", @@ -5182,3 +5177,32 @@ async fn register_non_parquet_file() { "1.json' does not match the expected extension '.parquet'" ); } + +// Test issue: https://github.com/apache/datafusion/issues/14204 +#[tokio::test] +async fn test_with_subquery_limit() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("create table t(a int, b int) as values (1,2), (2,3), (3,4)") + .await?; + + let df = ctx + .sql("select * from t as t1 join (select * from t limit 1) limit 10") + .await? 
+ .collect() + .await?; + + assert_batches_eq!( + &[ + "+---+---+---+---+", + "| a | b | a | b |", + "+---+---+---+---+", + "| 1 | 2 | 1 | 2 |", + "| 2 | 3 | 1 | 2 |", + "| 3 | 4 | 1 | 2 |", + "+---+---+---+---+", + ], + &df + ); + + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7a44b2e90dde..a4911914822a 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -30,7 +30,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; - /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from /// the parent to the child if applicable. #[derive(Default, Debug)] @@ -247,7 +246,15 @@ pub fn pushdown_limit_helper( } } else { // Add fetch or a `LimitExec`: - global_state.satisfied = true; + + // If the plan's children have limit, we shouldn't change the global state to true, + // because the children limit will be overridden if the global state is changed. 
+ if pushdown_plan.children().iter().any(|child| { + child.as_any().is::() + || child.as_any().is::() + }) { + global_state.satisfied = false; + } pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { if global_skip > 0 { add_global_limit(plan_with_fetch, global_skip, Some(global_fetch)) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 5b98392f1aa0..fe8852d26e79 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -711,3 +711,37 @@ OFFSET 3 LIMIT 2; statement ok drop table ordered_table; + +# Test limit pushdown with subquery +statement ok +create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4); + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +1 2 1 2 +2 3 1 2 +3 4 1 2 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +logical_plan +01)Limit: skip=0, fetch=10 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=10 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10 +06)----Limit: skip=0, fetch=1 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=1 +physical_plan +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--GlobalLimitExec: skip=0, fetch=10 +03)----CrossJoinExec +04)------GlobalLimitExec: skip=0, fetch=1 +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------GlobalLimitExec: skip=0, fetch=10 +07)--------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +drop table testSubQueryLimit; From d3969a2941cbfbff48c5cd01fb632a50f703e0d7 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 23 Jan 2025 15:32:15 +0800 Subject: [PATCH 2/7] Fix some logic for maybe fetch --- datafusion/physical-plan/src/memory.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff 
--git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index fb58a04fcc20..3621705f8612 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -62,6 +62,8 @@ pub struct MemoryExec { cache: PlanProperties, /// if partition sizes should be displayed show_sizes: bool, + /// it is a flag used by limit pushdown logic, but we don't use limit value until now + limit: Option, } impl fmt::Debug for MemoryExec { @@ -166,6 +168,13 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )) } + + /// This logic is used by limit pushdown, but we don't use limit until now + fn with_fetch(&self, limit: Option) -> Option> { + let mut new_self = self.clone(); + new_self.limit = limit; + Some(Arc::new(new_self)) + } } impl MemoryExec { @@ -192,6 +201,7 @@ impl MemoryExec { sort_information: vec![], cache, show_sizes: true, + limit: None, }) } @@ -287,6 +297,7 @@ impl MemoryExec { sort_information: vec![], cache, show_sizes: true, + limit: None, }) } From dbb20eed97f104feb18f3fc8a83bc6583fb2d998 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Thu, 23 Jan 2025 17:14:32 +0800 Subject: [PATCH 3/7] Fix test --- .../physical-optimizer/src/limit_pushdown.rs | 1 - datafusion/physical-plan/src/memory.rs | 11 ---------- datafusion/sqllogictest/test_files/joins.slt | 20 ++++++++++++------- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index a4911914822a..a489b042d2a8 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -246,7 +246,6 @@ pub fn pushdown_limit_helper( } } else { // Add fetch or a `LimitExec`: - // If the plan's children have limit, we shouldn't change the global state to true, // because the children limit will be overridden if the global state is changed. 
if pushdown_plan.children().iter().any(|child| { diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 3621705f8612..fb58a04fcc20 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -62,8 +62,6 @@ pub struct MemoryExec { cache: PlanProperties, /// if partition sizes should be displayed show_sizes: bool, - /// it is a flag used by limit pushdown logic, but we don't use limit value until now - limit: Option, } impl fmt::Debug for MemoryExec { @@ -168,13 +166,6 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )) } - - /// This logic is used by limit pushdown, but we don't use limit until now - fn with_fetch(&self, limit: Option) -> Option> { - let mut new_self = self.clone(); - new_self.limit = limit; - Some(Arc::new(new_self)) - } } impl MemoryExec { @@ -201,7 +192,6 @@ impl MemoryExec { sort_information: vec![], cache, show_sizes: true, - limit: None, }) } @@ -297,7 +287,6 @@ impl MemoryExec { sort_information: vec![], cache, show_sizes: true, - limit: None, }) } diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 68426f180d99..cf9e0120f8f5 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4225,7 +4225,7 @@ query IIIIB SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; ---- 2 2 2 2 true -3 3 2 2 true +2 2 2 2 false query IIIIB SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT 2; @@ -4247,8 +4247,10 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----MemoryExec: partitions=1, partition_sizes=[1] +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: 
partitions=1, partition_sizes=[1] ## Test join.on.is_empty() && join.filter.is_some() query TT @@ -4264,8 +4266,10 @@ logical_plan physical_plan 01)GlobalLimitExec: skip=0, fetch=2 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1 -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----MemoryExec: partitions=1, partition_sizes=[1] +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: partitions=1, partition_sizes=[1] ## Test !join.on.is_empty() && join.filter.is_some() query TT @@ -4281,8 +4285,10 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----MemoryExec: partitions=1, partition_sizes=[1] +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: partitions=1, partition_sizes=[1] # Test Utf8View as Join Key # Issue: https://github.com/apache/datafusion/issues/12468 From 87dba0ba5307b3cc7c5887efe5ce03928193e660 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Tue, 28 Jan 2025 21:53:48 +0800 Subject: [PATCH 4/7] Address comments --- datafusion/physical-optimizer/src/limit_pushdown.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index a489b042d2a8..d974b643315b 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -248,10 +248,11 @@ pub fn pushdown_limit_helper( // Add fetch or a `LimitExec`: // If the plan's children have limit, we shouldn't change the global state to true, // because the children limit will be overridden if the global 
state is changed. - if pushdown_plan.children().iter().any(|child| { - child.as_any().is::() - || child.as_any().is::() - }) { + if pushdown_plan + .children() + .iter() + .any(|&child| extract_limit(child).is_some()) + { global_state.satisfied = false; } pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { From 8263922ae781bdcecf942cffb10df3a116decc9a Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 29 Jan 2025 17:10:32 +0800 Subject: [PATCH 5/7] Address comments --- datafusion/core/tests/dataframe/mod.rs | 29 -------------------- datafusion/sqllogictest/test_files/joins.slt | 20 +++++--------- datafusion/sqllogictest/test_files/limit.slt | 27 ++++++++++++++++++ 3 files changed, 34 insertions(+), 42 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index ab44523fe99d..1ebbf92c736e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5177,32 +5177,3 @@ async fn register_non_parquet_file() { "1.json' does not match the expected extension '.parquet'" ); } - -// Test issue: https://github.com/apache/datafusion/issues/14204 -#[tokio::test] -async fn test_with_subquery_limit() -> Result<()> { - let ctx = SessionContext::new(); - ctx.sql("create table t(a int, b int) as values (1,2), (2,3), (3,4)") - .await?; - - let df = ctx - .sql("select * from t as t1 join (select * from t limit 1) limit 10") - .await? 
- .collect() - .await?; - - assert_batches_eq!( - &[ - "+---+---+---+---+", - "| a | b | a | b |", - "+---+---+---+---+", - "| 1 | 2 | 1 | 2 |", - "| 2 | 3 | 1 | 2 |", - "| 3 | 4 | 1 | 2 |", - "+---+---+---+---+", - ], - &df - ); - - Ok(()) -} diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index ae2a3e7f27f6..ac02aeb6fea4 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4225,7 +4225,7 @@ query IIIIB SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; ---- 2 2 2 2 true -2 2 2 2 false +3 3 2 2 true query IIIIB SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT 2; @@ -4245,10 +4245,8 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] -03)----GlobalLimitExec: skip=0, fetch=2 -04)------MemoryExec: partitions=1, partition_sizes=[1] -05)----GlobalLimitExec: skip=0, fetch=2 -06)------MemoryExec: partitions=1, partition_sizes=[1] +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] ## Test join.on.is_empty() && join.filter.is_some() query TT @@ -4262,10 +4260,8 @@ logical_plan physical_plan 01)GlobalLimitExec: skip=0, fetch=2 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1 -03)----GlobalLimitExec: skip=0, fetch=2 -04)------MemoryExec: partitions=1, partition_sizes=[1] -05)----GlobalLimitExec: skip=0, fetch=2 -06)------MemoryExec: partitions=1, partition_sizes=[1] +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] ## Test !join.on.is_empty() && join.filter.is_some() query TT @@ -4279,10 +4275,8 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 -03)----GlobalLimitExec: skip=0, fetch=2 
-04)------MemoryExec: partitions=1, partition_sizes=[1] -05)----GlobalLimitExec: skip=0, fetch=2 -06)------MemoryExec: partitions=1, partition_sizes=[1] +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] ## Add more test cases for join limit pushdown statement ok diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index fe8852d26e79..308e759fa9fa 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -712,6 +712,7 @@ OFFSET 3 LIMIT 2; statement ok drop table ordered_table; +# Test issue: https://github.com/apache/datafusion/issues/14204 # Test limit pushdown with subquery statement ok create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4); @@ -743,5 +744,31 @@ physical_plan 06)------GlobalLimitExec: skip=0, fetch=10 07)--------MemoryExec: partitions=1, partition_sizes=[1] + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +1 2 1 2 +1 2 2 3 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=2 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2 +06)----Limit: skip=0, fetch=2 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=2 +physical_plan +01)GlobalLimitExec: skip=0, fetch=2 +02)--CrossJoinExec +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: partitions=1, partition_sizes=[1] + statement ok drop table testSubQueryLimit; From ee2bda7ae639fcb101c7d2d15d14f0469eb0c0d0 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 29 Jan 2025 17:13:26 +0800 Subject: [PATCH 6/7] Add 
comments --- datafusion/physical-optimizer/src/limit_pushdown.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 96513928b616..ddeb654dd506 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -247,7 +247,7 @@ pub fn pushdown_limit_helper( } } else { // Add fetch or a `LimitExec`: - // If the plan's children have limit, we shouldn't change the global state to true, + // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, // because the children limit will be overridden if the global state is changed. if pushdown_plan .children() From 412b10428a30039131bd779a6d55accae6adac61 Mon Sep 17 00:00:00 2001 From: zhuqi-lucas <821684824@qq.com> Date: Wed, 29 Jan 2025 23:18:52 +0800 Subject: [PATCH 7/7] Address comments --- datafusion/physical-optimizer/src/limit_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index ddeb654dd506..5887cb51a727 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -249,12 +249,12 @@ pub fn pushdown_limit_helper( // Add fetch or a `LimitExec`: // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, // because the children limit will be overridden if the global state is changed. - if pushdown_plan + if !pushdown_plan .children() .iter() .any(|&child| extract_limit(child).is_some()) { - global_state.satisfied = false; + global_state.satisfied = true; } pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { if global_skip > 0 {