fix(cubesql): Calculate proper limit and offset for CubeScan in nested limits case #8924

Open · wants to merge 2 commits into master
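The rewrite folds an outer `Limit` node into a `CubeScan` that may already carry its own limit and offset. As a rough, self-contained sketch of the arithmetic only (the function and its name `combine_limit_offset` are illustrative, not part of this PR, and the `LIMIT 0` edge case that the real rewrite explicitly bails out on is ignored here):

```rust
// Sketch only: mirrors the arithmetic of push_down_limit, not the egraph rewrite itself.
fn combine_limit_offset(
    outer_skip: Option<usize>,
    outer_fetch: Option<usize>,
    inner_skip: Option<usize>,
    inner_fetch: Option<usize>,
) -> (Option<usize>, Option<usize>) {
    // Offsets accumulate: the inner offset is applied first, then the outer one.
    let new_skip = match (outer_skip, inner_skip) {
        (None, None) => None,
        (Some(skip), None) | (None, Some(skip)) => Some(skip),
        (Some(outer), Some(inner)) => Some(outer + inner),
    };
    // An explicit offset of 0 is the same as no offset.
    let new_skip = new_skip.filter(|&skip| skip > 0);
    // The inner limit caps rows before the outer offset is applied, so at most
    // `inner_fetch - outer_skip` rows can survive; the outer limit caps that again.
    let new_fetch = match (outer_fetch, inner_fetch) {
        (None, None) => None,
        (Some(outer), None) => Some(outer),
        (None, Some(inner)) => Some(inner.saturating_sub(outer_skip.unwrap_or(0))),
        (Some(outer), Some(inner)) => {
            Some(outer.min(inner.saturating_sub(outer_skip.unwrap_or(0))))
        }
    };
    (new_skip, new_fetch)
}

fn main() {
    // Inner LIMIT 3 OFFSET 3 under outer LIMIT 2 OFFSET 2 -> offset 5, limit 1.
    assert_eq!(
        combine_limit_offset(Some(2), Some(2), Some(3), Some(3)),
        (Some(5), Some(1))
    );
    // Inner LIMIT 10 OFFSET 3 under outer LIMIT 2 OFFSET 2 -> offset 5, limit 2.
    assert_eq!(
        combine_limit_offset(Some(2), Some(2), Some(3), Some(10)),
        (Some(5), Some(2))
    );
}
```

The two assertions mirror the `cubescan_limit_offset_limit_offset` test cases added below: offsets add up, while the effective limit is the smaller of the outer limit and the inner limit reduced by the outer offset.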
104 changes: 77 additions & 27 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
@@ -240,8 +240,8 @@
"?members",
"?filters",
"?orders",
"?cube_fetch",
"?offset",
"?inner_fetch",
"?inner_skip",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
@@ -260,7 +260,14 @@
"CubeScanWrapped:false",
"?ungrouped",
),
self.push_down_limit("?skip", "?fetch", "?new_skip", "?new_fetch"),
self.push_down_limit(
"?skip",
"?fetch",
"?inner_skip",
"?inner_fetch",
"?new_skip",
"?new_fetch",
),
),
// MOD function to binary expr
transforming_rewrite_with_root(
@@ -1597,47 +1604,90 @@
&self,
skip_var: &'static str,
fetch_var: &'static str,
inner_skip_var: &'static str,
inner_fetch_var: &'static str,
new_skip_var: &'static str,
new_fetch_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let skip_var = var!(skip_var);
let fetch_var = var!(fetch_var);
let inner_skip_var = var!(inner_skip_var);
let inner_fetch_var = var!(inner_fetch_var);
let new_skip_var = var!(new_skip_var);
let new_fetch_var = var!(new_fetch_var);
move |egraph, subst| {
// This transform expects only a single value in every (eclass, kind)
// No two different values of fetch or skip should ever get unified

let mut skip_value = None;
for skip in var_iter!(egraph[subst[skip_var]], LimitSkip) {
if skip.unwrap_or_default() > 0 {
skip_value = *skip;
break;
}
skip_value = *skip;
break;
}
let mut fetch_value = None;
for fetch in var_iter!(egraph[subst[fetch_var]], LimitFetch) {
if fetch.unwrap_or_default() > 0 {
fetch_value = *fetch;
break;
}
fetch_value = *fetch;
break;
}
// TODO support this case
if fetch_value == Some(0) {
// Broken and unsupported case for now
return false;
}

if skip_value.is_some() || fetch_value.is_some() {
subst.insert(
new_skip_var,
egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset(
skip_value,
))),
);
subst.insert(
new_fetch_var,
egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit(
fetch_value,
))),
);

return true;
let mut inner_skip_value = None;
for inner_skip in var_iter!(egraph[subst[inner_skip_var]], CubeScanOffset) {
inner_skip_value = *inner_skip;
break;
}

false
let mut inner_fetch_value = None;
for inner_fetch in var_iter!(egraph[subst[inner_fetch_var]], CubeScanLimit) {
inner_fetch_value = *inner_fetch;
break;
}

let new_skip = match (skip_value, inner_skip_value) {
(None, None) => None,
(Some(skip), None) | (None, Some(skip)) => Some(skip),
(Some(outer_skip), Some(inner_skip)) => Some(outer_skip + inner_skip),
};
// No need to set offset=0, it's the same as no offset
let new_skip = if new_skip != Some(0) { new_skip } else { None };
let new_fetch = match (fetch_value, inner_fetch_value) {
(None, None) => None,
// Inner node has no limit, maybe just an offset; result limit is the same as for the outer node
(Some(outer_fetch), None) => Some(outer_fetch),
// Outer node has no limit, but may have an offset
// First, inner offset would apply
// Then inner node would limit rows
// Then outer offset would apply, which would yield no more than `inner_fetch - outer_skip` rows
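// e.g. inner `LIMIT 10 OFFSET 3` under an outer `OFFSET 2` leaves at most 10 - 2 = 8 rows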
(None, Some(inner_fetch)) => {
Some(inner_fetch.saturating_sub(skip_value.unwrap_or(0)))

}
// Both nodes have a limit
// First, inner offset would apply
// Then inner node would limit rows
// Then outer offset would apply, which would yield no more than `inner_fetch - outer_skip` rows
// Then outer limit would apply, which would yield no more than the minimum of the two
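// e.g. inner `LIMIT 3 OFFSET 3` under outer `LIMIT 2 OFFSET 2` leaves at most min(3 - 2, 2) = 1 row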
(Some(outer_fetch), Some(inner_fetch)) => Some(usize::min(
inner_fetch.saturating_sub(skip_value.unwrap_or(0)),
outer_fetch,
)),
};

subst.insert(
new_skip_var,
egraph.add(LogicalPlanLanguage::CubeScanOffset(CubeScanOffset(
new_skip,
))),
);
subst.insert(
new_fetch_var,
egraph.add(LogicalPlanLanguage::CubeScanLimit(CubeScanLimit(new_fetch))),
);

true
}
}

2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
@@ -32,6 +32,8 @@ pub mod test_cube_join;
#[cfg(test)]
pub mod test_cube_join_grouped;
#[cfg(test)]
pub mod test_cube_scan;
#[cfg(test)]
pub mod test_df_execution;
#[cfg(test)]
pub mod test_introspection;
226 changes: 226 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_cube_scan.rs
@@ -0,0 +1,226 @@
use cubeclient::models::V1LoadRequestQuery;
use pretty_assertions::assert_eq;

use crate::compile::{
test::{convert_select_to_query_plan, init_testing_logger, utils::LogicalPlanTestUtils},
DatabaseProtocol,
};

/// LIMIT n OFFSET m should be pushed to CubeScan
#[tokio::test]
async fn cubescan_limit_offset() {
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 2
OFFSET 3
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(2),
offset: Some(3),
..Default::default()
}
);
}

/// LIMIT over LIMIT should be pushed to a single CubeScan
#[tokio::test]
async fn cubescan_limit_limit() {
init_testing_logger();

let variants = vec![
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 3
) scan
LIMIT 2
"#,
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 2
) scan
LIMIT 3
"#,
];

for variant in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(2),
..Default::default()
}
);
}
}

/// OFFSET over OFFSET should be pushed to a single CubeScan
#[tokio::test]
async fn cubescan_offset_offset() {
init_testing_logger();

let variants = vec![
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
OFFSET 3
) scan
OFFSET 2
"#,
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
OFFSET 2
) scan
OFFSET 3
"#,
];

for variant in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
offset: Some(5),
..Default::default()
}
);
}
}

/// LIMIT OFFSET over LIMIT OFFSET should be pushed to a single CubeScan with proper values
#[tokio::test]
async fn cubescan_limit_offset_limit_offset() {
init_testing_logger();

let variants = vec![
(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 3
OFFSET 3
) scan
LIMIT 2
OFFSET 2
"#,
1,
),
(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
GROUP BY
1
LIMIT 10
OFFSET 3
) scan
LIMIT 2
OFFSET 2
"#,
2,
),
];

for (variant, limit) in variants {
let query_plan =
convert_select_to_query_plan(variant.to_string(), DatabaseProtocol::PostgreSQL).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec!["KibanaSampleDataEcommerce.customer_gender".to_string()]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(limit),
offset: Some(5),
..Default::default()
}
);
}
}