Skip to content

Commit

Permalink
feat(tesseract): Make measure an entry point for multi-fact join for …
Browse files Browse the repository at this point in the history
…now and support template for params
  • Loading branch information
paveltiunov committed Dec 16, 2024
1 parent 64104c5 commit 25d3f66
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 48 deletions.
1 change: 0 additions & 1 deletion packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -3326,7 +3326,6 @@ export class BaseQuery {
join_types: {
inner: 'INNER',
left: 'LEFT',
full: 'FULL',
},
window_frame_types: {
rows: 'ROWS',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ export class BigqueryQuery extends BaseQuery {
templates.types.decimal = 'BIGDECIMAL({{ precision }},{{ scale }})';
templates.types.binary = 'BYTES';
templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM';
templates.join_types.full = 'FULL';
return templates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class SnowflakeQuery extends BaseQuery {
templates.expressions.interval = 'INTERVAL \'{{ interval }}\'';
templates.expressions.timestamp_literal = '\'{{ value }}\'::timestamp_tz';
templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM';
templates.join_types.full = 'FULL';
delete templates.types.interval;
return templates;
}
Expand Down
35 changes: 33 additions & 2 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::plan::{
AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, Schema, SchemaColumn, Select,
SingleAliasedSource, SingleSource,
AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, QualifiedColumnName, Schema,
SchemaColumn, Select, SingleAliasedSource, SingleSource,
};

use crate::plan::expression::FunctionExpression;
use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory;
use crate::planner::{BaseMember, VisitorContext};
use std::collections::HashMap;
Expand Down Expand Up @@ -57,6 +58,36 @@ impl SelectBuilder {
.add_column(SchemaColumn::new(alias.clone(), Some(member.full_name())));
}

pub fn add_projection_coalesce_member(
&mut self,
member: &Rc<dyn BaseMember>,
references: Vec<QualifiedColumnName>,
alias: Option<String>,
) {
let alias = if let Some(alias) = alias {
alias
} else {
member.alias_name()
};

let expr = Expr::Function(FunctionExpression {
function: "COALESCE".to_string(),
arguments: references
.into_iter()
// TODO unwrap
.map(|r| Expr::Reference(r))
.collect(),
});
let aliased_expr = AliasedExpr {
expr,
alias: alias.clone(),
};

self.projection_columns.push(aliased_expr);
self.result_schema
.add_column(SchemaColumn::new(alias.clone(), Some(member.full_name())));
}

pub fn set_filter(&mut self, filter: Option<Filter>) {
self.filter = filter;
}
Expand Down
19 changes: 19 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ impl MemberExpression {
}
}

#[derive(Clone)]
pub struct FunctionExpression {
pub function: String,
pub arguments: Vec<Expr>,
}

#[derive(Clone)]
pub enum Expr {
Member(MemberExpression),
Reference(QualifiedColumnName),
Function(FunctionExpression),
}

impl Expr {
Expand All @@ -46,6 +53,18 @@ impl Expr {
Self::Reference(reference) => {
templates.column_reference(reference.source(), &reference.name())
}
Expr::Function(FunctionExpression {
function,
arguments,
}) => templates.scalar_function(
function.to_string(),
arguments
.iter()
.map(|e| e.to_sql(&templates, context.clone()))
.collect::<Result<Vec<_>, _>>()?,
None,
None,
),
}
}
}
20 changes: 15 additions & 5 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ impl RollingWindowJoinCondition {
}

pub struct DimensionJoinCondition {
conditions: Vec<(Expr, Expr)>,
// AND (... OR ...)
conditions: Vec<Vec<(Expr, Expr)>>,
null_check: bool,
}

impl DimensionJoinCondition {
pub fn new(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self {
pub fn new(conditions: Vec<Vec<(Expr, Expr)>>, null_check: bool) -> Self {
Self {
conditions,
null_check,
Expand All @@ -110,8 +111,17 @@ impl DimensionJoinCondition {
} else {
self.conditions
.iter()
.map(|(left, right)| -> Result<String, CubeError> {
self.dimension_condition(templates, context.clone(), left, right)
.map(|or_conditions| -> Result<_, CubeError> {
Ok(format!(
"({})",
or_conditions
.iter()
.map(|(left, right)| -> Result<String, CubeError> {
self.dimension_condition(templates, context.clone(), left, right)
})
.collect::<Result<Vec<_>, _>>()?
.join(" OR ")
))
})
.collect::<Result<Vec<_>, _>>()?
.join(" AND ")
Expand Down Expand Up @@ -139,7 +149,7 @@ pub enum JoinCondition {
}

impl JoinCondition {
pub fn new_dimension_join(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self {
pub fn new_dimension_join(conditions: Vec<Vec<(Expr, Expr)>>, null_check: bool) -> Self {
Self::DimensionJoinCondition(DimensionJoinCondition::new(conditions, null_check))
}

Expand Down
37 changes: 31 additions & 6 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;
use lazy_static::lazy_static;
use regex::{Captures, Regex};
Expand All @@ -8,12 +9,16 @@ lazy_static! {
static ref PARAMS_MATCH_RE: Regex = Regex::new(r"\$_(\d+)_\$").unwrap();
}
pub struct ParamsAllocator {
sql_templates: PlanSqlTemplates,
params: Vec<String>,
}

impl ParamsAllocator {
pub fn new() -> ParamsAllocator {
ParamsAllocator { params: Vec::new() }
pub fn new(sql_templates: PlanSqlTemplates) -> ParamsAllocator {
ParamsAllocator {
sql_templates,
params: Vec::new(),
}
}

pub fn make_placeholder(&self, index: usize) -> String {
Expand All @@ -38,31 +43,51 @@ impl ParamsAllocator {
let (sql, params) = self.add_native_allocated_params(sql, &native_allocated_params)?;
let mut params_in_sql_order = Vec::new();
let mut param_index_map: HashMap<usize, usize> = HashMap::new();
let mut error = None;
let result_sql = if should_reuse_params {
PARAMS_MATCH_RE
.replace_all(&sql, |caps: &Captures| {
let ind: usize = caps[1].to_string().parse().unwrap();
let new_index = if let Some(index) = param_index_map.get(&ind) {
index.clone()
} else {
params_in_sql_order.push(params[ind].clone());
let index = params_in_sql_order.len();
params_in_sql_order.push(params[ind].clone());
param_index_map.insert(ind, index);
index
};
format!("${}", new_index) //TODO get placeholder from js part
match self.sql_templates.param(new_index) {
Ok(res) => res,
Err(e) => {
if error.is_none() {
error = Some(e);
}
"$error$".to_string()
}
}
})
.to_string()
} else {
PARAMS_MATCH_RE
.replace_all(&sql, |caps: &Captures| {
let ind: usize = caps[1].to_string().parse().unwrap();
params_in_sql_order.push(params[ind].clone());
let index = params_in_sql_order.len();
format!("${}", index) //TODO get placeholder from js part
params_in_sql_order.push(params[ind].clone());
match self.sql_templates.param(index) {
Ok(res) => res,
Err(e) => {
if error.is_none() {
error = Some(e);
}
"$error$".to_string()
}
}
})
.to_string()
};
if let Some(error) = error {
return Err(error);
}
Ok((result_sql, params_in_sql_order))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,38 @@ impl FullKeyAggregateQueryPlanner {
let mut join_builder = JoinBuilder::new_from_subselect(joins[0].clone(), format!("q_0"));
let dimensions_to_select = self.query_properties.dimensions_for_select();
for (i, join) in joins.iter().enumerate().skip(1) {
let left_alias = format!("q_{}", i - 1);
let right_alias = format!("q_{}", i);
let left_schema = joins[i - 1].schema();
let right_schema = joins[i].schema();
// TODO every next join should join to all previous dimensions through OR: q_0.a = q_1.a, q_0.a = q_2.a OR q_1.a = q_2.a, ...
let conditions = dimensions_to_select
.iter()
.map(|dim| {
let alias_in_left_query = left_schema.resolve_member_alias(dim);
let left_ref = Expr::Reference(QualifiedColumnName::new(
Some(left_alias.clone()),
alias_in_left_query,
));
let alias_in_right_query = right_schema.resolve_member_alias(dim);
let right_ref = Expr::Reference(QualifiedColumnName::new(
Some(right_alias.clone()),
alias_in_right_query,
));
(left_ref, right_ref)
(0..i)
.map(|left_i| {
let left_alias = format!("q_{}", left_i);
let alias_in_left_query = left_schema.resolve_member_alias(dim);
let left_ref = Expr::Reference(QualifiedColumnName::new(
Some(left_alias.clone()),
alias_in_left_query,
));
let alias_in_right_query = right_schema.resolve_member_alias(dim);
let right_ref = Expr::Reference(QualifiedColumnName::new(
Some(right_alias.clone()),
alias_in_right_query,
));
(left_ref, right_ref)
})
.collect::<Vec<_>>()
})
.collect_vec();
let on = JoinCondition::new_dimension_join(conditions, true);
let next_alias = format!("q_{}", i);
if self.plan_sql_templates.supports_is_not_distinct_from() {
join_builder.inner_join_subselect(join.clone(), next_alias, on);
} else {
if self.plan_sql_templates.supports_full_join() {
join_builder.full_join_subselect(join.clone(), next_alias, on);
} else {
// TODO in case of full join is not supported there should be correct blending query that keeps NULL values
join_builder.inner_join_subselect(join.clone(), next_alias, on);
}
}

Expand All @@ -103,9 +108,26 @@ impl FullKeyAggregateQueryPlanner {
&dimensions_source,
&mut render_references,
)?;
let references = (0..joins.len())
.map(|i| {
let alias = format!("q_{}", i);
references_builder
.find_reference_for_member(
&member.member_evaluator().full_name(),
&Some(alias.clone()),
)
.ok_or_else(|| {
CubeError::internal(format!(
"Reference for join not found for {} in {}",
member.member_evaluator().full_name(),
alias
))
})
})
.collect::<Result<Vec<_>, _>>()?;
let alias = references_builder
.resolve_alias_for_member(&member.full_name(), &dimensions_source);
select_builder.add_projection_member(member, alias);
select_builder.add_projection_coalesce_member(member, references, alias);
}

for member in BaseMemberHelper::iter_as_base_member(&outer_measures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,24 +267,28 @@ impl MultiStageMemberQueryPlanner {
Some(root_alias.clone()),
);
for (i, input) in inputs.iter().enumerate().skip(1) {
let left_alias = format!("q_{}", i - 1);
let right_alias = format!("q_{}", i);
let left_schema = cte_schemas.get(&inputs[i - 1]).unwrap().clone();
let cte_schema = cte_schemas.get(input).unwrap().clone();
let conditions = dimensions
.iter()
.map(|dim| {
let alias_in_left_query = left_schema.resolve_member_alias(dim);
let left_ref = Expr::Reference(QualifiedColumnName::new(
Some(left_alias.clone()),
alias_in_left_query,
));
let alias_in_right_query = cte_schema.resolve_member_alias(dim);
let right_ref = Expr::Reference(QualifiedColumnName::new(
Some(right_alias.clone()),
alias_in_right_query,
));
(left_ref, right_ref)
(0..i)
.map(|left_alias| {
let left_alias = format!("q_{}", left_alias);
let alias_in_left_query = left_schema.resolve_member_alias(dim);
let left_ref = Expr::Reference(QualifiedColumnName::new(
Some(left_alias.clone()),
alias_in_left_query,
));
let alias_in_right_query = cte_schema.resolve_member_alias(dim);
let right_ref = Expr::Reference(QualifiedColumnName::new(
Some(right_alias.clone()),
alias_in_right_query,
));
(left_ref, right_ref)
})
.collect()
})
.collect_vec();
let on = JoinCondition::new_dimension_join(conditions, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl MultipliedMeasuresQueryPlanner {
Some(pk_cube_alias.clone()),
alias_in_subquery,
));
(keys_query_ref, subquery_ref)
vec![(keys_query_ref, subquery_ref)]
})
.collect_vec();

Expand Down Expand Up @@ -165,7 +165,7 @@ impl MultipliedMeasuresQueryPlanner {
alias_in_keys_query,
));
let pk_cube_expr = Expr::Member(MemberExpression::new(dim.clone()));
(keys_query_ref, pk_cube_expr)
vec![(keys_query_ref, pk_cube_expr)]
})
.collect_vec();
join_builder.left_join_cube(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,9 @@ impl QueryProperties {
.cached_data_mut()
.join_hints_for_member(m.member_evaluator())?;
let join = query_tools.cached_data_mut().join_by_hints(
dimension_and_filter_join_hints_concat
.clone()
vec![measure_join_hints]
.into_iter()
.chain(vec![measure_join_hints].into_iter())
.chain(dimension_and_filter_join_hints_concat.clone().into_iter())
.collect::<Vec<_>>(),
|hints| query_tools.join_graph().build_join(hints),
)?;
Expand Down
Loading

0 comments on commit 25d3f66

Please sign in to comment.