diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 954c46ab27e4..1ebbf92c736e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2217,11 +2217,6 @@ async fn write_parquet_with_order() -> Result<()> { let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; - let df_explain = ctx.sql("explain SELECT a FROM data").await?; - let explain_result = df_explain.collect().await?; - - println!("explain_result {:?}", explain_result); - assert_batches_eq!( &[ "+---+---+", diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 1c7e4d3d4c3d..5887cb51a727 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -31,7 +31,6 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; - /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from /// the parent to the child if applicable. #[derive(Default, Debug)] @@ -248,7 +247,15 @@ pub fn pushdown_limit_helper( } } else { // Add fetch or a `LimitExec`: - global_state.satisfied = true; + // If the plan's children have limit and the child's limit < parent's limit, we shouldn't change the global state to true, + // because the children limit will be overridden if the global state is changed. + if !pushdown_plan + .children() + .iter() + .any(|&child| extract_limit(child).is_some()) + { + global_state.satisfied = true; + } pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable { if global_skip > 0 { add_global_limit(plan_with_fetch, global_skip, Some(global_fetch)) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 5b98392f1aa0..308e759fa9fa 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -711,3 +711,64 @@ OFFSET 3 LIMIT 2; statement ok drop table ordered_table; + +# Test issue: https://github.com/apache/datafusion/issues/14204 +# Test limit pushdown with subquery +statement ok +create table testSubQueryLimit (a int, b int) as values (1,2), (2,3), (3,4); + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +1 2 1 2 +2 3 1 2 +3 4 1 2 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 1) limit 10; +---- +logical_plan +01)Limit: skip=0, fetch=10 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=10 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10 +06)----Limit: skip=0, fetch=1 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=1 +physical_plan +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--GlobalLimitExec: skip=0, fetch=10 +03)----CrossJoinExec +04)------GlobalLimitExec: skip=0, fetch=1 +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------GlobalLimitExec: skip=0, fetch=10 +07)--------MemoryExec: partitions=1, partition_sizes=[1] + + +query IIII +select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +1 2 1 2 +1 2 2 3 + +query TT +explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLimit limit 10) limit 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Cross Join: +03)----SubqueryAlias: t1 +04)------Limit: skip=0, fetch=2 +05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2 +06)----Limit: skip=0, fetch=2 +07)------TableScan: testsubquerylimit projection=[a, b], fetch=2 +physical_plan +01)GlobalLimitExec: skip=0, fetch=2 +02)--CrossJoinExec +03)----GlobalLimitExec: skip=0, fetch=2 +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)----GlobalLimitExec: skip=0, fetch=2 +06)------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +drop table testSubQueryLimit;