Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Jan 20, 2025
1 parent a2ba03d commit df31e89
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub struct RollingWindow {
pub trailing: Option<String>,
pub leading: Option<String>,
pub offset: Option<String>,
#[serde(rename = "type")]
pub rolling_type: Option<String>,
pub granularity: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
77 changes: 71 additions & 6 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use super::{Expr, SingleAliasedSource};
use crate::planner::query_tools::QueryTools;
use crate::planner::sql_templates::PlanSqlTemplates;
use crate::planner::{BaseJoinCondition, VisitorContext};
use cubenativeutils::CubeError;
use lazy_static::lazy_static;

use std::rc::Rc;

pub struct RollingWindowJoinCondition {
pub struct RegularRollingWindowJoinCondition {
time_series_source: String,
trailing_interval: Option<String>,
leading_interval: Option<String>,
offset: String,
time_dimension: Expr,
}

impl RollingWindowJoinCondition {
impl RegularRollingWindowJoinCondition {
pub fn new(
time_series_source: String,
trailing_interval: Option<String>,
Expand Down Expand Up @@ -87,6 +88,50 @@ impl RollingWindowJoinCondition {
}
}

pub struct ToDateRollingWindowJoinCondition {
time_series_source: String,
granularity: String,
time_dimension: Expr,
query_tools: Rc<QueryTools>,
}

impl ToDateRollingWindowJoinCondition {
pub fn new(
time_series_source: String,
granularity: String,
time_dimension: Expr,
query_tools: Rc<QueryTools>,
) -> Self {
Self {
time_series_source,
granularity,
time_dimension,
query_tools,
}
}

pub fn to_sql(
&self,
templates: &PlanSqlTemplates,
context: Rc<VisitorContext>,
) -> Result<String, CubeError> {
let date_column = self.time_dimension.to_sql(templates, context)?;

//(dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => `${dateField} >= ${this.timeGroupedColumn(granularity, dateFrom)} AND ${dateField} <= ${dateTo}`

let date_from =
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?;
let date_to =
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?;
let grouped_from = self
.query_tools
.base_tools()
.time_grouped_column(self.granularity.clone(), date_from)?;
let result = format!("{date_column} >= {grouped_from} and {date_column} <= {date_to}");
Ok(result)
}
}

pub struct DimensionJoinCondition {
// AND (... OR ...)
conditions: Vec<Vec<(Expr, Expr)>>,
Expand Down Expand Up @@ -145,22 +190,23 @@ impl DimensionJoinCondition {
pub enum JoinCondition {
DimensionJoinCondition(DimensionJoinCondition),
BaseJoinCondition(Rc<dyn BaseJoinCondition>),
RollingWindowJoinCondition(RollingWindowJoinCondition),
RegularRollingWindowJoinCondition(RegularRollingWindowJoinCondition),
ToDateRollingWindowJoinCondition(ToDateRollingWindowJoinCondition),
}

impl JoinCondition {
pub fn new_dimension_join(conditions: Vec<Vec<(Expr, Expr)>>, null_check: bool) -> Self {
Self::DimensionJoinCondition(DimensionJoinCondition::new(conditions, null_check))
}

pub fn new_rolling_join(
pub fn new_regular_rolling_join(
time_series_source: String,
trailing_interval: Option<String>,
leading_interval: Option<String>,
offset: String,
time_dimension: Expr,
) -> Self {
Self::RollingWindowJoinCondition(RollingWindowJoinCondition::new(
Self::RegularRollingWindowJoinCondition(RegularRollingWindowJoinCondition::new(
time_series_source,
trailing_interval,
leading_interval,
Expand All @@ -169,6 +215,20 @@ impl JoinCondition {
))
}

pub fn new_to_date_rolling_join(
time_series_source: String,
granularity: String,
time_dimension: Expr,
query_tools: Rc<QueryTools>,
) -> Self {
Self::ToDateRollingWindowJoinCondition(ToDateRollingWindowJoinCondition::new(
time_series_source,
granularity,
time_dimension,
query_tools,
))
}

pub fn new_base_join(base: Rc<dyn BaseJoinCondition>) -> Self {
Self::BaseJoinCondition(base)
}
Expand All @@ -181,7 +241,12 @@ impl JoinCondition {
match &self {
JoinCondition::DimensionJoinCondition(cond) => cond.to_sql(templates, context),
JoinCondition::BaseJoinCondition(cond) => cond.to_sql(context, templates),
JoinCondition::RollingWindowJoinCondition(cond) => cond.to_sql(templates, context),
JoinCondition::RegularRollingWindowJoinCondition(cond) => {
cond.to_sql(templates, context)
}
JoinCondition::ToDateRollingWindowJoinCondition(cond) => {
cond.to_sql(templates, context)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use cte::Cte;
pub use expression::{Expr, MemberExpression};
pub use filter::{Filter, FilterGroup, FilterItem};
pub use from::{From, FromSource, SingleAliasedSource, SingleSource};
pub use join::{Join, JoinCondition, JoinItem, RollingWindowJoinCondition};
pub use join::{Join, JoinCondition, JoinItem, RegularRollingWindowJoinCondition};
pub use order::OrderBy;
pub use query_plan::QueryPlan;
pub use schema::{QualifiedColumnName, Schema, SchemaColumn};
Expand Down
4 changes: 4 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/base_member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl BaseMemberHelper {
members.iter().map(|m| m.alias_name()).collect_vec()

Check warning on line 44 in rust/cubesqlplanner/cubesqlplanner/src/planner/base_member.rs

View workflow job for this annotation

GitHub Actions / Check fmt/clippy

Diff in /__w/cube/cube/rust/cubesqlplanner/cubesqlplanner/src/planner/base_member.rs
}

pub fn extract_symbols_from_members(members: &Vec<Rc<dyn BaseMember>>) -> Vec<Rc<MemberSymbol>> {
members.iter().map(|m| m.member_evaluator()).collect_vec()
}

pub fn default_alias(
cube_name: &String,
member_name: &String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<IT: InnerTypes> BaseQuery<IT> {
pub fn build_sql_and_params(&self) -> Result<NativeObjectHandle<IT>, CubeError> {
let templates = PlanSqlTemplates::new(self.query_tools.templates_render());
let query_planner = QueryPlanner::new(self.request.clone(), self.query_tools.clone());
let plan = query_planner.build_sql()?;
let plan = query_planner.plan()?;

let sql = plan.to_sql(&templates)?;
let (result_sql, params) = self.query_tools.build_sql_and_params(&sql, true)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ impl BaseFilter {
FilterOperator::Equal => self.equals_where(&member_sql)?,
FilterOperator::NotEqual => self.not_equals_where(&member_sql)?,
FilterOperator::InDateRange => self.in_date_range(&member_sql)?,
FilterOperator::InDateRangeExtended => self.in_date_range_extended(&member_sql)?,
FilterOperator::RegularRollingWindowDateRange => {
self.regular_rolling_window_date_range(&member_sql)?
}
FilterOperator::ToDateRollingWindowDateRange => {
self.to_date_rolling_window_date_range(&member_sql)?
}
FilterOperator::In => self.in_where(&member_sql)?,
FilterOperator::NotIn => self.not_in_where(&member_sql)?,
FilterOperator::Set => self.set_where(&member_sql)?,
Expand Down Expand Up @@ -211,7 +216,7 @@ impl BaseFilter {
}

fn in_date_range(&self, member_sql: &str) -> Result<String, CubeError> {
let (from, to) = self.allocate_date_params()?;
let (from, to) = self.allocate_date_params(true)?;
self.templates
.time_range_filter(member_sql.to_string(), from, to)
}
Expand All @@ -236,8 +241,9 @@ impl BaseFilter {
Ok(date.to_string())
}
}
fn in_date_range_extended(&self, member_sql: &str) -> Result<String, CubeError> {
let (from, to) = self.allocate_date_params()?;

fn regular_rolling_window_date_range(&self, member_sql: &str) -> Result<String, CubeError> {
let (from, to) = self.allocate_date_params(false)?;

let from = if self.values.len() >= 3 {
self.extend_date_range_bound(from, &self.values[2], true)?
Expand All @@ -251,8 +257,37 @@ impl BaseFilter {
to
};

self.templates
.time_range_filter(member_sql.to_string(), from, to)
let date_field = self
.query_tools
.base_tools()
.convert_tz(member_sql.to_string())?;
self.templates.time_range_filter(date_field, from, to)
}

fn to_date_rolling_window_date_range(&self, member_sql: &str) -> Result<String, CubeError> {
let (from, to) = self.allocate_date_params(false)?;

let from = if self.values.len() >= 3 {
if let Some(granularity) = &self.values[2] {
self.query_tools
.base_tools()
.time_grouped_column(granularity.clone(), from)?
} else {
return Err(CubeError::user(format!(
"Granularity required for to_date rolling window"
)));
}
} else {
return Err(CubeError::user(format!(
"Granularity required for to_date rolling window"
)));
};

let date_field = self
.query_tools
.base_tools()
.convert_tz(member_sql.to_string())?;
self.templates.time_range_filter(date_field, from, to)
}

fn in_where(&self, member_sql: &str) -> Result<String, CubeError> {
Expand Down Expand Up @@ -353,22 +388,30 @@ impl BaseFilter {
))
}

fn allocate_date_params(&self) -> Result<(String, String), CubeError> {
fn allocate_date_params(&self, use_db_time_zone: bool) -> Result<(String, String), CubeError> {
if self.values.len() >= 2 {
let from = if let Some(from_str) = &self.values[0] {
self.query_tools
.base_tools()
.in_db_time_zone(self.format_from_date(&from_str)?)?
let from = self.format_from_date(&from_str)?;

if use_db_time_zone {
self.query_tools.base_tools().in_db_time_zone(from)?
} else {
from
}
} else {
return Err(CubeError::user(format!(
"Arguments for date range is not valid"
)));
};

let to = if let Some(to_str) = &self.values[1] {
self.query_tools
.base_tools()
.in_db_time_zone(self.format_to_date(&to_str)?)?
let to = self.format_to_date(&to_str)?;

if use_db_time_zone {
self.query_tools.base_tools().in_db_time_zone(to)?
} else {
to
}
} else {
return Err(CubeError::user(format!(
"Arguments for date range is not valid"
Expand Down Expand Up @@ -411,11 +454,7 @@ impl BaseFilter {
"0".repeat(precision as usize)
));
}
//FIXME chrono don't support parsing date without specified format
Err(CubeError::user(format!(
"Unsupported date format: {}",
date
)))
Ok(date.to_string())
}

fn format_to_date(&self, date: &str) -> Result<String, CubeError> {
Expand Down Expand Up @@ -447,11 +486,8 @@ impl BaseFilter {
"9".repeat(precision as usize)
));
}
//FIXME chrono don't support parsing date without specified format
Err(CubeError::user(format!(
"Unsupported date format: {}",
date
)))

Ok(date.to_string())
}

fn allocate_param(&self, param: &str) -> String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ pub enum FilterOperator {
Equal,
NotEqual,
InDateRange,
InDateRangeExtended,
RegularRollingWindowDateRange,
ToDateRollingWindowDateRange,
In,
NotIn,
Set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl DimensionSubqueryPlanner {
false,
)?;
let query_planner = QueryPlanner::new(sub_query_properties, self.query_tools.clone());
let sub_query = query_planner.build_sql()?;
let sub_query = query_planner.plan()?;
let sub_query_alias = format!("{cube_name}_{dim_name}_subquery");

let conditions = primary_keys_dimensions
Expand Down
Loading

0 comments on commit df31e89

Please sign in to comment.