From fa59a46f1536dc60e7a803117102cd00e9c12a0e Mon Sep 17 00:00:00 2001 From: Wendell Smith Date: Wed, 24 Jul 2024 10:58:01 -0400 Subject: [PATCH 1/3] Add support for USING to SQL unparser --- datafusion/sql/src/unparser/plan.rs | 89 ++++++++++++++--------- datafusion/sql/tests/cases/plan_to_sql.rs | 2 + 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 59660f4f0404..00c3bab236cf 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result}; +use datafusion_common::{internal_err, not_impl_err, plan_err, Column, DataFusionError, Result}; use datafusion_expr::{ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection, }; @@ -368,37 +368,7 @@ impl Unparser<'_> { self.select_to_sql_recursively(input, query, select, relation) } LogicalPlan::Join(join) => { - match join.join_constraint { - JoinConstraint::On => {} - JoinConstraint::Using => { - return not_impl_err!( - "Unsupported join constraint: {:?}", - join.join_constraint - ) - } - } - - // parse filter if exists - let join_filter = match &join.filter { - Some(filter) => Some(self.expr_to_sql(filter)?), - None => None, - }; - - // map join.on to `l.a = r.a AND l.b = r.b AND ...` - let eq_op = ast::BinaryOperator::Eq; - let join_on = self.join_conditions_to_sql(&join.on, eq_op)?; - - // Merge `join_on` and `join_filter` - let join_expr = match (join_filter, join_on) { - (Some(filter), Some(on)) => Some(self.and_op_to_sql(filter, on)), - (Some(filter), None) => Some(filter), - (None, Some(on)) => Some(on), - (None, None) => None, - }; - let join_constraint = match join_expr { - Some(expr) => ast::JoinConstraint::On(expr), - None => ast::JoinConstraint::None, - }; + let join_constraint = self.join_constraint_to_sql(join.join_constraint, &join.on, join.filter.as_ref())?; let mut right_relation = RelationBuilder::default(); @@ -582,9 +552,62 @@ impl Unparser<'_> { } } + /// Convert the components of a USING clause to the USING AST + fn join_using_to_sql(&self, join_conditions: &[(Expr, Expr)]) -> Result { + let mut idents = Vec::with_capacity(join_conditions.len()); + for (left, right) in join_conditions { + match (left, right) { + (Expr::Column(Column { relation: _, name: left_name }), Expr::Column(Column { relation: _, name: right_name })) => { + if left_name != right_name { + // USING is only valid when the column names are the + // same, so they should never be different + return not_impl_err!("Unsupported USING with different column names"); + } + idents.push(self.new_ident_quoted_if_needs(left_name.to_string())); + }, + // USING is only valid with column names; arbitrary expressions + // are not allowed + _ => return not_impl_err!("Unsupported USING with non-column expressions"), + } + } + Ok(ast::JoinConstraint::Using(idents)) + } + + /// Convert a join constraint and associated conditions and filter to a SQL AST node + fn join_constraint_to_sql(&self, constraint: JoinConstraint, conditions: &[(Expr, Expr)], filter: Option<&Expr>) -> Result { + match (constraint, conditions, filter) { + // No constraints + (JoinConstraint::On, [], None) => Ok(ast::JoinConstraint::None), + // Only equi-join conditions + (JoinConstraint::On, conditions, None) => { + let expr = self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; + match expr { + Some(expr) => Ok(ast::JoinConstraint::On(expr)), + None => Ok(ast::JoinConstraint::None), + } + }, + // More complex filter with non-equi-join conditions; so we combine + // all conditions into a single AST Expr + (JoinConstraint::On, conditions, Some(filter)) => { + let filter_expr = self.expr_to_sql(filter)?; + let expr = self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; + match expr { + Some(expr) => { + let join_expr = self.and_op_to_sql(filter_expr, expr); + Ok(ast::JoinConstraint::On(join_expr)) + }, + None => Ok(ast::JoinConstraint::On(filter_expr)), + } + }, + + (JoinConstraint::Using, conditions, None) => self.join_using_to_sql(conditions), + (JoinConstraint::Using, _, Some(_)) => not_impl_err!("Unsupported USING with filter"), + } + } + fn join_conditions_to_sql( &self, - join_conditions: &Vec<(Expr, Expr)>, + join_conditions: &[(Expr, Expr)], eq_op: ast::BinaryOperator, ) -> Result> { // Only support AND conjunction for each binary expression in join conditions diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index aada560fd884..a52333e54fac 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -84,6 +84,7 @@ fn roundtrip_statement() -> Result<()> { "select 1;", "select 1 limit 0;", "select ta.j1_id from j1 ta join (select 1 as j1_id) tb on ta.j1_id = tb.j1_id;", + "select ta.j1_id from j1 ta join (select 1 as j1_id) tb using (j1_id);", "select ta.j1_id from j1 ta join (select 1 as j1_id) tb on ta.j1_id = tb.j1_id where ta.j1_id > 1;", "select ta.j1_id from (select 1 as j1_id) ta;", "select ta.j1_id from j1 ta;", @@ -142,6 +143,7 @@ fn roundtrip_statement() -> Result<()> { r#"SELECT id, count(distinct id) over (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), sum(id) OVER (PARTITION BY first_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from person"#, "SELECT id, sum(id) OVER (PARTITION BY first_name ROWS BETWEEN 5 PRECEDING AND 2 FOLLOWING) from person", + "WITH t1 AS (SELECT j1_id AS id, j1_string name FROM j1), t2 AS (SELECT j2_id AS id, j2_string name FROM j2) SELECT * FROM t1 JOIN t2 USING (id, name)", ]; // For each test sql string, we transform as follows: From 11324895d04034e7f74e492a1fefc2f19f9a5e39 Mon Sep 17 00:00:00 2001 From: Wendell Smith Date: Wed, 24 Jul 2024 11:53:48 -0400 Subject: [PATCH 2/3] cargo fmt --- datafusion/sql/src/unparser/plan.rs | 63 ++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 00c3bab236cf..274be4353bfb 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{internal_err, not_impl_err, plan_err, Column, DataFusionError, Result}; +use datafusion_common::{ + internal_err, not_impl_err, plan_err, Column, DataFusionError, Result, +}; use datafusion_expr::{ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection, }; @@ -368,7 +370,11 @@ impl Unparser<'_> { self.select_to_sql_recursively(input, query, select, relation) } LogicalPlan::Join(join) => { - let join_constraint = self.join_constraint_to_sql(join.join_constraint, &join.on, join.filter.as_ref())?; + let join_constraint = self.join_constraint_to_sql( + join.join_constraint, + &join.on, + join.filter.as_ref(), + )?; let mut right_relation = RelationBuilder::default(); @@ -553,55 +559,82 @@ impl Unparser<'_> { } /// Convert the components of a USING clause to the USING AST - fn join_using_to_sql(&self, join_conditions: &[(Expr, Expr)]) -> Result { + fn join_using_to_sql( + &self, + join_conditions: &[(Expr, Expr)], + ) -> Result { let mut idents = Vec::with_capacity(join_conditions.len()); for (left, right) in join_conditions { match (left, right) { - (Expr::Column(Column { relation: _, name: left_name }), Expr::Column(Column { relation: _, name: right_name })) => { + ( + Expr::Column(Column { + relation: _, + name: left_name, + }), + Expr::Column(Column { + relation: _, + name: right_name, + }), + ) => { if left_name != right_name { // USING is only valid when the column names are the // same, so they should never be different - return not_impl_err!("Unsupported USING with different column names"); + return not_impl_err!( + "Unsupported USING with different column names" + ); } idents.push(self.new_ident_quoted_if_needs(left_name.to_string())); - }, + } // USING is only valid with column names; arbitrary expressions // are not allowed - _ => return not_impl_err!("Unsupported USING with non-column expressions"), + _ => { + return not_impl_err!("Unsupported USING with non-column expressions") + } } } Ok(ast::JoinConstraint::Using(idents)) } /// Convert a join constraint and associated conditions and filter to a SQL AST node - fn join_constraint_to_sql(&self, constraint: JoinConstraint, conditions: &[(Expr, Expr)], filter: Option<&Expr>) -> Result { + fn join_constraint_to_sql( + &self, + constraint: JoinConstraint, + conditions: &[(Expr, Expr)], + filter: Option<&Expr>, + ) -> Result { match (constraint, conditions, filter) { // No constraints (JoinConstraint::On, [], None) => Ok(ast::JoinConstraint::None), // Only equi-join conditions (JoinConstraint::On, conditions, None) => { - let expr = self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; + let expr = + self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; match expr { Some(expr) => Ok(ast::JoinConstraint::On(expr)), None => Ok(ast::JoinConstraint::None), } - }, + } // More complex filter with non-equi-join conditions; so we combine // all conditions into a single AST Expr (JoinConstraint::On, conditions, Some(filter)) => { let filter_expr = self.expr_to_sql(filter)?; - let expr = self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; + let expr = + self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; match expr { Some(expr) => { let join_expr = self.and_op_to_sql(filter_expr, expr); Ok(ast::JoinConstraint::On(join_expr)) - }, + } None => Ok(ast::JoinConstraint::On(filter_expr)), } - }, + } - (JoinConstraint::Using, conditions, None) => self.join_using_to_sql(conditions), - (JoinConstraint::Using, _, Some(_)) => not_impl_err!("Unsupported USING with filter"), + (JoinConstraint::Using, conditions, None) => { + self.join_using_to_sql(conditions) + } + (JoinConstraint::Using, _, Some(_)) => { + not_impl_err!("Unsupported USING with filter") + } } } From 8aef3700ef3f27deaaea69b847f935d34a2b3482 Mon Sep 17 00:00:00 2001 From: Wendell Smith Date: Wed, 24 Jul 2024 13:54:15 -0400 Subject: [PATCH 3/3] Downgrade USING to ON when necessary when unparsing When the conditions and filters in the LogicalPlan are not in a form compatible with USING, we can instead use ON - so we do. --- datafusion/sql/src/unparser/plan.rs | 110 ++++++++++++++-------------- 1 file changed, 57 insertions(+), 53 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 274be4353bfb..9c31db4bc883 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -558,11 +558,13 @@ impl Unparser<'_> { } } - /// Convert the components of a USING clause to the USING AST + /// Convert the components of a USING clause to the USING AST. Returns + /// 'None' if the conditions are not compatible with a USING expression, + /// e.g. non-column expressions or non-matching names. fn join_using_to_sql( &self, join_conditions: &[(Expr, Expr)], - ) -> Result { + ) -> Option { let mut idents = Vec::with_capacity(join_conditions.len()); for (left, right) in join_conditions { match (left, right) { @@ -575,24 +577,15 @@ impl Unparser<'_> { relation: _, name: right_name, }), - ) => { - if left_name != right_name { - // USING is only valid when the column names are the - // same, so they should never be different - return not_impl_err!( - "Unsupported USING with different column names" - ); - } + ) if left_name == right_name => { idents.push(self.new_ident_quoted_if_needs(left_name.to_string())); } - // USING is only valid with column names; arbitrary expressions + // USING is only valid with matching column names; arbitrary expressions // are not allowed - _ => { - return not_impl_err!("Unsupported USING with non-column expressions") - } + _ => return None, } } - Ok(ast::JoinConstraint::Using(idents)) + Some(ast::JoinConstraint::Using(idents)) } /// Convert a join constraint and associated conditions and filter to a SQL AST node @@ -604,58 +597,69 @@ impl Unparser<'_> { ) -> Result { match (constraint, conditions, filter) { // No constraints - (JoinConstraint::On, [], None) => Ok(ast::JoinConstraint::None), - // Only equi-join conditions - (JoinConstraint::On, conditions, None) => { - let expr = - self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; - match expr { - Some(expr) => Ok(ast::JoinConstraint::On(expr)), - None => Ok(ast::JoinConstraint::None), - } - } - // More complex filter with non-equi-join conditions; so we combine - // all conditions into a single AST Expr - (JoinConstraint::On, conditions, Some(filter)) => { - let filter_expr = self.expr_to_sql(filter)?; - let expr = - self.join_conditions_to_sql(conditions, ast::BinaryOperator::Eq)?; - match expr { - Some(expr) => { - let join_expr = self.and_op_to_sql(filter_expr, expr); - Ok(ast::JoinConstraint::On(join_expr)) - } - None => Ok(ast::JoinConstraint::On(filter_expr)), - } + (JoinConstraint::On | JoinConstraint::Using, [], None) => { + Ok(ast::JoinConstraint::None) } (JoinConstraint::Using, conditions, None) => { - self.join_using_to_sql(conditions) + match self.join_using_to_sql(conditions) { + Some(using) => Ok(using), + // As above, this should not be reachable from parsed SQL, + // but a user could create this; we "downgrade" to ON. + None => self.join_conditions_to_sql_on(conditions, None), + } } - (JoinConstraint::Using, _, Some(_)) => { - not_impl_err!("Unsupported USING with filter") + + // Two cases here: + // 1. Straightforward ON case, with possible equi-join conditions + // and additional filters + // 2. USING with additional filters; we "downgrade" to ON, because + // you can't use USING with arbitrary filters. (This should not + // be accessible from parsed SQL, but may have been a + // custom-built JOIN by a user.) + (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => { + self.join_conditions_to_sql_on(conditions, filter) } } } - fn join_conditions_to_sql( + // Convert a list of equi0join conditions and an optional filter to a SQL ON + // AST node, with the equi-join conditions and the filter merged into a + // single conditional expression + fn join_conditions_to_sql_on( &self, join_conditions: &[(Expr, Expr)], - eq_op: ast::BinaryOperator, - ) -> Result> { - // Only support AND conjunction for each binary expression in join conditions - let mut exprs: Vec = vec![]; + filter: Option<&Expr>, + ) -> Result { + let mut condition = None; + // AND the join conditions together to create the overall condition for (left, right) in join_conditions { - // Parse left + // Parse left and right let l = self.expr_to_sql(left)?; - // Parse right let r = self.expr_to_sql(right)?; - // AND with existing expression - exprs.push(self.binary_op_to_sql(l, r, eq_op.clone())); + let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq); + condition = match condition { + Some(expr) => Some(self.and_op_to_sql(expr, e)), + None => Some(e), + }; } - let join_expr: Option = - exprs.into_iter().reduce(|r, l| self.and_op_to_sql(r, l)); - Ok(join_expr) + + // Then AND the non-equijoin filter condition as well + condition = match (condition, filter) { + (Some(expr), Some(filter)) => { + Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?)) + } + (Some(expr), None) => Some(expr), + (None, Some(filter)) => Some(self.expr_to_sql(filter)?), + (None, None) => None, + }; + + let constraint = match condition { + Some(filter) => ast::JoinConstraint::On(filter), + None => ast::JoinConstraint::None, + }; + + Ok(constraint) } fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {