From 25d3f662e33aa75269e1532d0c238dfa398a3081 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Mon, 16 Dec 2024 13:50:18 -0800 Subject: [PATCH] feat(tesseract): Make measure an entry point for multi-fact join for now and support template for params --- .../src/adapter/BaseQuery.js | 1 - .../src/adapter/BigqueryQuery.ts | 1 + .../src/adapter/SnowflakeQuery.ts | 1 + .../cubesqlplanner/src/plan/builder/select.rs | 35 +++++++++++- .../cubesqlplanner/src/plan/expression.rs | 19 +++++++ .../cubesqlplanner/src/plan/join.rs | 20 +++++-- .../src/planner/params_allocator.rs | 37 ++++++++++--- .../full_key_query_aggregate_planner.rs | 54 +++++++++++++------ .../multi_stage/member_query_planner.rs | 28 +++++----- .../multiplied_measures_query_planner.rs | 4 +- .../src/planner/query_properties.rs | 5 +- .../cubesqlplanner/src/planner/query_tools.rs | 4 +- .../src/planner/sql_templates/plan.rs | 29 ++++++++++ 13 files changed, 190 insertions(+), 48 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 2c71d23778ea3..4c309d81ad264 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -3326,7 +3326,6 @@ export class BaseQuery { join_types: { inner: 'INNER', left: 'LEFT', - full: 'FULL', }, window_frame_types: { rows: 'ROWS', diff --git a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts index 07cc1759f6a2d..5eff028b720b3 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts @@ -257,6 +257,7 @@ export class BigqueryQuery extends BaseQuery { templates.types.decimal = 'BIGDECIMAL({{ precision }},{{ scale }})'; templates.types.binary = 'BYTES'; templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; + templates.join_types.full = 'FULL'; return templates; } } diff --git a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts index a59e139f9a2df..3970e67c526b2 100644 --- a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts @@ -116,6 +116,7 @@ export class SnowflakeQuery extends BaseQuery { templates.expressions.interval = 'INTERVAL \'{{ interval }}\''; templates.expressions.timestamp_literal = '\'{{ value }}\'::timestamp_tz'; templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; + templates.join_types.full = 'FULL'; delete templates.types.interval; return templates; } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs index 4e143cb86f703..1a9482f68715f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs @@ -1,8 +1,9 @@ use crate::plan::{ - AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, Schema, SchemaColumn, Select, - SingleAliasedSource, SingleSource, + AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, QualifiedColumnName, Schema, + SchemaColumn, Select, SingleAliasedSource, SingleSource, }; +use crate::plan::expression::FunctionExpression; use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; use crate::planner::{BaseMember, VisitorContext}; use std::collections::HashMap; @@ -57,6 +58,36 @@ impl SelectBuilder { .add_column(SchemaColumn::new(alias.clone(), Some(member.full_name()))); } + pub fn add_projection_coalesce_member( + &mut self, + member: &Rc, + references: Vec, + alias: Option, + ) { + let alias = if let Some(alias) = alias { + alias + } else { + member.alias_name() + }; + + let expr = Expr::Function(FunctionExpression { + function: "COALESCE".to_string(), + arguments: references + .into_iter() + // TODO unwrap + .map(|r| Expr::Reference(r)) + .collect(), + }); + let aliased_expr = AliasedExpr { + expr, + alias: alias.clone(), + }; + + self.projection_columns.push(aliased_expr); + self.result_schema + .add_column(SchemaColumn::new(alias.clone(), Some(member.full_name()))); + } + pub fn set_filter(&mut self, filter: Option) { self.filter = filter; } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs index be13ad59816e7..34ff7816acb57 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs @@ -23,10 +23,17 @@ impl MemberExpression { } } +#[derive(Clone)] +pub struct FunctionExpression { + pub function: String, + pub arguments: Vec, +} + #[derive(Clone)] pub enum Expr { Member(MemberExpression), Reference(QualifiedColumnName), + Function(FunctionExpression), } impl Expr { @@ -46,6 +53,18 @@ impl Expr { Self::Reference(reference) => { templates.column_reference(reference.source(), &reference.name()) } + Expr::Function(FunctionExpression { + function, + arguments, + }) => templates.scalar_function( + function.to_string(), + arguments + .iter() + .map(|e| e.to_sql(&templates, context.clone())) + .collect::, _>>()?, + None, + None, + ), } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs index 96d7355544c66..1ac747410d055 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs @@ -88,12 +88,13 @@ impl RollingWindowJoinCondition { } pub struct DimensionJoinCondition { - conditions: Vec<(Expr, Expr)>, + // AND (... OR ...) + conditions: Vec>, null_check: bool, } impl DimensionJoinCondition { - pub fn new(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self { + pub fn new(conditions: Vec>, null_check: bool) -> Self { Self { conditions, null_check, @@ -110,8 +111,17 @@ impl DimensionJoinCondition { } else { self.conditions .iter() - .map(|(left, right)| -> Result { - self.dimension_condition(templates, context.clone(), left, right) + .map(|or_conditions| -> Result<_, CubeError> { + Ok(format!( + "({})", + or_conditions + .iter() + .map(|(left, right)| -> Result { + self.dimension_condition(templates, context.clone(), left, right) + }) + .collect::, _>>()? + .join(" OR ") + )) }) .collect::, _>>()? .join(" AND ") @@ -139,7 +149,7 @@ pub enum JoinCondition { } impl JoinCondition { - pub fn new_dimension_join(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self { + pub fn new_dimension_join(conditions: Vec>, null_check: bool) -> Self { Self::DimensionJoinCondition(DimensionJoinCondition::new(conditions, null_check)) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs index 47ffbd7268046..9699cbb8bd797 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs @@ -1,3 +1,4 @@ +use crate::planner::sql_templates::PlanSqlTemplates; use cubenativeutils::CubeError; use lazy_static::lazy_static; use regex::{Captures, Regex}; @@ -8,12 +9,16 @@ lazy_static! { static ref PARAMS_MATCH_RE: Regex = Regex::new(r"\$_(\d+)_\$").unwrap(); } pub struct ParamsAllocator { + sql_templates: PlanSqlTemplates, params: Vec, } impl ParamsAllocator { - pub fn new() -> ParamsAllocator { - ParamsAllocator { params: Vec::new() } + pub fn new(sql_templates: PlanSqlTemplates) -> ParamsAllocator { + ParamsAllocator { + sql_templates, + params: Vec::new(), + } } pub fn make_placeholder(&self, index: usize) -> String { @@ -38,6 +43,7 @@ impl ParamsAllocator { let (sql, params) = self.add_native_allocated_params(sql, &native_allocated_params)?; let mut params_in_sql_order = Vec::new(); let mut param_index_map: HashMap = HashMap::new(); + let mut error = None; let result_sql = if should_reuse_params { PARAMS_MATCH_RE .replace_all(&sql, |caps: &Captures| { @@ -45,24 +51,43 @@ impl ParamsAllocator { let new_index = if let Some(index) = param_index_map.get(&ind) { index.clone() } else { - params_in_sql_order.push(params[ind].clone()); let index = params_in_sql_order.len(); + params_in_sql_order.push(params[ind].clone()); param_index_map.insert(ind, index); index }; - format!("${}", new_index) //TODO get placeholder from js part + match self.sql_templates.param(new_index) { + Ok(res) => res, + Err(e) => { + if error.is_none() { + error = Some(e); + } + "$error$".to_string() + } + } }) .to_string() } else { PARAMS_MATCH_RE .replace_all(&sql, |caps: &Captures| { let ind: usize = caps[1].to_string().parse().unwrap(); - params_in_sql_order.push(params[ind].clone()); let index = params_in_sql_order.len(); - format!("${}", index) //TODO get placeholder from js part + params_in_sql_order.push(params[ind].clone()); + match self.sql_templates.param(index) { + Ok(res) => res, + Err(e) => { + if error.is_none() { + error = Some(e); + } + "$error$".to_string() + } + } }) .to_string() }; + if let Some(error) = error { + return Err(error); + } Ok((result_sql, params_in_sql_order)) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs index 4ec591e1baffc..fdb91795548d2 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs @@ -60,7 +60,6 @@ impl FullKeyAggregateQueryPlanner { let mut join_builder = JoinBuilder::new_from_subselect(joins[0].clone(), format!("q_0")); let dimensions_to_select = self.query_properties.dimensions_for_select(); for (i, join) in joins.iter().enumerate().skip(1) { - let left_alias = format!("q_{}", i - 1); let right_alias = format!("q_{}", i); let left_schema = joins[i - 1].schema(); let right_schema = joins[i].schema(); @@ -68,25 +67,31 @@ impl FullKeyAggregateQueryPlanner { let conditions = dimensions_to_select .iter() .map(|dim| { - let alias_in_left_query = left_schema.resolve_member_alias(dim); - let left_ref = Expr::Reference(QualifiedColumnName::new( - Some(left_alias.clone()), - alias_in_left_query, - )); - let alias_in_right_query = right_schema.resolve_member_alias(dim); - let right_ref = Expr::Reference(QualifiedColumnName::new( - Some(right_alias.clone()), - alias_in_right_query, - )); - (left_ref, right_ref) + (0..i) + .map(|left_i| { + let left_alias = format!("q_{}", left_i); + let alias_in_left_query = left_schema.resolve_member_alias(dim); + let left_ref = Expr::Reference(QualifiedColumnName::new( + Some(left_alias.clone()), + alias_in_left_query, + )); + let alias_in_right_query = right_schema.resolve_member_alias(dim); + let right_ref = Expr::Reference(QualifiedColumnName::new( + Some(right_alias.clone()), + alias_in_right_query, + )); + (left_ref, right_ref) + }) + .collect::>() }) .collect_vec(); let on = JoinCondition::new_dimension_join(conditions, true); let next_alias = format!("q_{}", i); - if self.plan_sql_templates.supports_is_not_distinct_from() { - join_builder.inner_join_subselect(join.clone(), next_alias, on); - } else { + if self.plan_sql_templates.supports_full_join() { join_builder.full_join_subselect(join.clone(), next_alias, on); + } else { + // TODO in case of full join is not supported there should be correct blending query that keeps NULL values + join_builder.inner_join_subselect(join.clone(), next_alias, on); } } @@ -103,9 +108,26 @@ impl FullKeyAggregateQueryPlanner { &dimensions_source, &mut render_references, )?; + let references = (0..joins.len()) + .map(|i| { + let alias = format!("q_{}", i); + references_builder + .find_reference_for_member( + &member.member_evaluator().full_name(), + &Some(alias.clone()), + ) + .ok_or_else(|| { + CubeError::internal(format!( + "Reference for join not found for {} in {}", + member.member_evaluator().full_name(), + alias + )) + }) + }) + .collect::, _>>()?; let alias = references_builder .resolve_alias_for_member(&member.full_name(), &dimensions_source); - select_builder.add_projection_member(member, alias); + select_builder.add_projection_coalesce_member(member, references, alias); } for member in BaseMemberHelper::iter_as_base_member(&outer_measures) { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs index 5d906ecb83552..fd0c237164271 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs @@ -267,24 +267,28 @@ impl MultiStageMemberQueryPlanner { Some(root_alias.clone()), ); for (i, input) in inputs.iter().enumerate().skip(1) { - let left_alias = format!("q_{}", i - 1); let right_alias = format!("q_{}", i); let left_schema = cte_schemas.get(&inputs[i - 1]).unwrap().clone(); let cte_schema = cte_schemas.get(input).unwrap().clone(); let conditions = dimensions .iter() .map(|dim| { - let alias_in_left_query = left_schema.resolve_member_alias(dim); - let left_ref = Expr::Reference(QualifiedColumnName::new( - Some(left_alias.clone()), - alias_in_left_query, - )); - let alias_in_right_query = cte_schema.resolve_member_alias(dim); - let right_ref = Expr::Reference(QualifiedColumnName::new( - Some(right_alias.clone()), - alias_in_right_query, - )); - (left_ref, right_ref) + (0..i) + .map(|left_alias| { + let left_alias = format!("q_{}", left_alias); + let alias_in_left_query = left_schema.resolve_member_alias(dim); + let left_ref = Expr::Reference(QualifiedColumnName::new( + Some(left_alias.clone()), + alias_in_left_query, + )); + let alias_in_right_query = cte_schema.resolve_member_alias(dim); + let right_ref = Expr::Reference(QualifiedColumnName::new( + Some(right_alias.clone()), + alias_in_right_query, + )); + (left_ref, right_ref) + }) + .collect() }) .collect_vec(); let on = JoinCondition::new_dimension_join(conditions, true); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs index 5e4dcce5ab367..14ecfd86aa0e5 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs @@ -134,7 +134,7 @@ impl MultipliedMeasuresQueryPlanner { Some(pk_cube_alias.clone()), alias_in_subquery, )); - (keys_query_ref, subquery_ref) + vec![(keys_query_ref, subquery_ref)] }) .collect_vec(); @@ -165,7 +165,7 @@ impl MultipliedMeasuresQueryPlanner { alias_in_keys_query, )); let pk_cube_expr = Expr::Member(MemberExpression::new(dim.clone())); - (keys_query_ref, pk_cube_expr) + vec![(keys_query_ref, pk_cube_expr)] }) .collect_vec(); join_builder.left_join_cube( diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs index b14e1fc882b84..2b7c2d2f06d15 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs @@ -295,10 +295,9 @@ impl QueryProperties { .cached_data_mut() .join_hints_for_member(m.member_evaluator())?; let join = query_tools.cached_data_mut().join_by_hints( - dimension_and_filter_join_hints_concat - .clone() + vec![measure_join_hints] .into_iter() - .chain(vec![measure_join_hints].into_iter()) + .chain(dimension_and_filter_join_hints_concat.clone().into_iter()) .collect::>(), |hints| query_tools.join_graph().build_join(hints), )?; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs index ff1dfb9244ab0..5a1bfe3e4cd9c 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs @@ -8,6 +8,7 @@ use crate::cube_bridge::join_item::JoinItemStatic; use crate::cube_bridge::sql_templates_render::SqlTemplatesRender; use crate::plan::FilterItem; use crate::planner::sql_evaluator::collectors::collect_join_hints; +use crate::planner::sql_templates::PlanSqlTemplates; use chrono_tz::Tz; use convert_case::{Case, Casing}; use cubenativeutils::CubeError; @@ -144,12 +145,13 @@ impl QueryTools { } else { None }; + let sql_templates = PlanSqlTemplates::new(templates_render.clone()); Ok(Rc::new(Self { cube_evaluator, base_tools, join_graph, templates_render, - params_allocator: Rc::new(RefCell::new(ParamsAllocator::new())), + params_allocator: Rc::new(RefCell::new(ParamsAllocator::new(sql_templates))), evaluator_compiler, cached_data: RefCell::new(QueryToolsCachedData::new()), timezone, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs index d00ebafa4caba..04b716de0256d 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/plan.rs @@ -224,8 +224,37 @@ impl PlanSqlTemplates { )) } + pub fn supports_full_join(&self) -> bool { + self.render.contains_template("join_types/full") + } + pub fn supports_is_not_distinct_from(&self) -> bool { self.render .contains_template("operators/is_not_distinct_from") } + + pub fn param(&self, param_index: usize) -> Result { + self.render + .render_template("params/param", context! { param_index => param_index }) + } + + pub fn scalar_function( + &self, + scalar_function: String, + args: Vec, + date_part: Option, + interval: Option, + ) -> Result { + let function = scalar_function.to_string().to_uppercase(); + let args_concat = args.join(", "); + self.render.render_template( + &format!("functions/{}", function), + context! { + args_concat => args_concat, + args => args, + date_part => date_part, + interval => interval, + }, + ) + } }