diff --git a/Cargo.toml b/Cargo.toml index 1bd9381..744de0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ members = [ "examples", "sources/sql", "sources/flight-sql", - "sql-writer", ] [patch.crates-io] @@ -22,5 +21,5 @@ readme = "README.md" [workspace.dependencies] async-trait = "0.1.77" -datafusion = "34.0.0" -datafusion-substrait = "34.0.0" +datafusion = "37.0.0" +datafusion-substrait = "37.0.0" diff --git a/datafusion-federation/src/analyzer.rs b/datafusion-federation/src/analyzer.rs index 322d2da..7da905a 100644 --- a/datafusion-federation/src/analyzer.rs +++ b/datafusion-federation/src/analyzer.rs @@ -113,7 +113,7 @@ impl FederationAnalyzerRule { }) .collect::>>()?; - let new_plan = plan.with_new_inputs(&new_inputs)?; + let new_plan = plan.with_new_exprs(plan.expressions(), new_inputs)?; Ok((Some(new_plan), None)) } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 87d7c8e..13eb599 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,18 +6,18 @@ license.workspace = true readme.workspace = true [dev-dependencies] -arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } +arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] } tokio = "1.35.1" async-trait.workspace = true datafusion.workspace = true datafusion-federation.path = "../datafusion-federation" datafusion-federation-sql.path = "../sources/sql" datafusion-federation-flight-sql.path = "../sources/flight-sql" -connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [ +connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [ "dst_arrow", - "src_sqlite", + "src_sqlite" ] } -tonic = "0.10.2" +tonic = "0.11.0" [dependencies] async-std = "1.12.0" diff --git a/examples/examples/sqlite-partial.rs b/examples/examples/sqlite-partial.rs index 660abc7..780462b 100644 --- a/examples/examples/sqlite-partial.rs +++ b/examples/examples/sqlite-partial.rs @@ -92,13 +92,13 @@ impl SchemaProvider for MultiSchemaProvider { self.children.iter().flat_map(|p| p.table_names()).collect() } - async fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Result>> { for child in &self.children { - if let Some(table) = child.table(name).await { - return Some(table); + if let Ok(Some(table)) = child.table(name).await { + return Ok(Some(table)); } } - None + Ok(None) } fn table_exist(&self, name: &str) -> bool { diff --git a/sources/flight-sql/Cargo.toml b/sources/flight-sql/Cargo.toml index 8ab3d00..f778dfc 100644 --- a/sources/flight-sql/Cargo.toml +++ b/sources/flight-sql/Cargo.toml @@ -16,8 +16,8 @@ datafusion-substrait.workspace = true datafusion-federation.path = "../../datafusion-federation" datafusion-federation-sql.path = "../sql" futures = "0.3.30" -tonic = {version="0.10.2", features=["tls"] } +tonic = {version="0.11.0", features=["tls"] } prost = "0.12.3" -arrow = "49.0.0" -arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } +arrow = "51.0.0" +arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] } log = "0.4.20" diff --git a/sources/sql/Cargo.toml b/sources/sql/Cargo.toml index 8f966c8..bc89d95 100644 --- a/sources/sql/Cargo.toml +++ b/sources/sql/Cargo.toml @@ -13,12 +13,12 @@ path = "src/lib.rs" async-trait.workspace = true # connectorx = { version = "0.3.2", features = ["src_sqlite"] } # https://github.com/sfu-db/connector-x/pull/555 -connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [ +connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [ "dst_arrow", + "src_sqlite" ] } datafusion.workspace = true datafusion-federation.path = "../../datafusion-federation" -datafusion-sql-writer.path = "../../sql-writer" # derive_builder = "0.13.0" futures = "0.3.30" tokio = "1.35.1" diff --git a/sources/sql/src/connectorx/executor.rs b/sources/sql/src/connectorx/executor.rs index a66dfac..fc5ea3d 100644 --- a/sources/sql/src/connectorx/executor.rs +++ b/sources/sql/src/connectorx/executor.rs @@ -53,7 +53,7 @@ impl SQLExecutor for CXExecutor { fn compute_context(&self) -> Option { Some(self.context.clone()) } - fn execute(&self, sql: &str, _schema: SchemaRef) -> Result { + fn execute(&self, sql: &str, schema: SchemaRef) -> Result { let conn = self.conn.clone(); let query: CXQuery = sql.into(); @@ -69,8 +69,6 @@ impl SQLExecutor for CXExecutor { )))); }; - let schema = schema_to_lowercase(dst.arrow_schema()); - Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index 1d997e5..e0410f5 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -9,13 +9,16 @@ use datafusion::{ execution::{context::SessionState, TaskContext}, logical_expr::{Extension, LogicalPlan}, optimizer::analyzer::{Analyzer, AnalyzerRule}, - physical_expr::PhysicalSortExpr, - physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}, + physical_expr::EquivalenceProperties, + physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, + }, + sql::unparser::plan_to_sql, }; use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProvider}; mod schema; -use datafusion_sql_writer::from_df_plan; pub use schema::*; pub mod connectorx; @@ -112,11 +115,22 @@ impl FederationPlanner for SQLFederationPlanner { struct VirtualExecutionPlan { plan: LogicalPlan, executor: Arc, + props: PlanProperties, } impl VirtualExecutionPlan { pub fn new(plan: LogicalPlan, executor: Arc) -> Self { - Self { plan, executor } + let schema: Schema = plan.schema().as_ref().into(); + let props = PlanProperties::new( + EquivalenceProperties::new(Arc::new(schema)), + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ); + Self { + plan, + executor, + props, + } } fn schema(&self) -> SchemaRef { @@ -140,14 +154,6 @@ impl ExecutionPlan for VirtualExecutionPlan { self.schema() } - fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { - datafusion::physical_plan::Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - fn children(&self) -> Vec> { vec![] } @@ -164,9 +170,13 @@ impl ExecutionPlan for VirtualExecutionPlan { _partition: usize, _context: Arc, ) -> Result { - let ast = from_df_plan(&self.plan, self.executor.dialect())?; + let ast = plan_to_sql(&self.plan)?; let query = format!("{ast}"); self.executor.execute(query.as_str(), self.schema()) } + + fn properties(&self) -> &PlanProperties { + &self.props + } } diff --git a/sources/sql/src/schema.rs b/sources/sql/src/schema.rs index 4d69ebe..c780f23 100644 --- a/sources/sql/src/schema.rs +++ b/sources/sql/src/schema.rs @@ -54,16 +54,16 @@ impl SchemaProvider for SQLSchemaProvider { self.tables.iter().map(|s| s.table_name.clone()).collect() } - async fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Result>> { if let Some(source) = self .tables .iter() .find(|s| s.table_name.eq_ignore_ascii_case(name)) { let adaptor = FederatedTableProviderAdaptor::new(source.clone()); - return Some(Arc::new(adaptor)); + return Ok(Some(Arc::new(adaptor))); } - None + Ok(None) } fn table_exist(&self, name: &str) -> bool { @@ -93,13 +93,13 @@ impl SchemaProvider for MultiSchemaProvider { self.children.iter().flat_map(|p| p.table_names()).collect() } - async fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Result>> { for child in &self.children { - if let Some(table) = child.table(name).await { - return Some(table); + if let Ok(Some(table)) = child.table(name).await { + return Ok(Some(table)); } } - None + Ok(None) } fn table_exist(&self, name: &str) -> bool { diff --git a/sql-writer/Cargo.toml b/sql-writer/Cargo.toml deleted file mode 100644 index 6b716b3..0000000 --- a/sql-writer/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "datafusion-sql-writer" -version.workspace = true -edition.workspace = true -license.workspace = true -readme.workspace = true - -[lib] -name = "datafusion_sql_writer" -path = "src/lib.rs" - -[dependencies] -datafusion.workspace = true -# derive_builder = "0.13.0" - -[dev-dependencies] -tokio = "1.35.1" diff --git a/sql-writer/examples/expr.rs b/sql-writer/examples/expr.rs deleted file mode 100644 index aa7d250..0000000 --- a/sql-writer/examples/expr.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::sync::Arc; - -use datafusion::{ - common::Column, - logical_expr::{BinaryExpr, Operator}, - prelude::Expr, - sql::{sqlparser::dialect::GenericDialect, TableReference}, -}; -use datafusion_sql_writer::from_df_expr; - -fn main() -> Result<(), Box> { - // Example expression - let expr = Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column { - relation: Some(TableReference::bare("table_a")), - name: "id".to_string(), - })), - op: Operator::Gt, - right: Box::new(Expr::Column(Column { - relation: Some(TableReference::bare("table_b")), - name: "b".to_string(), - })), - }); - - // datafusion::Expr -> sqlparser::ast - let dialect = Arc::new(GenericDialect {}); - let ast = from_df_expr(&expr, dialect)?; - - // Get SQL string by formatting the AST - let sql = format!("{}", ast); - - println!("{sql}"); - - Ok(()) -} diff --git a/sql-writer/examples/plan.rs b/sql-writer/examples/plan.rs deleted file mode 100644 index 3f9a5c8..0000000 --- a/sql-writer/examples/plan.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::sync::Arc; - -use datafusion::{ - execution::context::SessionContext, sql::sqlparser::dialect::GenericDialect, - test_util::TestTableFactory, -}; -use datafusion_sql_writer::from_df_plan; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Example query - let query = "select ta.id, tb.value from table_a ta join table_b tb on ta.id = tb.id;"; - - // Create the DataFusion plan - let dialect = Arc::new(GenericDialect {}); - let ctx = mock_ctx().await; - let plan = ctx.sql(query).await.unwrap().into_unoptimized_plan(); - - // datafusion::LogicalPlan -> sqlparser::ast - let ast = from_df_plan(&plan, dialect)?; - - // Get SQL string by formatting the AST - let sql = format!("{}", ast); - - println!("{sql}"); - Ok(()) -} - -async fn mock_ctx() -> SessionContext { - let mut state = SessionContext::new().state(); - state - .table_factories_mut() - .insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {})); - let ctx = SessionContext::new_with_state(state); - - ctx.sql("CREATE EXTERNAL TABLE table_a (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - ctx.sql("CREATE EXTERNAL TABLE table_b (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - ctx.sql("CREATE EXTERNAL TABLE table_c (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - - ctx -} diff --git a/sql-writer/src/ast_builder.rs b/sql-writer/src/ast_builder.rs deleted file mode 100644 index 3cf070f..0000000 --- a/sql-writer/src/ast_builder.rs +++ /dev/null @@ -1,587 +0,0 @@ -use core::fmt; - -use datafusion::sql::sqlparser::ast; - -#[derive(Clone)] -#[doc = "Builder for [`Query`](struct.Query.html).\n"] -pub struct QueryBuilder { - with: Option, - body: Option>, - order_by: Vec, - limit: Option, - limit_by: Vec, - offset: Option, - fetch: Option, - locks: Vec, - for_clause: Option, -} - -#[allow(dead_code)] -impl QueryBuilder { - #[allow(unused_mut)] - pub fn with(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.with = value; - new - } - #[allow(unused_mut)] - pub fn body(&mut self, value: Box) -> &mut Self { - let mut new = self; - new.body = Option::Some(value); - new - } - #[allow(unused_mut)] - pub fn order_by(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.order_by = value; - new - } - #[allow(unused_mut)] - pub fn limit(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.limit = value; - new - } - #[allow(unused_mut)] - pub fn limit_by(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.limit_by = value; - new - } - #[allow(unused_mut)] - pub fn offset(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.offset = value; - new - } - #[allow(unused_mut)] - pub fn fetch(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.fetch = value; - new - } - #[allow(unused_mut)] - pub fn locks(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.locks = value; - new - } - #[allow(unused_mut)] - pub fn for_clause(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.for_clause = value; - new - } - #[doc = "Builds a new `Query`.\n\n# Errors\n\nIf a required field has not been initialized.\n"] - pub fn build(&self) -> Result { - Ok(ast::Query { - with: self.with.clone(), - body: match self.body { - Some(ref value) => value.clone(), - None => return Result::Err(Into::into(UninitializedFieldError::from("body"))), - }, - order_by: self.order_by.clone(), - limit: self.limit.clone(), - limit_by: self.limit_by.clone(), - offset: self.offset.clone(), - fetch: self.fetch.clone(), - locks: self.locks.clone(), - for_clause: self.for_clause.clone(), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - with: Default::default(), - body: Default::default(), - order_by: Default::default(), - limit: Default::default(), - limit_by: Default::default(), - offset: Default::default(), - fetch: Default::default(), - locks: Default::default(), - for_clause: Default::default(), - } - } -} -impl Default for QueryBuilder { - fn default() -> Self { - Self::create_empty() - } -} - -#[derive(Clone)] -#[doc = "Builder for [`Select`](struct.Select.html).\n"] -pub struct SelectBuilder { - distinct: Option, - top: Option, - projection: Vec, - into: Option, - from: Vec, - lateral_views: Vec, - selection: Option, - group_by: Option, - cluster_by: Vec, - distribute_by: Vec, - sort_by: Vec, - having: Option, - named_window: Vec, - qualify: Option, -} -#[allow(dead_code)] -impl SelectBuilder { - #[allow(unused_mut)] - pub fn distinct(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.distinct = value; - new - } - #[allow(unused_mut)] - pub fn top(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.top = value; - new - } - #[allow(unused_mut)] - pub fn projection(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.projection = value; - new - } - #[allow(unused_mut)] - pub fn into(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.into = value; - new - } - #[allow(unused_mut)] - pub fn from(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.from = value; - new - } - #[allow(unused_mut)] - pub fn push_from(&mut self, value: TableWithJoinsBuilder) -> &mut Self { - let mut new = self; - new.from.push(value); - new - } - #[allow(unused_mut)] - pub fn pop_from(&mut self) -> Option { - self.from.pop() - } - #[allow(unused_mut)] - pub fn lateral_views(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.lateral_views = value; - new - } - #[allow(unused_mut)] - pub fn selection(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.selection = value; - new - } - #[allow(unused_mut)] - pub fn group_by(&mut self, value: ast::GroupByExpr) -> &mut Self { - let mut new = self; - new.group_by = Option::Some(value); - new - } - #[allow(unused_mut)] - pub fn cluster_by(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.cluster_by = value; - new - } - #[allow(unused_mut)] - pub fn distribute_by(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.distribute_by = value; - new - } - #[allow(unused_mut)] - pub fn sort_by(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.sort_by = value; - new - } - #[allow(unused_mut)] - pub fn having(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.having = value; - new - } - #[allow(unused_mut)] - pub fn named_window(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.named_window = value; - new - } - #[allow(unused_mut)] - pub fn qualify(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.qualify = value; - new - } - #[doc = "Builds a new `Select`.\n\n# Errors\n\nIf a required field has not been initialized.\n"] - pub fn build(&self) -> Result { - Ok(ast::Select { - distinct: self.distinct.clone(), - top: self.top.clone(), - projection: self.projection.clone(), - into: self.into.clone(), - from: self - .from - .iter() - .map(|b| b.build()) - .collect::, BuilderError>>()?, - lateral_views: self.lateral_views.clone(), - selection: self.selection.clone(), - group_by: match self.group_by { - Some(ref value) => value.clone(), - None => return Result::Err(Into::into(UninitializedFieldError::from("group_by"))), - }, - cluster_by: self.cluster_by.clone(), - distribute_by: self.distribute_by.clone(), - sort_by: self.sort_by.clone(), - having: self.having.clone(), - named_window: self.named_window.clone(), - qualify: self.qualify.clone(), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - distinct: Default::default(), - top: Default::default(), - projection: Default::default(), - into: Default::default(), - from: Default::default(), - lateral_views: Default::default(), - selection: Default::default(), - group_by: Some(ast::GroupByExpr::Expressions(Vec::new())), - cluster_by: Default::default(), - distribute_by: Default::default(), - sort_by: Default::default(), - having: Default::default(), - named_window: Default::default(), - qualify: Default::default(), - } - } -} -impl Default for SelectBuilder { - fn default() -> Self { - Self::create_empty() - } -} - -#[derive(Clone)] -#[doc = "Builder for [`TableWithJoins`](struct.TableWithJoins.html).\n"] -pub struct TableWithJoinsBuilder { - relation: Option, - joins: Vec, -} -#[allow(dead_code)] -impl TableWithJoinsBuilder { - #[allow(unused_mut)] - pub fn relation(&mut self, value: RelationBuilder) -> &mut Self { - let mut new = self; - new.relation = Option::Some(value); - new - } - - #[allow(unused_mut)] - pub fn joins(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.joins = value; - new - } - #[allow(unused_mut)] - pub fn push_join(&mut self, value: ast::Join) -> &mut Self { - let mut new = self; - new.joins.push(value); - new - } - - #[doc = "Builds a new `TableWithJoins`.\n\n# Errors\n\nIf a required field has not been initialized.\n"] - pub fn build(&self) -> Result { - Ok(ast::TableWithJoins { - relation: match self.relation { - Some(ref value) => value.build()?, - None => return Result::Err(Into::into(UninitializedFieldError::from("relation"))), - }, - joins: self.joins.clone(), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - relation: Default::default(), - joins: Default::default(), - } - } -} -impl Default for TableWithJoinsBuilder { - fn default() -> Self { - Self::create_empty() - } -} - -#[derive(Clone)] -pub struct RelationBuilder { - relation: Option, -} - -#[derive(Clone)] -enum TableFactorBuilder { - Table(TableRelationBuilder), - Derived(DerivedRelationBuilder), -} - -#[allow(dead_code)] -impl RelationBuilder { - #[allow(unused_mut)] - pub fn has_relation(&self) -> bool { - self.relation.is_some() - } - #[allow(unused_mut)] - pub fn table(&mut self, value: TableRelationBuilder) -> &mut Self { - let mut new = self; - new.relation = Option::Some(TableFactorBuilder::Table(value)); - new - } - #[allow(unused_mut)] - pub fn derived(&mut self, value: DerivedRelationBuilder) -> &mut Self { - let mut new = self; - new.relation = Option::Some(TableFactorBuilder::Derived(value)); - new - } - #[allow(unused_mut)] - pub fn alias(&mut self, value: Option) -> &mut Self { - let mut new = self; - match new.relation { - Some(TableFactorBuilder::Table(ref mut rel_builder)) => { - rel_builder.alias = value; - } - Some(TableFactorBuilder::Derived(ref mut rel_builder)) => { - rel_builder.alias = value; - } - None => (), - } - new - } - pub fn build(&self) -> Result { - Ok(match self.relation { - Some(TableFactorBuilder::Table(ref value)) => value.build()?, - Some(TableFactorBuilder::Derived(ref value)) => value.build()?, - None => return Result::Err(Into::into(UninitializedFieldError::from("relation"))), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - relation: Default::default(), - } - } -} -impl Default for RelationBuilder { - fn default() -> Self { - Self::create_empty() - } -} - -#[derive(Clone)] -#[doc = "Builder for [`Table`](struct.Table.html).\n"] -pub struct TableRelationBuilder { - name: Option, - alias: Option, - args: Option>, - with_hints: Vec, - version: Option, - partitions: Vec, -} -#[allow(dead_code)] -impl TableRelationBuilder { - #[allow(unused_mut)] - pub fn name(&mut self, value: ast::ObjectName) -> &mut Self { - let mut new = self; - new.name = Option::Some(value); - new - } - #[allow(unused_mut)] - pub fn alias(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.alias = value; - new - } - #[allow(unused_mut)] - pub fn args(&mut self, value: Option>) -> &mut Self { - let mut new = self; - new.args = value; - new - } - #[allow(unused_mut)] - pub fn with_hints(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.with_hints = value; - new - } - #[allow(unused_mut)] - pub fn version(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.version = value; - new - } - #[allow(unused_mut)] - pub fn partitions(&mut self, value: Vec) -> &mut Self { - let mut new = self; - new.partitions = value; - new - } - #[doc = "Builds a new `Table`.\n\n# Errors\n\nIf a required field has not been initialized.\n"] - pub fn build(&self) -> Result { - Ok(ast::TableFactor::Table { - name: match self.name { - Some(ref value) => value.clone(), - None => return Result::Err(Into::into(UninitializedFieldError::from("name"))), - }, - alias: self.alias.clone(), - args: self.args.clone(), - with_hints: self.with_hints.clone(), - version: self.version.clone(), - partitions: self.partitions.clone(), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - name: Default::default(), - alias: Default::default(), - args: Default::default(), - with_hints: Default::default(), - version: Default::default(), - partitions: Default::default(), - } - } -} -impl Default for TableRelationBuilder { - fn default() -> Self { - Self::create_empty() - } -} -#[derive(Clone)] -#[doc = "Builder for [`DerivedRelation`](struct.DerivedRelation.html).\n"] -pub struct DerivedRelationBuilder { - lateral: Option, - subquery: Option>, - alias: Option, -} -#[allow(clippy::all)] -#[allow(dead_code)] -impl DerivedRelationBuilder { - #[allow(unused_mut)] - pub fn lateral(&mut self, value: bool) -> &mut Self { - let mut new = self; - new.lateral = Option::Some(value); - new - } - #[allow(unused_mut)] - pub fn subquery(&mut self, value: Box) -> &mut Self { - let mut new = self; - new.subquery = Option::Some(value); - new - } - #[allow(unused_mut)] - pub fn alias(&mut self, value: Option) -> &mut Self { - let mut new = self; - new.alias = value; - new - } - #[doc = "Builds a new `DerivedRelation`.\n\n# Errors\n\nIf a required field has not been initialized.\n"] - fn build(&self) -> Result { - Ok(ast::TableFactor::Derived { - lateral: match self.lateral { - Some(ref value) => value.clone(), - None => return Result::Err(Into::into(UninitializedFieldError::from("lateral"))), - }, - subquery: match self.subquery { - Some(ref value) => value.clone(), - None => return Result::Err(Into::into(UninitializedFieldError::from("subquery"))), - }, - alias: self.alias.clone(), - }) - } - #[doc = r" Create an empty builder, with all fields set to `None` or `PhantomData`."] - fn create_empty() -> Self { - Self { - lateral: Default::default(), - subquery: Default::default(), - alias: Default::default(), - } - } -} -impl Default for DerivedRelationBuilder { - fn default() -> Self { - Self::create_empty() - } -} - -/// Runtime error when a `build()` method is called and one or more required fields -/// do not have a value. -#[derive(Debug, Clone)] -pub struct UninitializedFieldError(&'static str); - -impl UninitializedFieldError { - /// Create a new `UnitializedFieldError` for the specified field name. - pub fn new(field_name: &'static str) -> Self { - UninitializedFieldError(field_name) - } - - /// Get the name of the first-declared field that wasn't initialized - pub fn field_name(&self) -> &'static str { - self.0 - } -} - -impl fmt::Display for UninitializedFieldError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Field not initialized: {}", self.0) - } -} - -impl From<&'static str> for UninitializedFieldError { - fn from(field_name: &'static str) -> Self { - Self::new(field_name) - } -} -impl std::error::Error for UninitializedFieldError {} - -#[doc = "Error type for Builder"] -#[derive(Debug)] -#[non_exhaustive] -pub enum BuilderError { - #[doc = r" Uninitialized field"] - UninitializedField(&'static str), - #[doc = r" Custom validation error"] - ValidationError(String), -} -impl From for BuilderError { - fn from(s: UninitializedFieldError) -> Self { - Self::UninitializedField(s.field_name()) - } -} -impl From for BuilderError { - fn from(s: String) -> Self { - Self::ValidationError(s) - } -} -impl fmt::Display for BuilderError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::UninitializedField(ref field) => write!(f, "`{}` must be initialized", field), - Self::ValidationError(ref error) => write!(f, "{}", error), - } - } -} -impl std::error::Error for BuilderError {} diff --git a/sql-writer/src/lib.rs b/sql-writer/src/lib.rs deleted file mode 100644 index 39b1d28..0000000 --- a/sql-writer/src/lib.rs +++ /dev/null @@ -1,655 +0,0 @@ -use std::sync::Arc; - -use datafusion::logical_expr::{JoinConstraint, JoinType, Like}; -use datafusion::sql::sqlparser::ast::{JoinOperator, OrderByExpr}; -use datafusion::{ - error::{DataFusionError, Result}, - scalar::ScalarValue, - sql::sqlparser::ast::{self, Expr as SQLExpr}, -}; - -use datafusion::common::{not_impl_err, DFSchema}; -use datafusion::common::{Column, DFSchemaRef}; -#[allow(unused_imports)] -use datafusion::logical_expr::aggregate_function; -use datafusion::logical_expr::expr::{ - Alias, BinaryExpr, Case, Cast, InList, ScalarFunction as DFScalarFunction, WindowFunction, -}; -use datafusion::logical_expr::{Between, LogicalPlan, Operator}; -use datafusion::prelude::Expr; -use datafusion::sql::sqlparser::dialect::{ - Dialect, GenericDialect, PostgreSqlDialect, SQLiteDialect, -}; - -mod ast_builder; -use crate::ast_builder::{ - BuilderError, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, - TableWithJoinsBuilder, -}; - -pub fn from_df_plan(plan: &LogicalPlan, dialect: Arc) -> Result { - query_to_sql(plan, dialect) -} - -pub fn from_df_expr(expr: &Expr, dialect: Arc) -> Result { - let schema = DFSchema::empty(); - expr_to_sql(expr, &Arc::new(schema), 0, dialect) -} - -fn query_to_sql(plan: &LogicalPlan, dialect: Arc) -> Result { - match plan { - LogicalPlan::Projection(_) - | LogicalPlan::Filter(_) - | LogicalPlan::Window(_) - | LogicalPlan::Aggregate(_) - | LogicalPlan::Sort(_) - | LogicalPlan::Join(_) - | LogicalPlan::CrossJoin(_) - | LogicalPlan::Repartition(_) - | LogicalPlan::Union(_) - | LogicalPlan::TableScan(_) - | LogicalPlan::EmptyRelation(_) - | LogicalPlan::Subquery(_) - | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Limit(_) - | LogicalPlan::Statement(_) - | LogicalPlan::Values(_) - | LogicalPlan::Distinct(_) => { - let mut query_builder = QueryBuilder::default(); - let mut select_builder = SelectBuilder::default(); - select_builder.push_from(TableWithJoinsBuilder::default()); - let mut relation_builder = RelationBuilder::default(); - select_to_sql( - plan, - &mut query_builder, - &mut select_builder, - &mut relation_builder, - dialect, - )?; - - let mut twj = select_builder.pop_from().unwrap(); - twj.relation(relation_builder); - select_builder.push_from(twj); - - let body = ast::SetExpr::Select(Box::new( - select_builder.build().map_err(builder_error_to_df)?, - )); - let query = query_builder - .body(Box::new(body)) - .build() - .map_err(builder_error_to_df)?; - - Ok(ast::Statement::Query(Box::new(query))) - } - LogicalPlan::Dml(_) => dml_to_sql(plan), - LogicalPlan::Explain(_) - | LogicalPlan::Analyze(_) - | LogicalPlan::Extension(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Ddl(_) - | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Unnest(_) => Err(DataFusionError::NotImplemented( - "Unsupported operator: {plan:?}".to_string(), - )), - } -} - -fn select_to_sql( - plan: &LogicalPlan, - query: &mut QueryBuilder, - select: &mut SelectBuilder, - relation: &mut RelationBuilder, - dialect: Arc, -) -> Result<()> { - match plan { - LogicalPlan::TableScan(scan) => { - let mut builder = TableRelationBuilder::default(); - builder.name(ast::ObjectName(vec![new_ident( - scan.table_name.table().to_string(), - dialect, - )])); - relation.table(builder); - - Ok(()) - } - LogicalPlan::Projection(p) => { - let items = p - .expr - .iter() - .map(|e| select_item_to_sql(e, p.input.schema(), 0, dialect.clone()).unwrap()) - .collect::>(); - select.projection(items); - - select_to_sql(p.input.as_ref(), query, select, relation, dialect.clone()) - } - LogicalPlan::Filter(filter) => { - let filter_expr = - expr_to_sql(&filter.predicate, filter.input.schema(), 0, dialect.clone())?; - - select.selection(Some(filter_expr)); - - select_to_sql( - filter.input.as_ref(), - query, - select, - relation, - dialect.clone(), - ) - } - LogicalPlan::Limit(limit) => { - if let Some(fetch) = limit.fetch { - query.limit(Some(ast::Expr::Value(ast::Value::Number( - fetch.to_string(), - false, - )))); - } - - select_to_sql( - limit.input.as_ref(), - query, - select, - relation, - dialect.clone(), - ) - } - LogicalPlan::Sort(sort) => { - query.order_by(sort_to_sql( - sort.expr.clone(), - sort.input.schema(), - 0, - dialect.clone(), - )?); - - select_to_sql( - sort.input.as_ref(), - query, - select, - relation, - dialect.clone(), - ) - } - LogicalPlan::Aggregate(_agg) => { - not_impl_err!("Unsupported operator: {plan:?}") - } - LogicalPlan::Distinct(_distinct) => { - not_impl_err!("Unsupported operator: {plan:?}") - } - LogicalPlan::Join(join) => { - match join.join_constraint { - JoinConstraint::On => {} - JoinConstraint::Using => { - return not_impl_err!("Unsupported join constraint: {:?}", join.join_constraint) - } - } - - // parse filter if exists - let in_join_schema = join.left.schema().join(join.right.schema())?; - let join_filter = match &join.filter { - Some(filter) => Some(expr_to_sql( - filter, - &Arc::new(in_join_schema), - 0, - dialect.clone(), - )?), - None => None, - }; - - // map join.on to `l.a = r.a AND l.b = r.b AND ...` - let eq_op = ast::BinaryOperator::Eq; - let join_on = join_conditions_to_sql( - &join.on, - eq_op, - join.left.schema(), - join.right.schema(), - dialect.clone(), - )?; - - // Merge `join_on` and `join_filter` - let join_expr = match (join_filter, join_on) { - (Some(filter), Some(on)) => Some(and_op_to_sql(filter, on)), - (Some(filter), None) => Some(filter), - (None, Some(on)) => Some(on), - (None, None) => None, - }; - let join_constraint = match join_expr { - Some(expr) => ast::JoinConstraint::On(expr), - None => ast::JoinConstraint::None, - }; - - let mut right_relation = RelationBuilder::default(); - - select_to_sql(join.left.as_ref(), query, select, relation, dialect.clone())?; - select_to_sql( - join.right.as_ref(), - query, - select, - &mut right_relation, - dialect.clone(), - )?; - - let ast_join = ast::Join { - relation: right_relation.build().map_err(builder_error_to_df)?, - join_operator: join_operator_to_sql(join.join_type, join_constraint), - }; - let mut from = select.pop_from().unwrap(); - from.push_join(ast_join); - select.push_from(from); - - Ok(()) - } - LogicalPlan::SubqueryAlias(plan_alias) => { - // Handle bottom-up to allocate relation - select_to_sql( - plan_alias.input.as_ref(), - query, - select, - relation, - dialect.clone(), - )?; - - relation.alias(Some(new_table_alias( - plan_alias.alias.table().to_string(), - dialect.clone(), - ))); - - Ok(()) - } - LogicalPlan::Union(_union) => { - not_impl_err!("Unsupported operator: {plan:?}") - } - LogicalPlan::Window(_window) => { - not_impl_err!("Unsupported operator: {plan:?}") - } - LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"), - _ => not_impl_err!("Unsupported operator: {plan:?}"), - } -} - -fn select_item_to_sql( - expr: &Expr, - schema: &DFSchemaRef, - col_ref_offset: usize, - dialect: Arc, -) -> Result { - match expr { - Expr::Alias(Alias { expr, name, .. }) => { - let inner = expr_to_sql(expr, schema, col_ref_offset, dialect.clone())?; - - Ok(ast::SelectItem::ExprWithAlias { - expr: inner, - alias: new_ident(name.to_string(), dialect.clone()), - }) - } - _ => { - let inner = expr_to_sql(expr, schema, col_ref_offset, dialect.clone())?; - - Ok(ast::SelectItem::UnnamedExpr(inner)) - } - } -} - -fn expr_to_sql( - expr: &Expr, - _schema: &DFSchemaRef, - _col_ref_offset: usize, - dialect: Arc, -) -> Result { - match expr { - Expr::InList(InList { - expr, - list: _, - negated: _, - }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::ScalarFunction(DFScalarFunction { .. }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::Between(Between { - expr, - negated: _, - low: _, - high: _, - }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::Column(col) => col_to_sql(col, dialect.clone()), - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - let l = expr_to_sql(left.as_ref(), _schema, 0, dialect.clone())?; - let r = expr_to_sql(right.as_ref(), _schema, 0, dialect.clone())?; - let op = op_to_sql(op)?; - - Ok(binary_op_to_sql(l, r, op)) - } - Expr::Case(Case { - expr, - when_then_expr: _, - else_expr: _, - }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::Cast(Cast { expr, data_type: _ }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::Literal(value) => Ok(ast::Expr::Value(scalar_to_sql(value)?)), - Expr::Alias(Alias { expr, name: _, .. }) => { - expr_to_sql(expr, _schema, _col_ref_offset, dialect.clone()) - } - Expr::WindowFunction(WindowFunction { - fun: _, - args: _, - partition_by: _, - order_by: _, - window_frame: _, - }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - Expr::Like(Like { - negated: _, - expr, - pattern: _, - escape_char: _, - case_insensitive: _, - }) => { - not_impl_err!("Unsupported expression: {expr:?}") - } - _ => not_impl_err!("Unsupported expression: {expr:?}"), - } -} - -fn sort_to_sql( - sort_exprs: Vec, - _schema: &DFSchemaRef, - _col_ref_offset: usize, - dialect: Arc, -) -> Result> { - sort_exprs - .iter() - .map(|expr: &Expr| match expr { - Expr::Sort(sort_expr) => { - let col = expr_to_sql(&sort_expr.expr, _schema, _col_ref_offset, dialect.clone())?; - Ok(OrderByExpr { - asc: Some(sort_expr.asc), - expr: col, - nulls_first: Some(sort_expr.nulls_first), - }) - } - _ => Err(DataFusionError::Plan("Expecting Sort expr".to_string())), - }) - .collect::>>() -} - -fn op_to_sql(op: &Operator) -> Result { - match op { - Operator::Eq => Ok(ast::BinaryOperator::Eq), - Operator::NotEq => Ok(ast::BinaryOperator::NotEq), - Operator::Lt => Ok(ast::BinaryOperator::Lt), - Operator::LtEq => Ok(ast::BinaryOperator::LtEq), - Operator::Gt => Ok(ast::BinaryOperator::Gt), - Operator::GtEq => Ok(ast::BinaryOperator::GtEq), - Operator::Plus => Ok(ast::BinaryOperator::Plus), - Operator::Minus => Ok(ast::BinaryOperator::Minus), - Operator::Multiply => Ok(ast::BinaryOperator::Multiply), - Operator::Divide => Ok(ast::BinaryOperator::Divide), - Operator::Modulo => Ok(ast::BinaryOperator::Modulo), - Operator::And => Ok(ast::BinaryOperator::And), - Operator::Or => Ok(ast::BinaryOperator::Or), - Operator::IsDistinctFrom => not_impl_err!("unsupported operation: {op:?}"), - Operator::IsNotDistinctFrom => not_impl_err!("unsupported operation: {op:?}"), - Operator::RegexMatch => Ok(ast::BinaryOperator::PGRegexMatch), - Operator::RegexIMatch => Ok(ast::BinaryOperator::PGRegexIMatch), - Operator::RegexNotMatch => Ok(ast::BinaryOperator::PGRegexNotMatch), - Operator::RegexNotIMatch => Ok(ast::BinaryOperator::PGRegexNotIMatch), - Operator::BitwiseAnd => Ok(ast::BinaryOperator::BitwiseAnd), - Operator::BitwiseOr => Ok(ast::BinaryOperator::BitwiseOr), - Operator::BitwiseXor => Ok(ast::BinaryOperator::BitwiseXor), - Operator::BitwiseShiftRight => Ok(ast::BinaryOperator::PGBitwiseShiftRight), - Operator::BitwiseShiftLeft => Ok(ast::BinaryOperator::PGBitwiseShiftLeft), - Operator::StringConcat => Ok(ast::BinaryOperator::StringConcat), - Operator::AtArrow => not_impl_err!("unsupported operation: {op:?}"), - Operator::ArrowAt => not_impl_err!("unsupported operation: {op:?}"), - } -} - -fn scalar_to_sql(v: &ScalarValue) -> Result { - match v { - ScalarValue::Null => Ok(ast::Value::Null), - ScalarValue::Boolean(Some(b)) => Ok(ast::Value::Boolean(b.to_owned())), - ScalarValue::Boolean(None) => Ok(ast::Value::Null), - ScalarValue::Float32(Some(f)) => Ok(ast::Value::Number(f.to_string(), false)), - ScalarValue::Float32(None) => Ok(ast::Value::Null), - ScalarValue::Float64(Some(f)) => Ok(ast::Value::Number(f.to_string(), false)), - ScalarValue::Float64(None) => Ok(ast::Value::Null), - ScalarValue::Decimal128(Some(_), ..) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Decimal128(None, ..) => Ok(ast::Value::Null), - ScalarValue::Decimal256(Some(_), ..) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Decimal256(None, ..) => Ok(ast::Value::Null), - ScalarValue::Int8(Some(i)) => Ok(ast::Value::Number(i.to_string(), false)), - ScalarValue::Int8(None) => Ok(ast::Value::Null), - ScalarValue::Int16(Some(i)) => Ok(ast::Value::Number(i.to_string(), false)), - ScalarValue::Int16(None) => Ok(ast::Value::Null), - ScalarValue::Int32(Some(i)) => Ok(ast::Value::Number(i.to_string(), false)), - ScalarValue::Int32(None) => Ok(ast::Value::Null), - ScalarValue::Int64(Some(i)) => Ok(ast::Value::Number(i.to_string(), false)), - ScalarValue::Int64(None) => Ok(ast::Value::Null), - ScalarValue::UInt8(Some(ui)) => Ok(ast::Value::Number(ui.to_string(), false)), - ScalarValue::UInt8(None) => Ok(ast::Value::Null), - ScalarValue::UInt16(Some(ui)) => Ok(ast::Value::Number(ui.to_string(), false)), - ScalarValue::UInt16(None) => Ok(ast::Value::Null), - ScalarValue::UInt32(Some(ui)) => Ok(ast::Value::Number(ui.to_string(), false)), - ScalarValue::UInt32(None) => Ok(ast::Value::Null), - ScalarValue::UInt64(Some(ui)) => Ok(ast::Value::Number(ui.to_string(), false)), - ScalarValue::UInt64(None) => Ok(ast::Value::Null), - ScalarValue::Utf8(Some(str)) => Ok(ast::Value::SingleQuotedString(str.to_string())), - ScalarValue::Utf8(None) => Ok(ast::Value::Null), - ScalarValue::LargeUtf8(Some(str)) => Ok(ast::Value::SingleQuotedString(str.to_string())), - ScalarValue::LargeUtf8(None) => Ok(ast::Value::Null), - ScalarValue::Binary(Some(_)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Binary(None) => Ok(ast::Value::Null), - ScalarValue::FixedSizeBinary(..) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::LargeBinary(Some(_)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::LargeBinary(None) => Ok(ast::Value::Null), - ScalarValue::FixedSizeList(_a) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::List(_a) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::LargeList(_a) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Date32(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Date32(None) => Ok(ast::Value::Null), - ScalarValue::Date64(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Date64(None) => Ok(ast::Value::Null), - ScalarValue::Time32Second(Some(_t)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Time32Second(None) => Ok(ast::Value::Null), - ScalarValue::Time32Millisecond(Some(_t)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Time32Millisecond(None) => Ok(ast::Value::Null), - ScalarValue::Time64Microsecond(Some(_t)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Time64Microsecond(None) => Ok(ast::Value::Null), - ScalarValue::Time64Nanosecond(Some(_t)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Time64Nanosecond(None) => Ok(ast::Value::Null), - ScalarValue::TimestampSecond(Some(_ts), _) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::TimestampSecond(None, _) => Ok(ast::Value::Null), - ScalarValue::TimestampMillisecond(Some(_ts), _) => { - not_impl_err!("Unsupported scalar: {v:?}") - } - ScalarValue::TimestampMillisecond(None, _) => Ok(ast::Value::Null), - ScalarValue::TimestampMicrosecond(Some(_ts), _) => { - not_impl_err!("Unsupported scalar: {v:?}") - } - ScalarValue::TimestampMicrosecond(None, _) => Ok(ast::Value::Null), - ScalarValue::TimestampNanosecond(Some(_ts), _) => { - not_impl_err!("Unsupported scalar: {v:?}") - } - ScalarValue::TimestampNanosecond(None, _) => Ok(ast::Value::Null), - ScalarValue::IntervalYearMonth(Some(_i)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::IntervalYearMonth(None) => Ok(ast::Value::Null), - ScalarValue::IntervalDayTime(Some(_i)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::IntervalDayTime(None) => Ok(ast::Value::Null), - ScalarValue::IntervalMonthDayNano(Some(_i)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::IntervalMonthDayNano(None) => Ok(ast::Value::Null), - ScalarValue::DurationSecond(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::DurationSecond(None) => Ok(ast::Value::Null), - ScalarValue::DurationMillisecond(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::DurationMillisecond(None) => Ok(ast::Value::Null), - ScalarValue::DurationMicrosecond(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::DurationMicrosecond(None) => Ok(ast::Value::Null), - ScalarValue::DurationNanosecond(Some(_d)) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::DurationNanosecond(None) => Ok(ast::Value::Null), - ScalarValue::Struct(Some(_), _) => not_impl_err!("Unsupported scalar: {v:?}"), - ScalarValue::Struct(None, _) => Ok(ast::Value::Null), - ScalarValue::Dictionary(..) => not_impl_err!("Unsupported scalar: {v:?}"), - } -} - -fn col_to_sql(col: &Column, dialect: Arc) -> Result { - Ok(ast::Expr::CompoundIdentifier( - [ - col.relation.as_ref().unwrap().table().to_string(), - col.name.to_string(), - ] - .iter() - .map(|i| new_ident(i.to_string(), dialect.clone())) - .collect(), - )) -} - -fn join_operator_to_sql(join_type: JoinType, constraint: ast::JoinConstraint) -> JoinOperator { - match join_type { - JoinType::Inner => JoinOperator::Inner(constraint), - JoinType::Left => JoinOperator::LeftOuter(constraint), - JoinType::Right => JoinOperator::RightOuter(constraint), - JoinType::Full => JoinOperator::FullOuter(constraint), - JoinType::LeftAnti => JoinOperator::LeftAnti(constraint), - JoinType::LeftSemi => JoinOperator::LeftSemi(constraint), - JoinType::RightAnti => JoinOperator::RightAnti(constraint), - JoinType::RightSemi => JoinOperator::RightSemi(constraint), - } -} - -fn join_conditions_to_sql( - join_conditions: &Vec<(Expr, Expr)>, - eq_op: ast::BinaryOperator, - left_schema: &DFSchemaRef, - right_schema: &DFSchemaRef, - dialect: Arc, -) -> Result> { - // Only support AND conjunction for each binary expression in join conditions - let mut exprs: Vec = vec![]; - for (left, right) in join_conditions { - // Parse left - let l = expr_to_sql(left, left_schema, 0, dialect.clone())?; - // Parse right - let r = expr_to_sql( - right, - right_schema, - left_schema.fields().len(), // offset to return the correct index - dialect.clone(), - )?; - // AND with existing expression - exprs.push(binary_op_to_sql(l, r, eq_op.clone())); - } - let join_expr: Option = exprs.into_iter().reduce(and_op_to_sql); - Ok(join_expr) -} - -pub fn and_op_to_sql(lhs: SQLExpr, rhs: SQLExpr) -> SQLExpr { - binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And) -} - -pub fn binary_op_to_sql(lhs: SQLExpr, rhs: SQLExpr, op: ast::BinaryOperator) -> SQLExpr { - SQLExpr::BinaryOp { - left: Box::new(lhs), - op, - right: Box::new(rhs), - } -} - -fn new_table_alias(alias: String, dialect: Arc) -> ast::TableAlias { - ast::TableAlias { - name: new_ident(alias, dialect.clone()), - columns: Vec::new(), - } -} - -fn new_ident(str: String, dialect: Arc) -> ast::Ident { - ast::Ident { - value: str, - quote_style: if dialect.is::() { - Some('"') - } else if dialect.is::() || dialect.is::() { - Some('`') - } else { - todo!() - }, - } -} - -fn dml_to_sql(_plan: &LogicalPlan) -> Result { - Err(DataFusionError::NotImplemented( - "dml unsupported".to_string(), - )) -} - -fn builder_error_to_df(e: BuilderError) -> DataFusionError { - DataFusionError::External(format!("{e}").into()) -} - -#[cfg(test)] -mod tests { - use datafusion::{execution::context::SessionContext, test_util::TestTableFactory}; - - use super::*; - - #[tokio::test] - async fn test_select() { - let mut state = SessionContext::new().state(); - state - .table_factories_mut() - .insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {})); - let ctx = SessionContext::new_with_state(state); - - ctx.sql("CREATE EXTERNAL TABLE table_a (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - ctx.sql("CREATE EXTERNAL TABLE table_b (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - ctx.sql("CREATE EXTERNAL TABLE table_c (id integer, value string) STORED AS MOCKTABLE LOCATION 'mock://path';").await.unwrap(); - - let tests: Vec<(&str, &str)> = vec![ - ( - "select ta.id from table_a ta;", - r#"SELECT `ta`.`id` FROM `table_a` AS `ta`"#, - ), - ( - "select ta.id from table_a ta order by ta.id;", - r#"SELECT `ta`.`id` FROM `table_a` AS `ta` ORDER BY `ta`.`id` ASC NULLS LAST"#, - ), - ( - "select * from table_a ta order by ta.id, ta.value desc;", - r#"SELECT `ta`.`id`, `ta`.`value` FROM `table_a` AS `ta` ORDER BY `ta`.`id` ASC NULLS LAST, `ta`.`value` DESC NULLS FIRST"#, - ), - ( - "select * from table_a limit 10;", - r#"SELECT `table_a`.`id`, `table_a`.`value` FROM `table_a` LIMIT 10"#, - ), - ( - "select ta.id from table_a ta where ta.id > 1;", - r#"SELECT `ta`.`id` FROM `table_a` AS `ta` WHERE `ta`.`id` > 1"#, - ), - ( - "select ta.id, tb.value from table_a ta join table_b tb on ta.id = tb.id;", - r#"SELECT `ta`.`id`, `tb`.`value` FROM `table_a` AS `ta` JOIN `table_b` AS `tb` ON `ta`.`id` = `tb`.`id`"#, - ), - ( - "select ta.id, tb.value from table_a ta join table_b tb on ta.id = tb.id join table_c tc on ta.id = tc.id;", - r#"SELECT `ta`.`id`, `tb`.`value` FROM `table_a` AS `ta` JOIN `table_b` AS `tb` ON `ta`.`id` = `tb`.`id` JOIN `table_c` AS `tc` ON `ta`.`id` = `tc`.`id`"#, - ), - ]; - - for (query, expected) in tests { - let dialect: Arc = Arc::new(GenericDialect {}); - let plan = ctx.sql(query).await.unwrap().into_unoptimized_plan(); - - let ast = query_to_sql(&plan, dialect); - - assert!(ast.is_ok()); - let actual = format!("{}", ast.unwrap()); - assert_eq!(actual, expected); - } - } -}