diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index f5f7c809fc428..0db08cf749cc1 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -240,8 +240,8 @@ impl RewriteRules for MemberRules { "?members", "?filters", "?orders", - "?cube_fetch", - "?offset", + "?inner_fetch", + "?inner_skip", "?split", "?can_pushdown_join", "CubeScanWrapped:false", @@ -260,7 +260,14 @@ impl RewriteRules for MemberRules { "CubeScanWrapped:false", "?ungrouped", ), - self.push_down_limit("?skip", "?fetch", "?new_skip", "?new_fetch"), + self.push_down_limit( + "?skip", + "?fetch", + "?inner_skip", + "?inner_fetch", + "?new_skip", + "?new_fetch", + ), ), // MOD function to binary expr transforming_rewrite_with_root( @@ -1597,47 +1604,90 @@ impl MemberRules { &self, skip_var: &'static str, fetch_var: &'static str, + inner_skip_var: &'static str, + inner_fetch_var: &'static str, new_skip_var: &'static str, new_fetch_var: &'static str, ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { let skip_var = var!(skip_var); let fetch_var = var!(fetch_var); + let inner_skip_var = var!(inner_skip_var); + let inner_fetch_var = var!(inner_fetch_var); let new_skip_var = var!(new_skip_var); let new_fetch_var = var!(new_fetch_var); move |egraph, subst| { + // This transform expects only single value in every (eclass, kind) + // No two different values of fetch or skip should ever get unified + let mut skip_value = None; for skip in var_iter!(egraph[subst[skip_var]], LimitSkip) { - if skip.unwrap_or_default() > 0 { - skip_value = *skip; - break; - } + skip_value = *skip; + break; } let mut fetch_value = None; for fetch in var_iter!(egraph[subst[fetch_var]], LimitFetch) { - if fetch.unwrap_or_default() > 0 { - fetch_value = *fetch; - break; - } + fetch_value = *fetch; + break; + } + // TODO support this case + if fetch_value == Some(0) { + // Broken and unsupported case for now + return false; } - if skip_value.is_some() || fetch_value.is_some() { - subst.insert( - new_skip_var, - egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset( - skip_value, - ))), - ); - subst.insert( - new_fetch_var, - egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit( - fetch_value, - ))), - ); - - return true; + let mut inner_skip_value = None; + for inner_skip in var_iter!(egraph[subst[inner_skip_var]], CubeScanOffset) { + inner_skip_value = *inner_skip; + break; } - false + let mut inner_fetch_value = None; + for inner_fetch in var_iter!(egraph[subst[inner_fetch_var]], CubeScanLimit) { + inner_fetch_value = *inner_fetch; + break; + } + + let new_skip = match (skip_value, inner_skip_value) { + (None, None) => None, + (Some(skip), None) | (None, Some(skip)) => Some(skip), + (Some(outer_skip), Some(inner_skip)) => Some(outer_skip + inner_skip), + }; + // No need to set offset=0, it's same as no offset + let new_skip = if new_skip != Some(0) { new_skip } else { None }; + let new_fetch = match (fetch_value, inner_fetch_value) { + (None, None) => None, + // Inner node have no limit, maybe just offset, result limit is same as for outer node + (Some(outer_fetch), None) => Some(outer_fetch), + // Outer node have no limit, but may have offset + // First, inner offset would apply + // Then inner node would limit rows + // Then outer offset would apply, which would yield no more than `inner_fetch - outer_skip` rows + (None, Some(inner_fetch)) => { + Some(inner_fetch.saturating_sub(skip_value.unwrap_or(0))) + } + // Both nodes have a limit + // First, inner offset would apply + // Then inner node would limit rows + // Then outer offset would apply, which would yield no more than `in_limit - out_offset` rows + // Then outer limit would apply, which would yield no more than minimal of two + (Some(outer_fetch), Some(inner_fetch)) => Some(usize::min( + inner_fetch.saturating_sub(skip_value.unwrap_or(0)), + outer_fetch, + )), + }; + + subst.insert( + new_skip_var, + egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset( + new_skip, + ))), + ); + subst.insert( + new_fetch_var, + egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit(new_fetch))), + ); + + true } } diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 411bd05f26bef..e36f4933292b1 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -32,6 +32,8 @@ pub mod test_cube_join; #[cfg(test)] pub mod test_cube_join_grouped; #[cfg(test)] +pub mod test_cube_scan; +#[cfg(test)] pub mod test_df_execution; #[cfg(test)] pub mod test_introspection; diff --git a/rust/cubesql/cubesql/src/compile/test/test_cube_scan.rs b/rust/cubesql/cubesql/src/compile/test/test_cube_scan.rs new file mode 100644 index 0000000000000..0b6143ff0cc1a --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/test/test_cube_scan.rs @@ -0,0 +1,226 @@ +use cubeclient::models::V1LoadRequestQuery; +use pretty_assertions::assert_eq; + +use crate::compile::{ + test::{convert_select_to_query_plan, init_testing_logger, utils::LogicalPlanTestUtils}, + DatabaseProtocol, +}; + +/// LIMIT n OFFSET m should be pushed to CubeScan +#[tokio::test] +async fn cubescan_limit_offset() { + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + LIMIT 2 + OFFSET 3 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + limit: Some(2), + offset: Some(3), + ..Default::default() + } + ); +} + +/// LIMIT over LIMIT should be pushed to single CubeScan +#[tokio::test] +async fn cubescan_limit_limit() { + init_testing_logger(); + + let variants = vec![ + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + LIMIT 3 + ) scan + LIMIT 2 + "#, + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + LIMIT 2 + ) scan + LIMIT 3 + "#, + ]; + + for variant in variants { + let query_plan = + convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await; + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + limit: Some(2), + ..Default::default() + } + ); + } +} + +/// OFFSET over OFFSET should be pushed to single CubeScan +#[tokio::test] +async fn cubescan_offset_offset() { + init_testing_logger(); + + let variants = vec![ + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + OFFSET 3 + ) scan + OFFSET 2 + "#, + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + OFFSET 2 + ) scan + OFFSET 3 + "#, + ]; + + for variant in variants { + let query_plan = + convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await; + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + offset: Some(5), + ..Default::default() + } + ); + } +} + +/// LIMIT OFFSET over LIMIT OFFSET should be pushed to single CubeScan with a proper values +#[tokio::test] +async fn cubescan_limit_offset_limit_offset() { + init_testing_logger(); + + let variants = vec![ + ( + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + LIMIT 3 + OFFSET 3 + ) scan + LIMIT 2 + OFFSET 2 + "#, + 1, + ), + ( + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + GROUP BY + 1 + LIMIT 10 + OFFSET 3 + ) scan + LIMIT 2 + OFFSET 2 + "#, + 2, + ), + ]; + + for (variant, limit) in variants { + let query_plan = + convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await; + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + limit: Some(limit), + offset: Some(5), + ..Default::default() + } + ); + } +} diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index fc6267d32c44e..96a2f7f273d4e 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1417,3 +1417,98 @@ async fn wrapper_agg_dimension_over_limit() { .sql .contains("\"ungrouped\": true")); } + +/// Aggregation with falsy filter should NOT get pushed to CubeScan with limit=0 +/// This test currently produces WrappedSelect with WHERE FALSE, which is OK for our purposes +#[tokio::test] +async fn select_agg_where_false() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + "SELECT SUM(sumPrice) FROM KibanaSampleDataEcommerce WHERE 1 = 0".to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + segments: Some(vec![]), + dimensions: Some(vec![]), + order: Some(vec![]), + limit: None, + ungrouped: Some(true), + ..Default::default() + } + ); + + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + + // Final query uses grouped query to Cube.js with WHERE FALSE, but without LIMIT 0 + assert!(!sql.contains("\"ungrouped\":")); + assert!(sql.contains(r#"\"expr\":\"FALSE\""#)); + assert!(sql.contains(r#""limit": 50000"#)); +} + +/// Aggregation(dimension) with falsy filter should NOT get pushed to CubeScan with limit=0 +/// This test currently produces WrappedSelect with WHERE FALSE, which is OK for our purposes +#[tokio::test] +async fn wrapper_dimension_agg_where_false() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + SELECT + MAX(customer_gender) + FROM + KibanaSampleDataEcommerce + WHERE 1 = 0 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec![]), + segments: Some(vec![]), + order: Some(vec![]), + limit: None, + ungrouped: Some(true), + ..Default::default() + } + ); + + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + + // Final query uses grouped query to Cube.js with WHERE FALSE, but without LIMIT 0 + assert!(!sql.contains("\"ungrouped\":")); + assert!(sql.contains(r#"\"expr\":\"FALSE\""#)); + assert!(!sql.contains(r#""limit""#)); + assert!(sql.contains("LIMIT 50000")); +}