-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement semi/anti join output statistics estimation #9800
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -825,27 +825,27 @@ fn estimate_join_cardinality( | |
right_stats: Statistics, | ||
on: &JoinOn, | ||
) -> Option<PartialJoinStatistics> { | ||
let (left_col_stats, right_col_stats) = on | ||
.iter() | ||
.map(|(left, right)| { | ||
match ( | ||
left.as_any().downcast_ref::<Column>(), | ||
right.as_any().downcast_ref::<Column>(), | ||
) { | ||
(Some(left), Some(right)) => ( | ||
left_stats.column_statistics[left.index()].clone(), | ||
right_stats.column_statistics[right.index()].clone(), | ||
), | ||
_ => ( | ||
ColumnStatistics::new_unknown(), | ||
ColumnStatistics::new_unknown(), | ||
), | ||
} | ||
}) | ||
.unzip::<_, _, Vec<_>, Vec<_>>(); | ||
|
||
match join_type { | ||
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { | ||
let (left_col_stats, right_col_stats) = on | ||
.iter() | ||
.map(|(left, right)| { | ||
match ( | ||
left.as_any().downcast_ref::<Column>(), | ||
right.as_any().downcast_ref::<Column>(), | ||
) { | ||
(Some(left), Some(right)) => ( | ||
left_stats.column_statistics[left.index()].clone(), | ||
right_stats.column_statistics[right.index()].clone(), | ||
), | ||
_ => ( | ||
ColumnStatistics::new_unknown(), | ||
ColumnStatistics::new_unknown(), | ||
), | ||
} | ||
}) | ||
.unzip::<_, _, Vec<_>, Vec<_>>(); | ||
|
||
let ij_cardinality = estimate_inner_join_cardinality( | ||
Statistics { | ||
num_rows: left_stats.num_rows.clone(), | ||
|
@@ -888,10 +888,45 @@ fn estimate_join_cardinality( | |
}) | ||
} | ||
|
||
JoinType::LeftSemi | ||
| JoinType::RightSemi | ||
| JoinType::LeftAnti | ||
| JoinType::RightAnti => None, | ||
JoinType::LeftSemi | JoinType::LeftAnti => { | ||
let cardinality = estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: left_stats.num_rows.clone(), | ||
total_byte_size: Precision::Absent, | ||
column_statistics: left_col_stats, | ||
}, | ||
Statistics { | ||
num_rows: right_stats.num_rows.clone(), | ||
total_byte_size: Precision::Absent, | ||
column_statistics: right_col_stats, | ||
}, | ||
)?; | ||
|
||
Some(PartialJoinStatistics { | ||
num_rows: *cardinality.get_value()?, | ||
column_statistics: left_stats.column_statistics, | ||
}) | ||
} | ||
|
||
JoinType::RightSemi | JoinType::RightAnti => { | ||
let cardinality = estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: right_stats.num_rows.clone(), | ||
total_byte_size: Precision::Absent, | ||
column_statistics: right_col_stats, | ||
}, | ||
Statistics { | ||
num_rows: left_stats.num_rows.clone(), | ||
total_byte_size: Precision::Absent, | ||
column_statistics: left_col_stats, | ||
}, | ||
)?; | ||
|
||
Some(PartialJoinStatistics { | ||
num_rows: *cardinality.get_value()?, | ||
column_statistics: right_stats.column_statistics, | ||
}) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -903,6 +938,11 @@ fn estimate_inner_join_cardinality( | |
left_stats: Statistics, | ||
right_stats: Statistics, | ||
) -> Option<Precision<usize>> { | ||
// Immediatedly return if inputs considered as non-overlapping | ||
if let Some(estimation) = estimate_disjoint_inputs(&left_stats, &right_stats) { | ||
return Some(estimation); | ||
}; | ||
|
||
// The algorithm here is partly based on the non-histogram selectivity estimation | ||
// from Spark's Catalyst optimizer. | ||
let mut join_selectivity = Precision::Absent; | ||
|
@@ -911,30 +951,13 @@ fn estimate_inner_join_cardinality( | |
.iter() | ||
.zip(right_stats.column_statistics.iter()) | ||
{ | ||
// If there is no overlap in any of the join columns, this means the join | ||
// itself is disjoint and the cardinality is 0. Though we can only assume | ||
// this when the statistics are exact (since it is a very strong assumption). | ||
if left_stat.min_value.get_value()? > right_stat.max_value.get_value()? { | ||
return Some( | ||
if left_stat.min_value.is_exact().unwrap_or(false) | ||
&& right_stat.max_value.is_exact().unwrap_or(false) | ||
{ | ||
Precision::Exact(0) | ||
} else { | ||
Precision::Inexact(0) | ||
}, | ||
); | ||
} | ||
if left_stat.max_value.get_value()? < right_stat.min_value.get_value()? { | ||
return Some( | ||
if left_stat.max_value.is_exact().unwrap_or(false) | ||
&& right_stat.min_value.is_exact().unwrap_or(false) | ||
{ | ||
Precision::Exact(0) | ||
} else { | ||
Precision::Inexact(0) | ||
}, | ||
); | ||
// Break if any of statistics bounds are undefined | ||
if left_stat.min_value.get_value().is_none() | ||
|| left_stat.max_value.get_value().is_none() | ||
|| right_stat.min_value.get_value().is_none() | ||
|| right_stat.max_value.get_value().is_none() | ||
{ | ||
return None; | ||
} | ||
|
||
let left_max_distinct = max_distinct_count(&left_stats.num_rows, left_stat); | ||
|
@@ -968,6 +991,78 @@ fn estimate_inner_join_cardinality( | |
} | ||
} | ||
|
||
/// Estimates semi join cardinality based on statistics. | ||
/// | ||
/// The estimation result is either zero, in cases inputs statistics are non-overlapping | ||
/// or equal to number of rows for outer input. | ||
fn estimate_semi_join_cardinality( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Long term it would be really nice to pull these types of calculations into some trait (aka an extensibility API) |
||
outer_stats: Statistics, | ||
inner_stats: Statistics, | ||
) -> Option<Precision<usize>> { | ||
// Immediatedly return if inputs considered as non-overlapping | ||
if let Some(estimation) = estimate_disjoint_inputs(&outer_stats, &inner_stats) { | ||
return Some(estimation); | ||
}; | ||
|
||
// Otherwise estimate SemiJoin output as whole outer side | ||
outer_stats | ||
.num_rows | ||
.get_value() | ||
.map(|val| Precision::Inexact(*val)) | ||
} | ||
|
||
/// Estimates if inputs are non-overlapping, using input statistics. | ||
/// If inputs are disjoint, returns zero estimation, otherwise returns None | ||
fn estimate_disjoint_inputs( | ||
left_stats: &Statistics, | ||
right_stats: &Statistics, | ||
) -> Option<Precision<usize>> { | ||
for (left_stat, right_stat) in left_stats | ||
.column_statistics | ||
.iter() | ||
.zip(right_stats.column_statistics.iter()) | ||
{ | ||
// If there is no overlap in any of the join columns, this means the join | ||
// itself is disjoint and the cardinality is 0. Though we can only assume | ||
// this when the statistics are exact (since it is a very strong assumption). | ||
let left_min_val = left_stat.min_value.get_value(); | ||
let right_max_val = right_stat.max_value.get_value(); | ||
if left_min_val.is_some() | ||
&& right_max_val.is_some() | ||
&& left_min_val > right_max_val | ||
{ | ||
return Some( | ||
if left_stat.min_value.is_exact().unwrap_or(false) | ||
&& right_stat.max_value.is_exact().unwrap_or(false) | ||
{ | ||
Precision::Exact(0) | ||
} else { | ||
Precision::Inexact(0) | ||
}, | ||
); | ||
} | ||
|
||
let left_max_val = left_stat.max_value.get_value(); | ||
let right_min_val = right_stat.min_value.get_value(); | ||
if left_max_val.is_some() | ||
&& right_min_val.is_some() | ||
&& left_max_val < right_min_val | ||
{ | ||
return Some( | ||
if left_stat.max_value.is_exact().unwrap_or(false) | ||
&& right_stat.min_value.is_exact().unwrap_or(false) | ||
{ | ||
Precision::Exact(0) | ||
} else { | ||
Precision::Inexact(0) | ||
}, | ||
); | ||
} | ||
} | ||
|
||
None | ||
} | ||
|
||
/// Estimate the number of maximum distinct values that can be present in the | ||
/// given column from its statistics. If distinct_count is available, uses it | ||
/// directly. Otherwise, if the column is numeric and has min/max values, it | ||
|
@@ -1716,9 +1811,11 @@ mod tests { | |
#[test] | ||
fn test_inner_join_cardinality_single_column() -> Result<()> { | ||
let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![ | ||
// ----------------------------------------------------------------------------- | ||
// | left(rows, min, max, distinct), right(rows, min, max, distinct), expected | | ||
// ----------------------------------------------------------------------------- | ||
// ------------------------------------------------ | ||
// | left(rows, min, max, distinct, null_count), | | ||
// | right(rows, min, max, distinct, null_count), | | ||
// | expected, | | ||
// ------------------------------------------------ | ||
|
||
// Cardinality computation | ||
// ======================= | ||
|
@@ -1824,6 +1921,11 @@ mod tests { | |
None, | ||
), | ||
// Non overlapping min/max (when exact=False). | ||
( | ||
(10, Absent, Inexact(4), Absent, Absent), | ||
(10, Inexact(5), Absent, Absent, Absent), | ||
Some(Inexact(0)), | ||
), | ||
( | ||
(10, Inexact(0), Inexact(10), Absent, Absent), | ||
(10, Inexact(11), Inexact(20), Absent, Absent), | ||
|
@@ -2106,6 +2208,136 @@ mod tests { | |
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn estimate_semi_join_cardinality_absent_rows() -> Result<()> { | ||
let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![ | ||
// ------------------------------------------------ | ||
// | outer(rows, min, max, distinct, null_count), | | ||
// | inner(rows, min, max, distinct, null_count), | | ||
// | expected, | | ||
// ------------------------------------------------ | ||
|
||
// Cardinality computation | ||
// ======================= | ||
// | ||
// distinct(left) == NaN, distinct(right) == NaN | ||
( | ||
(50, Inexact(10), Inexact(20), Absent, Absent), | ||
(10, Inexact(15), Inexact(25), Absent, Absent), | ||
Some(Inexact(50)), | ||
), | ||
( | ||
(10, Absent, Absent, Absent, Absent), | ||
(50, Absent, Absent, Absent, Absent), | ||
Some(Inexact(10)), | ||
), | ||
( | ||
(50, Inexact(10), Inexact(20), Absent, Absent), | ||
(10, Inexact(30), Inexact(40), Absent, Absent), | ||
Some(Inexact(0)), | ||
), | ||
( | ||
(50, Inexact(10), Absent, Absent, Absent), | ||
(10, Absent, Inexact(5), Absent, Absent), | ||
Some(Inexact(0)), | ||
), | ||
( | ||
(50, Absent, Inexact(20), Absent, Absent), | ||
(10, Inexact(30), Absent, Absent, Absent), | ||
Some(Inexact(0)), | ||
), | ||
]; | ||
|
||
for (outer_info, inner_info, expected_cardinality) in cases { | ||
let outer_num_rows = outer_info.0; | ||
let outer_col_stats = vec![create_column_stats( | ||
outer_info.1, | ||
outer_info.2, | ||
outer_info.3, | ||
outer_info.4, | ||
)]; | ||
|
||
let inner_num_rows = inner_info.0; | ||
let inner_col_stats = vec![create_column_stats( | ||
inner_info.1, | ||
inner_info.2, | ||
inner_info.3, | ||
inner_info.4, | ||
)]; | ||
|
||
assert_eq!( | ||
estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: Inexact(outer_num_rows), | ||
total_byte_size: Absent, | ||
column_statistics: outer_col_stats, | ||
}, | ||
Statistics { | ||
num_rows: Inexact(inner_num_rows), | ||
total_byte_size: Absent, | ||
column_statistics: inner_col_stats, | ||
}, | ||
), | ||
expected_cardinality | ||
); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_semi_join_cardinality() -> Result<()> { | ||
let dummy_column_stats = | ||
vec![create_column_stats(Absent, Absent, Absent, Absent)]; | ||
|
||
let absent_outer_estimation = estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: Absent, | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
Statistics { | ||
num_rows: Exact(10), | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
); | ||
assert_eq!( | ||
absent_outer_estimation, None, | ||
"Expected \"None\" esimated SemiJoin cardinality for absent outer num_rows" | ||
); | ||
|
||
let absent_inner_estimation = estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: Inexact(500), | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
Statistics { | ||
num_rows: Absent, | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
); | ||
assert_eq!(absent_inner_estimation, Some(Inexact(500)), "Expected outer.num_rows esimated SemiJoin cardinality for absent inner num_rows"); | ||
|
||
let absent_inner_estimation = estimate_semi_join_cardinality( | ||
Statistics { | ||
num_rows: Absent, | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
Statistics { | ||
num_rows: Absent, | ||
total_byte_size: Absent, | ||
column_statistics: dummy_column_stats.clone(), | ||
}, | ||
); | ||
assert_eq!(absent_inner_estimation, None, "Expected \"None\" esimated SemiJoin cardinality for absent outer and inner num_rows"); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_calculate_join_output_ordering() -> Result<()> { | ||
let options = SortOptions::default(); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't seem correct to me that the same calculation is used for both Semi and Anti joins (shouldn't they be the inverse of each other?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, they were not correct. I've changed estimations a bit -- now disjoint statistics affects only semi-joins (filtering outer table should produce zero rows). For anti-joins, disjoint inputs don't seem to make much sense -- if statistics are non-overlapping the result will be equal to outer num_rows side, otherwise (having no info or overlapping statistics) -- it still will be estimated as outer side, since we know nothing about actual distribution besides min/max, and assuming that all rows will be filtered out is too much (may significantly affect further planning)