From b07405d7961c4288309a582d87b566c3394cb30c Mon Sep 17 00:00:00 2001
From: universalmind303
Date: Thu, 16 Jan 2025 12:44:10 -0600
Subject: [PATCH 1/5] wip

---
 Cargo.lock                                    |  1 +
 src/daft-catalog/python-catalog/src/python.rs |  1 +
 src/daft-catalog/src/data_catalog.rs          |  2 +-
 src/daft-catalog/src/lib.rs                   | 27 +++++++++++-
 src/daft-sql/Cargo.toml                       | 42 ++++++++++--------
 src/daft-sql/src/catalog.rs                   | 43 -------------------
 src/daft-sql/src/lib.rs                       | 11 +++--
 src/daft-sql/src/planner.rs                   | 15 ++++---
 src/daft-sql/src/python.rs                    | 16 ++++---
 9 files changed, 77 insertions(+), 81 deletions(-)
 delete mode 100644 src/daft-sql/src/catalog.rs

diff --git a/Cargo.lock b/Cargo.lock
index e4eb914800..e53cc48c30 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2587,6 +2587,7 @@ dependencies = [
  "common-io-config",
  "common-runtime",
  "daft-algebra",
+ "daft-catalog",
  "daft-core",
  "daft-dsl",
  "daft-functions",
diff --git a/src/daft-catalog/python-catalog/src/python.rs b/src/daft-catalog/python-catalog/src/python.rs
index 5d2ae500d6..8eab98ffd1 100644
--- a/src/daft-catalog/python-catalog/src/python.rs
+++ b/src/daft-catalog/python-catalog/src/python.rs
@@ -86,6 +86,7 @@ impl DataCatalogTable for PythonTable {
 }
 
 /// Wrapper around a `daft.catalog.python_catalog.PythonCatalog`
+#[derive(Debug)]
 pub struct PythonCatalog {
     python_catalog_pyobj: PyObject,
 }
diff --git a/src/daft-catalog/src/data_catalog.rs b/src/daft-catalog/src/data_catalog.rs
index 0cf6d45708..59f6f0f491 100644
--- a/src/daft-catalog/src/data_catalog.rs
+++ b/src/daft-catalog/src/data_catalog.rs
@@ -4,7 +4,7 @@ use crate::{data_catalog_table::DataCatalogTable, errors::Result};
 ///
 /// It allows registering and retrieving data sources, as well as querying their schemas.
 /// The catalog is used by the query planner to resolve table references in queries.
-pub trait DataCatalog: Sync + Send {
+pub trait DataCatalog: Sync + Send + std::fmt::Debug {
     /// Lists the fully-qualified names of tables in the catalog with the specified prefix
     fn list_tables(&self, prefix: &str) -> Result<Vec<String>>;
 
diff --git a/src/daft-catalog/src/lib.rs b/src/daft-catalog/src/lib.rs
index 73f75864c8..f84e4a6dcf 100644
--- a/src/daft-catalog/src/lib.rs
+++ b/src/daft-catalog/src/lib.rs
@@ -50,6 +50,7 @@ static DEFAULT_CATALOG_NAME: &str = "default";
 ///
 /// Users of Daft can register various [`DataCatalog`] with Daft, enabling
 /// discovery of tables across various [`DataCatalog`] implementations.
+#[derive(Debug, Clone)]
 pub struct DaftMetaCatalog {
     /// Map of catalog names to the DataCatalog impls.
     ///
@@ -60,6 +61,15 @@ pub struct DaftMetaCatalog {
     named_tables: HashMap<String, LogicalPlanBuilder>,
 }
 
+impl Default for DaftMetaCatalog {
+    fn default() -> Self {
+        Self {
+            data_catalogs: default::Default::default(),
+            named_tables: default::Default::default(),
+        }
+    }
+}
+
 impl DaftMetaCatalog {
     /// Create a `DaftMetaCatalog` from the current environment
     pub fn new_from_env() -> Self {
@@ -95,13 +105,17 @@ impl DaftMetaCatalog {
     }
 
     /// Registers a LogicalPlan with a name in the DaftMetaCatalog
-    pub fn register_named_table(&mut self, name: &str, view: LogicalPlanBuilder) -> Result<()> {
+    pub fn register_named_table(
+        &mut self,
+        name: &str,
+        view: impl Into<LogicalPlanBuilder>,
+    ) -> Result<()> {
         if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
             return Err(Error::InvalidTableName {
                 name: name.to_string(),
             });
         }
-        self.named_tables.insert(name.to_string(), view);
+        self.named_tables.insert(name.to_string(), view.into());
         Ok(())
     }
 
@@ -146,6 +160,15 @@ impl DaftMetaCatalog {
             table_id: searched_table_name.to_string(),
         })
     }
+    /// Copy from another catalog, using tables from other in case of conflict
+    pub fn copy_from(&mut self, other: &Self) {
+        for (name, plan) in &other.named_tables {
+            self.named_tables.insert(name.clone(), plan.clone());
+        }
+        for (name, catalog) in &other.data_catalogs {
+            self.data_catalogs.insert(name.clone(), catalog.clone());
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/src/daft-sql/Cargo.toml b/src/daft-sql/Cargo.toml
index a402235011..913a2e42d6 100644
--- a/src/daft-sql/Cargo.toml
+++ b/src/daft-sql/Cargo.toml
@@ -1,26 +1,34 @@
 [dependencies]
-common-daft-config = {path = "../common/daft-config"}
-common-error = {path = "../common/error"}
-common-io-config = {path = "../common/io-config", default-features = false}
-common-runtime = {workspace = true}
-daft-algebra = {path = "../daft-algebra"}
-daft-core = {path = "../daft-core"}
-daft-dsl = {path = "../daft-dsl"}
-daft-functions = {path = "../daft-functions"}
-daft-functions-json = {path = "../daft-functions-json"}
-daft-logical-plan = {path = "../daft-logical-plan"}
-daft-scan = {path = "../daft-scan"}
-once_cell = {workspace = true}
-pyo3 = {workspace = true, optional = true}
-sqlparser = {workspace = true}
+common-daft-config = { path = "../common/daft-config" }
+common-error = { path = "../common/error" }
+common-io-config = { path = "../common/io-config", default-features = false }
+common-runtime = { workspace = true }
+daft-algebra = { path = "../daft-algebra" }
+daft-core = { path = "../daft-core" }
+daft-dsl = { path = "../daft-dsl" }
+daft-functions = { path = "../daft-functions" }
+daft-functions-json = { path = "../daft-functions-json" }
+daft-logical-plan = { path = "../daft-logical-plan" }
+daft-scan = { path = "../daft-scan" }
+once_cell = { workspace = true }
+pyo3 = { workspace = true, optional = true }
+sqlparser = { workspace = true }
 regex.workspace = true
 snafu.workspace = true
+daft-catalog = { path = "../daft-catalog" }
 
 [dev-dependencies]
-rstest = {workspace = true}
+rstest = { workspace = true }
 
 [features]
-python = ["dep:pyo3", "common-error/python", "daft-functions/python", "daft-functions-json/python", "daft-scan/python"]
+python = [
+  "dep:pyo3",
+  "common-error/python",
+  "daft-functions/python",
+  "daft-functions-json/python",
+  "daft-scan/python",
+  "daft-catalog/python",
+]
 
 [lints]
 workspace = true
@@ -28,4 +36,4 @@ workspace = true
 [package]
 name = "daft-sql"
 edition.workspace = true
-version.workspace = true
+version.workspace = true
\ No newline at end of file
diff --git a/src/daft-sql/src/catalog.rs b/src/daft-sql/src/catalog.rs
deleted file mode 100644
index 0b5634da36..0000000000
--- a/src/daft-sql/src/catalog.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use daft_logical_plan::{LogicalPlan, LogicalPlanRef};
-
-/// A simple map of table names to logical plans
-#[derive(Debug, Clone)]
-pub struct SQLCatalog {
-    tables: HashMap<String, Arc<LogicalPlan>>,
-}
-
-impl SQLCatalog {
-    /// Create an empty catalog
-    #[must_use]
-    pub fn new() -> Self {
-        Self {
-            tables: HashMap::new(),
-        }
-    }
-
-    /// Register a table with the catalog
-    pub fn register_table(&mut self, name: &str, plan: LogicalPlanRef) {
-        self.tables.insert(name.to_string(), plan);
-    }
-
-    /// Get a table from the catalog
-    #[must_use]
-    pub fn get_table(&self, name: &str) -> Option<LogicalPlanRef> {
-        self.tables.get(name).cloned()
-    }
-
-    /// Copy from another catalog, using tables from other in case of conflict
-    pub fn copy_from(&mut self, other: &Self) {
-        for (name, plan) in &other.tables {
-            self.tables.insert(name.clone(), plan.clone());
-        }
-    }
-}
-
-impl Default for SQLCatalog {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/src/daft-sql/src/lib.rs b/src/daft-sql/src/lib.rs
index 211973f0d3..2615d0fe4d 100644
--- a/src/daft-sql/src/lib.rs
+++ b/src/daft-sql/src/lib.rs
@@ -1,6 +1,5 @@
 #![feature(let_chains)]
 
-pub mod catalog;
 pub mod error;
 pub mod functions;
 mod modules;
@@ -28,7 +27,7 @@ pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
 mod tests {
     use std::sync::Arc;
 
-    use catalog::SQLCatalog;
+    use daft_catalog::DaftMetaCatalog;
     use daft_core::prelude::*;
     use daft_dsl::{col, lit, Expr, OuterReferenceColumn, Subquery};
     use daft_logical_plan::{
@@ -113,11 +112,11 @@ mod tests {
 
     #[fixture]
     fn planner() -> SQLPlanner<'static> {
-        let mut catalog = SQLCatalog::new();
+        let mut catalog = DaftMetaCatalog::default();
 
-        catalog.register_table("tbl1", tbl_1());
-        catalog.register_table("tbl2", tbl_2());
-        catalog.register_table("tbl3", tbl_3());
+        catalog.register_named_table("tbl1", tbl_1());
+        catalog.register_named_table("tbl2", tbl_2());
+        catalog.register_named_table("tbl3", tbl_3());
 
         SQLPlanner::new(catalog)
     }
diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs
index 3355365629..e3c7e67acf 100644
--- a/src/daft-sql/src/planner.rs
+++ b/src/daft-sql/src/planner.rs
@@ -8,6 +8,7 @@ use std::{
 
 use common_error::{DaftError, DaftResult};
 use daft_algebra::boolean::combine_conjunction;
+use daft_catalog::DaftMetaCatalog;
 use daft_core::prelude::*;
 use daft_dsl::{
     col,
@@ -34,8 +35,7 @@ use sqlparser::{
 };
 
 use crate::{
-    catalog::SQLCatalog, column_not_found_err, error::*, invalid_operation_err,
-    table_not_found_err, unsupported_sql_err,
+    column_not_found_err, error::*, invalid_operation_err, table_not_found_err, unsupported_sql_err,
 };
 
 /// A named logical plan
@@ -74,14 +74,14 @@ impl Relation {
 
 /// Context that is shared across a query and its subqueries
 struct PlannerContext {
-    catalog: SQLCatalog,
+    catalog: DaftMetaCatalog,
     cte_map: HashMap<String, Relation>,
 }
 
 impl Default for PlannerContext {
     fn default() -> Self {
         Self {
-            catalog: SQLCatalog::new(),
+            catalog: DaftMetaCatalog::default(),
             cte_map: Default::default(),
         }
     }
@@ -100,7 +100,7 @@ pub struct SQLPlanner<'a> {
 }
 
 impl<'a> SQLPlanner<'a> {
-    pub fn new(catalog: SQLCatalog) -> Self {
+    pub fn new(catalog: DaftMetaCatalog) -> Self {
         let context = Rc::new(RefCell::new(PlannerContext {
             catalog,
             ..Default::default()
@@ -146,7 +146,7 @@ impl<'a> SQLPlanner<'a> {
         Ref::map(self.context.borrow(), |i| &i.cte_map)
     }
 
-    fn catalog(&self) -> Ref<'_, SQLCatalog> {
+    fn catalog(&self) -> Ref<'_, DaftMetaCatalog> {
         Ref::map(self.context.borrow(), |i| &i.catalog)
     }
 
@@ -1088,7 +1088,8 @@ impl<'a> SQLPlanner<'a> {
             .or_else(|| self.cte_map().get(&table_name).cloned())
             .or_else(|| {
                 self.catalog()
-                    .get_table(&table_name)
+                    .read_table(&table_name)
+                    .ok()
                     .map(|table| Relation::new(table.into(), table_name.clone()))
             })
         else {
diff --git a/src/daft-sql/src/python.rs b/src/daft-sql/src/python.rs
index 69fc3426fc..d83fb9b943 100644
--- a/src/daft-sql/src/python.rs
+++ b/src/daft-sql/src/python.rs
@@ -1,9 +1,10 @@
 use common_daft_config::PyDaftPlanningConfig;
+use daft_catalog::DaftMetaCatalog;
 use daft_dsl::python::PyExpr;
 use daft_logical_plan::{LogicalPlanBuilder, PyLogicalPlanBuilder};
 use pyo3::prelude::*;
 
-use crate::{catalog::SQLCatalog, functions::SQL_FUNCTIONS, planner::SQLPlanner};
+use crate::{functions::SQL_FUNCTIONS, planner::SQLPlanner};
 
 #[pyclass]
 pub struct SQLFunctionStub {
@@ -68,7 +69,7 @@ pub fn list_sql_functions() -> Vec<SQLFunctionStub> {
 #[pyclass(module = "daft.daft")]
 #[derive(Debug, Clone)]
 pub struct PyCatalog {
-    catalog: SQLCatalog,
+    catalog: DaftMetaCatalog,
 }
 
 #[pymethods]
@@ -77,14 +78,19 @@ impl PyCatalog {
     #[staticmethod]
     pub fn new() -> Self {
         Self {
-            catalog: SQLCatalog::new(),
+            catalog: DaftMetaCatalog::default(),
         }
     }
 
     /// Register a table with the catalog.
-    pub fn register_table(&mut self, name: &str, dataframe: &mut PyLogicalPlanBuilder) {
+    pub fn register_table(
+        &mut self,
+        name: &str,
+        dataframe: &mut PyLogicalPlanBuilder,
+    ) -> PyResult<()> {
         let plan = dataframe.builder.build();
-        self.catalog.register_table(name, plan);
+        self.catalog.register_named_table(name, plan)?;
+        Ok(())
     }
 
     /// Copy from another catalog, using tables from other in case of conflict
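The API change worth pausing on in this first commit: `register_named_table` now accepts `view: impl Into<LogicalPlanBuilder>` rather than a concrete `LogicalPlanBuilder`, so anything convertible into a builder can be registered without `.into()` noise at every call site (the next commit leans on the same conversion via `LogicalPlanBuilder::from(plan)`). A minimal, self-contained sketch of the pattern, with stand-in types (`Plan` and `View` are placeholders here, not daft's real types):

```rust
use std::{collections::HashMap, sync::Arc};

struct Plan;
struct View(Arc<Plan>);

// Stands in for daft's conversion from a plan reference into a builder.
impl From<Arc<Plan>> for View {
    fn from(plan: Arc<Plan>) -> Self {
        Self(plan)
    }
}

#[derive(Default)]
struct Catalog {
    named_tables: HashMap<String, View>,
}

impl Catalog {
    // `impl Into<View>` accepts a `View` directly or anything with a
    // `From` impl, and normalizes with a single `view.into()` inside.
    fn register_named_table(&mut self, name: &str, view: impl Into<View>) {
        self.named_tables.insert(name.to_string(), view.into());
    }
}

fn main() {
    let mut catalog = Catalog::default();
    catalog.register_named_table("from_view", View(Arc::new(Plan)));
    catalog.register_named_table("from_plan", Arc::new(Plan)); // converted via `From`
    assert_eq!(catalog.named_tables.len(), 2);
}
```
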
/// @@ -61,15 +61,6 @@ pub struct DaftMetaCatalog { named_tables: HashMap, } -impl Default for DaftMetaCatalog { - fn default() -> Self { - Self { - data_catalogs: default::Default::default(), - named_tables: default::Default::default(), - } - } -} - impl DaftMetaCatalog { /// Create a `DaftMetaCatalog` from the current environment pub fn new_from_env() -> Self { @@ -119,6 +110,12 @@ impl DaftMetaCatalog { Ok(()) } + /// Check if a named table is registered in the DaftMetaCatalog + /// + pub fn contains_named_table(&self, name: &str) -> bool { + self.named_tables.contains_key(name) + } + /// Provides high-level functionality for reading a table of data against a [`DaftMetaCatalog`] /// /// Resolves the provided table_identifier against the catalog: diff --git a/src/daft-connect/Cargo.toml b/src/daft-connect/Cargo.toml index 2e09d1d55f..710d5ee472 100644 --- a/src/daft-connect/Cargo.toml +++ b/src/daft-connect/Cargo.toml @@ -4,6 +4,7 @@ async-stream = "0.3.6" common-daft-config = {workspace = true, optional = true, features = ["python"]} common-error = {workspace = true, optional = true, features = ["python"]} common-file-formats = {workspace = true, optional = true, features = ["python"]} +daft-catalog = {path = "../daft-catalog", optional = true, features = ["python"]} daft-core = {workspace = true, optional = true, features = ["python"]} daft-dsl = {workspace = true, optional = true, features = ["python"]} daft-local-execution = {workspace = true, optional = true, features = ["python"]} @@ -43,7 +44,8 @@ python = [ "dep:daft-scan", "dep:daft-schema", "dep:daft-sql", - "dep:daft-table" + "dep:daft-table", + "dep:daft-catalog" ] [lints] diff --git a/src/daft-connect/src/connect_service.rs b/src/daft-connect/src/connect_service.rs index 9071aed949..8b71d65e86 100644 --- a/src/daft-connect/src/connect_service.rs +++ b/src/daft-connect/src/connect_service.rs @@ -79,17 +79,24 @@ impl SparkConnectService for DaftSparkConnectService { } OpType::Command(command) => { let command = command.command_type.required("command_type")?; - match command { CommandType::WriteOperation(op) => { let result = session.execute_write_operation(op, rb).await?; Ok(Response::new(result)) } + CommandType::RegisterFunction(_) => todo!(), + CommandType::CreateDataframeView(create_dataframe) => { + let result = session + .execute_create_dataframe_view(create_dataframe, rb) + .await?; + Ok(Response::new(result)) + } + CommandType::SqlCommand(sql) => { + let result = session.execute_sql_command(sql, rb).await?; + Ok(Response::new(result)) + } other => { - return not_yet_implemented!( - "Command type: {}", - command_type_to_str(&other) - ) + not_yet_implemented!("CommandType '{:?}'", command_type_to_str(&other)) } } } diff --git a/src/daft-connect/src/execute.rs b/src/daft-connect/src/execute.rs index 28b4598565..59af957b22 100644 --- a/src/daft-connect/src/execute.rs +++ b/src/daft-connect/src/execute.rs @@ -9,7 +9,7 @@ use daft_logical_plan::LogicalPlanBuilder; use daft_micropartition::MicroPartition; use daft_ray_execution::RayEngine; use daft_table::Table; -use eyre::bail; +use eyre::{bail, Context}; use futures::{ stream::{self, BoxStream}, StreamExt, TryFutureExt, TryStreamExt, @@ -19,7 +19,8 @@ use pyo3::Python; use spark_connect::{ relation::RelType, write_operation::{SaveMode, SaveType}, - ExecutePlanResponse, Relation, ShowString, WriteOperation, + CreateDataFrameViewCommand, ExecutePlanResponse, Relation, ShowString, SqlCommand, + WriteOperation, }; use tonic::{codegen::tokio_stream::wrappers::ReceiverStream, 
Status}; use tracing::debug; @@ -236,6 +237,136 @@ impl Session { Ok(Box::pin(stream)) } + pub async fn execute_create_dataframe_view( + &self, + create_dataframe: CreateDataFrameViewCommand, + rb: ResponseBuilder, + ) -> Result { + let CreateDataFrameViewCommand { + input, + name, + is_global, + replace, + } = create_dataframe; + + if is_global { + return not_yet_implemented!("Global dataframe view"); + } + + let input = input.required("input")?; + let input = SparkAnalyzer::new(self) + .to_logical_plan(input) + .await + .map_err(|e| { + Status::internal( + textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"), + ) + })?; + + { + let catalog = self.catalog.read().unwrap(); + if !replace && catalog.contains_named_table(&name) { + return Err(Status::internal("Dataframe view already exists")); + } + } + + let mut catalog = self.catalog.write().unwrap(); + + catalog.register_named_table(&name, input).map_err(|e| { + Status::internal(textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n")) + })?; + + let response = rb.result_complete_response(); + let stream = stream::once(ready(Ok(response))); + Ok(Box::pin(stream)) + } + + #[allow(deprecated)] + pub async fn execute_sql_command( + &self, + SqlCommand { + sql, + args, + pos_args, + named_arguments, + pos_arguments, + input, + }: SqlCommand, + res: ResponseBuilder, + ) -> Result { + if !args.is_empty() { + return not_yet_implemented!("Named arguments"); + } + if !pos_args.is_empty() { + return not_yet_implemented!("Positional arguments"); + } + if !named_arguments.is_empty() { + return not_yet_implemented!("Named arguments"); + } + if !pos_arguments.is_empty() { + return not_yet_implemented!("Positional arguments"); + } + + if input.is_some() { + return not_yet_implemented!("Input"); + } + + let catalog = self.catalog.read().unwrap(); + let catalog = catalog.clone(); + + let mut planner = daft_sql::SQLPlanner::new(catalog); + + let plan = planner + .plan_sql(&sql) + .wrap_err("Error planning SQL") + .map_err(|e| { + Status::internal( + textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"), + ) + })?; + + let plan = LogicalPlanBuilder::from(plan); + + // TODO: code duplication + let result_complete = res.result_complete_response(); + + let (tx, rx) = tokio::sync::mpsc::channel::>(1); + + let this = self.clone(); + + tokio::spawn(async move { + let execution_fut = async { + let mut result_stream = this.run_query(plan).await?; + while let Some(result) = result_stream.next().await { + let result = result?; + let tables = result.get_tables()?; + for table in tables.as_slice() { + let response = res.arrow_batch_response(table)?; + if tx.send(Ok(response)).await.is_err() { + return Ok(()); + } + } + } + Ok(()) + }; + if let Err(e) = execution_fut.await { + let _ = tx.send(Err(e)).await; + } + }); + + let stream = ReceiverStream::new(rx); + + let stream = stream + .map_err(|e| { + Status::internal( + textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"), + ) + }) + .chain(stream::once(ready(Ok(result_complete)))); + + Ok(Box::pin(stream)) + } + async fn show_string( &self, show_string: ShowString, diff --git a/src/daft-connect/src/session.rs b/src/daft-connect/src/session.rs index ae618e4c8b..bb07556db0 100644 --- a/src/daft-connect/src/session.rs +++ b/src/daft-connect/src/session.rs @@ -1,5 +1,9 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; +use daft_catalog::DaftMetaCatalog; use 
daft_micropartition::partitioning::InMemoryPartitionSetCache; use uuid::Uuid; @@ -15,6 +19,7 @@ pub struct Session { /// MicroPartitionSet associated with this session /// this will be filled up as the user runs queries pub(crate) psets: Arc, + pub(crate) catalog: Arc>, } impl Session { @@ -34,6 +39,7 @@ impl Session { id, server_side_session_id, psets: Arc::new(InMemoryPartitionSetCache::empty()), + catalog: Arc::new(RwLock::new(DaftMetaCatalog::default())), } } diff --git a/src/daft-connect/src/spark_analyzer.rs b/src/daft-connect/src/spark_analyzer.rs index 3519725ff7..f98cbec714 100644 --- a/src/daft-connect/src/spark_analyzer.rs +++ b/src/daft-connect/src/spark_analyzer.rs @@ -18,6 +18,7 @@ use daft_micropartition::{ }; use daft_scan::builder::{CsvScanBuilder, ParquetScanBuilder}; use daft_schema::schema::{Schema, SchemaRef}; +use daft_sql::SQLPlanner; use daft_table::Table; use datatype::to_daft_datatype; pub use datatype::to_spark_datatype; @@ -36,7 +37,7 @@ use spark_connect::{ }, read::ReadType, relation::RelType, - Deduplicate, Expression, Limit, Range, Relation, Sort, + Deduplicate, Expression, Limit, Range, Relation, Sort, Sql, }; use tracing::debug; @@ -144,6 +145,7 @@ impl SparkAnalyzer<'_> { RelType::ShowString(_) => unreachable!("should already be handled in execute"), RelType::Deduplicate(rel) => self.deduplicate(*rel).await, RelType::Sort(rel) => self.sort(*rel).await, + RelType::Sql(sql) => self.sql(sql).await, plan => not_yet_implemented!("relation type: \"{}\"", rel_name(&plan))?, } } @@ -644,6 +646,40 @@ impl SparkAnalyzer<'_> { Ok(result) } + #[allow(deprecated)] + async fn sql(&self, sql: Sql) -> eyre::Result { + let Sql { + query, + args, + pos_args, + named_arguments, + pos_arguments, + } = sql; + if !args.is_empty() { + not_yet_implemented!("args")?; + } + if !pos_args.is_empty() { + not_yet_implemented!("pos_args")?; + } + if !named_arguments.is_empty() { + not_yet_implemented!("named_arguments")?; + } + if !pos_arguments.is_empty() { + not_yet_implemented!("pos_arguments")?; + } + + let catalog = self + .session + .catalog + .read() + .map_err(|e| eyre::eyre!("Failed to read catalog: {e}"))?; + let catalog = catalog.clone(); + + let mut planner = SQLPlanner::new(catalog); + let plan = planner.plan_sql(&query)?; + Ok(plan.into()) + } + pub fn to_daft_expr(&self, expression: &Expression) -> eyre::Result { if let Some(common) = &expression.common { if common.origin.is_some() { diff --git a/src/daft-sql/src/catalog.rs b/src/daft-sql/src/catalog.rs new file mode 100644 index 0000000000..42b8d4b725 --- /dev/null +++ b/src/daft-sql/src/catalog.rs @@ -0,0 +1,43 @@ +use std::{collections::HashMap, sync::Arc}; + +use daft_logical_plan::{LogicalPlan, LogicalPlanRef}; + +/// A simple map of table names to logical plans +#[derive(Debug, Clone)] +pub struct InMemoryCatalog { + tables: HashMap>, +} + +impl InMemoryCatalog { + /// Create an empty catalog + #[must_use] + pub fn new() -> Self { + Self { + tables: HashMap::new(), + } + } + + /// Register a table with the catalog + pub fn register_table(&mut self, name: &str, plan: LogicalPlanRef) { + self.tables.insert(name.to_string(), plan); + } + + /// Get a table from the catalog + #[must_use] + pub fn get_table(&self, name: &str) -> Option { + self.tables.get(name).cloned() + } + + /// Copy from another catalog, using tables from other in case of conflict + pub fn copy_from(&mut self, other: &Self) { + for (name, plan) in &other.tables { + self.tables.insert(name.clone(), plan.clone()); + } + } +} + +impl Default for 
InMemoryCatalog { + fn default() -> Self { + Self::new() + } +} diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs index e3c7e67acf..348e0a0cf8 100644 --- a/src/daft-sql/src/planner.rs +++ b/src/daft-sql/src/planner.rs @@ -73,20 +73,12 @@ impl Relation { } /// Context that is shared across a query and its subqueries +#[derive(Default)] struct PlannerContext { catalog: DaftMetaCatalog, cte_map: HashMap, } -impl Default for PlannerContext { - fn default() -> Self { - Self { - catalog: DaftMetaCatalog::default(), - cte_map: Default::default(), - } - } -} - #[derive(Default)] pub struct SQLPlanner<'a> { current_relation: Option, @@ -1090,7 +1082,7 @@ impl<'a> SQLPlanner<'a> { self.catalog() .read_table(&table_name) .ok() - .map(|table| Relation::new(table.into(), table_name.clone())) + .map(|table| Relation::new(table, table_name.clone())) }) else { table_not_found_err!(table_name) diff --git a/tests/connect/test_sql.py b/tests/connect/test_sql.py new file mode 100644 index 0000000000..636e92b263 --- /dev/null +++ b/tests/connect/test_sql.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import pytest + + +def test_create_or_replace_temp_view(spark_session): + df = spark_session.createDataFrame([(1, "foo")], ["id", "name"]) + try: + df.createOrReplaceTempView("test_view") + except Exception as e: + pytest.fail(f"createOrReplaceTempView failed: {e}") + + +def test_sql(spark_session): + df = spark_session.createDataFrame([(1, "foo")], ["id", "name"]) + df.createOrReplaceTempView("test_view") + try: + result = spark_session.sql("SELECT * FROM test_view") + except Exception as e: + pytest.fail(f"sql failed: {e}") + assert result.collect() == [(1, "foo")] From 4fbd13975c1ebef51ee16863d1b3b78548e818ee Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 16 Jan 2025 13:41:48 -0600 Subject: [PATCH 3/5] fix ci issues --- src/daft-sql/Cargo.toml | 46 +++++++++---------- .../{test_sql.py => test_spark_sql.py} | 0 2 files changed, 23 insertions(+), 23 deletions(-) rename tests/connect/{test_sql.py => test_spark_sql.py} (100%) diff --git a/src/daft-sql/Cargo.toml b/src/daft-sql/Cargo.toml index 913a2e42d6..42b7163666 100644 --- a/src/daft-sql/Cargo.toml +++ b/src/daft-sql/Cargo.toml @@ -1,33 +1,33 @@ [dependencies] -common-daft-config = { path = "../common/daft-config" } -common-error = { path = "../common/error" } -common-io-config = { path = "../common/io-config", default-features = false } -common-runtime = { workspace = true } -daft-algebra = { path = "../daft-algebra" } -daft-core = { path = "../daft-core" } -daft-dsl = { path = "../daft-dsl" } -daft-functions = { path = "../daft-functions" } -daft-functions-json = { path = "../daft-functions-json" } -daft-logical-plan = { path = "../daft-logical-plan" } -daft-scan = { path = "../daft-scan" } -once_cell = { workspace = true } -pyo3 = { workspace = true, optional = true } -sqlparser = { workspace = true } +common-daft-config = {path = "../common/daft-config"} +common-error = {path = "../common/error"} +common-io-config = {path = "../common/io-config", default-features = false} +common-runtime = {workspace = true} +daft-algebra = {path = "../daft-algebra"} +daft-catalog = {path = "../daft-catalog"} +daft-core = {path = "../daft-core"} +daft-dsl = {path = "../daft-dsl"} +daft-functions = {path = "../daft-functions"} +daft-functions-json = {path = "../daft-functions-json"} +daft-logical-plan = {path = "../daft-logical-plan"} +daft-scan = {path = "../daft-scan"} +once_cell = {workspace = true} +pyo3 = {workspace = true, 
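The shape of `execute_sql_command` above is the part worth internalizing: a spawned task pushes record batches into an mpsc channel, the receiver is exposed as the gRPC response stream, and the terminal result-complete message is chained on the end. A stripped-down sketch of that shape (assumes tokio with the `rt-multi-thread` and `macros` features, plus the `futures` and `tokio-stream` crates; the diff reaches `ReceiverStream` through `tonic::codegen::tokio_stream` instead, and `String` payloads stand in for `ExecutePlanResponse`):

```rust
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<String, String>>(1);

    // Producer task: forwards each batch, stopping early if the receiver hung up,
    // mirroring the `tx.send(...).await.is_err()` check in `execute_sql_command`.
    tokio::spawn(async move {
        for batch in ["batch-0", "batch-1"] {
            if tx.send(Ok(batch.to_string())).await.is_err() {
                return;
            }
        }
    });

    // Consumer side: the channel becomes a stream, with the terminal
    // "complete" message chained on, like `result_complete` above.
    let stream = ReceiverStream::new(rx)
        .chain(futures::stream::once(async { Ok::<_, String>("complete".to_string()) }));

    let responses: Vec<_> = stream.collect().await;
    assert_eq!(responses.len(), 3);
}
```

The bounded channel of size 1 also gives natural backpressure: the producer cannot run ahead of the client consuming the stream by more than one batch.
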
From 4fbd13975c1ebef51ee16863d1b3b78548e818ee Mon Sep 17 00:00:00 2001
From: universalmind303
Date: Thu, 16 Jan 2025 13:41:48 -0600
Subject: [PATCH 3/5] fix ci issues

---
 src/daft-sql/Cargo.toml                          | 46 +++++++++----------
 .../{test_sql.py => test_spark_sql.py}           |  0
 2 files changed, 23 insertions(+), 23 deletions(-)
 rename tests/connect/{test_sql.py => test_spark_sql.py} (100%)

diff --git a/src/daft-sql/Cargo.toml b/src/daft-sql/Cargo.toml
index 913a2e42d6..42b7163666 100644
--- a/src/daft-sql/Cargo.toml
+++ b/src/daft-sql/Cargo.toml
@@ -1,33 +1,33 @@
 [dependencies]
-common-daft-config = { path = "../common/daft-config" }
-common-error = { path = "../common/error" }
-common-io-config = { path = "../common/io-config", default-features = false }
-common-runtime = { workspace = true }
-daft-algebra = { path = "../daft-algebra" }
-daft-core = { path = "../daft-core" }
-daft-dsl = { path = "../daft-dsl" }
-daft-functions = { path = "../daft-functions" }
-daft-functions-json = { path = "../daft-functions-json" }
-daft-logical-plan = { path = "../daft-logical-plan" }
-daft-scan = { path = "../daft-scan" }
-once_cell = { workspace = true }
-pyo3 = { workspace = true, optional = true }
-sqlparser = { workspace = true }
+common-daft-config = {path = "../common/daft-config"}
+common-error = {path = "../common/error"}
+common-io-config = {path = "../common/io-config", default-features = false}
+common-runtime = {workspace = true}
+daft-algebra = {path = "../daft-algebra"}
+daft-catalog = {path = "../daft-catalog"}
+daft-core = {path = "../daft-core"}
+daft-dsl = {path = "../daft-dsl"}
+daft-functions = {path = "../daft-functions"}
+daft-functions-json = {path = "../daft-functions-json"}
+daft-logical-plan = {path = "../daft-logical-plan"}
+daft-scan = {path = "../daft-scan"}
+once_cell = {workspace = true}
+pyo3 = {workspace = true, optional = true}
+sqlparser = {workspace = true}
 regex.workspace = true
 snafu.workspace = true
-daft-catalog = { path = "../daft-catalog" }
 
 [dev-dependencies]
-rstest = { workspace = true }
+rstest = {workspace = true}
 
 [features]
 python = [
-  "dep:pyo3",
-  "common-error/python",
-  "daft-functions/python",
-  "daft-functions-json/python",
-  "daft-scan/python",
-  "daft-catalog/python",
+  "dep:pyo3",
+  "common-error/python",
+  "daft-functions/python",
+  "daft-functions-json/python",
+  "daft-scan/python",
+  "daft-catalog/python"
 ]
 
 [lints]
@@ -36,4 +36,4 @@ workspace = true
 [package]
 name = "daft-sql"
 edition.workspace = true
-version.workspace = true
\ No newline at end of file
+version.workspace = true
diff --git a/tests/connect/test_sql.py b/tests/connect/test_spark_sql.py
similarity index 100%
rename from tests/connect/test_sql.py
rename to tests/connect/test_spark_sql.py

From f8df09c16cd77a2113b367a7f85a22fc006b110e Mon Sep 17 00:00:00 2001
From: universalmind303
Date: Thu, 16 Jan 2025 14:49:22 -0600
Subject: [PATCH 4/5] pr feedback

---
 daft/catalog/__init__.py        |  2 +-
 src/daft-catalog/src/lib.rs     | 31 +++++++++++-------------
 src/daft-catalog/src/python.rs  |  2 +-
 src/daft-connect/src/execute.rs |  4 +--
 src/daft-connect/src/session.rs |  6 ++---
 src/daft-sql/src/catalog.rs     | 43 ---------------------------------
 src/daft-sql/src/lib.rs         | 10 ++++----
 src/daft-sql/src/planner.rs     |  8 +++---
 src/daft-sql/src/python.rs      |  8 +++---
 9 files changed, 34 insertions(+), 80 deletions(-)
 delete mode 100644 src/daft-sql/src/catalog.rs

diff --git a/daft/catalog/__init__.py b/daft/catalog/__init__.py
index 5e74fd4c08..b6ac2d26fa 100644
--- a/daft/catalog/__init__.py
+++ b/daft/catalog/__init__.py
@@ -27,7 +27,7 @@
 ```python
 df = daft.from_pydict({"foo": [1, 2, 3]})
 
-daft.catalog.register_named_table(
+daft.catalog.register_table(
     "my_table",
     df,
 )
diff --git a/src/daft-catalog/src/lib.rs b/src/daft-catalog/src/lib.rs
index 4f59745560..87faed0c19 100644
--- a/src/daft-catalog/src/lib.rs
+++ b/src/daft-catalog/src/lib.rs
@@ -19,11 +19,11 @@ pub mod global_catalog {
 
     use lazy_static::lazy_static;
 
-    use crate::{DaftMetaCatalog, DataCatalog};
+    use crate::{DaftCatalog, DataCatalog};
 
     lazy_static! {
-        pub(crate) static ref GLOBAL_DAFT_META_CATALOG: RwLock<DaftMetaCatalog> =
-            RwLock::new(DaftMetaCatalog::new_from_env());
+        pub(crate) static ref GLOBAL_DAFT_META_CATALOG: RwLock<DaftCatalog> =
+            RwLock::new(DaftCatalog::new_from_env());
     }
 
     /// Register a DataCatalog with the global DaftMetaCatalog
@@ -51,7 +51,7 @@ static DEFAULT_CATALOG_NAME: &str = "default";
 /// Users of Daft can register various [`DataCatalog`] with Daft, enabling
 /// discovery of tables across various [`DataCatalog`] implementations.
 #[derive(Debug, Clone, Default)]
-pub struct DaftMetaCatalog {
+pub struct DaftCatalog {
     /// Map of catalog names to the DataCatalog impls.
     ///
     /// NOTE: The default catalog is always named "default"
@@ -61,11 +61,11 @@ pub struct DaftCatalog {
     named_tables: HashMap<String, LogicalPlanBuilder>,
 }
 
-impl DaftMetaCatalog {
+impl DaftCatalog {
     /// Create a `DaftMetaCatalog` from the current environment
     pub fn new_from_env() -> Self {
         // TODO: Parse a YAML file to produce the catalog
-        DaftMetaCatalog {
+        DaftCatalog {
             data_catalogs: default::Default::default(),
             named_tables: default::Default::default(),
         }
@@ -96,7 +96,7 @@ impl DaftCatalog {
     }
 
     /// Registers a LogicalPlan with a name in the DaftMetaCatalog
-    pub fn register_named_table(
+    pub fn register_table(
         &mut self,
         name: &str,
         view: impl Into<LogicalPlanBuilder>,
@@ -110,9 +110,8 @@ impl DaftCatalog {
         Ok(())
     }
 
-    /// Check if a named table is registered in the DaftMetaCatalog
-    ///
-    pub fn contains_named_table(&self, name: &str) -> bool {
+    /// Check if a named table is registered in the DaftCatalog
+    pub fn contains_table(&self, name: &str) -> bool {
         self.named_tables.contains_key(name)
     }
 
@@ -201,26 +200,24 @@ mod tests {
 
     #[test]
     fn test_register_and_unregister_named_table() {
-        let mut catalog = DaftMetaCatalog::new_from_env();
+        let mut catalog = DaftCatalog::new_from_env();
         let plan = LogicalPlanBuilder::from(mock_plan());
 
         // Register a table
-        assert!(catalog
-            .register_named_table("test_table", plan.clone())
-            .is_ok());
+        assert!(catalog.register_table("test_table", plan.clone()).is_ok());
 
         // Try to register a table with invalid name
         assert!(catalog
-            .register_named_table("invalid name", plan.clone())
+            .register_table("invalid name", plan.clone())
             .is_err());
     }
 
     #[test]
     fn test_read_registered_table() {
-        let mut catalog = DaftMetaCatalog::new_from_env();
+        let mut catalog = DaftCatalog::new_from_env();
         let plan = LogicalPlanBuilder::from(mock_plan());
 
-        catalog.register_named_table("test_table", plan).unwrap();
+        catalog.register_table("test_table", plan).unwrap();
 
         assert!(catalog.read_table("test_table").is_ok());
         assert!(catalog.read_table("non_existent_table").is_err());
diff --git a/src/daft-catalog/src/python.rs b/src/daft-catalog/src/python.rs
index a4896402ec..9f4381bd09 100644
--- a/src/daft-catalog/src/python.rs
+++ b/src/daft-catalog/src/python.rs
@@ -61,7 +61,7 @@ fn py_register_table(
     global_catalog::GLOBAL_DAFT_META_CATALOG
         .write()
        .unwrap()
-        .register_named_table(table_identifier, logical_plan.builder.clone())?;
+        .register_table(table_identifier, logical_plan.builder.clone())?;
     Ok(table_identifier.to_string())
 }
 
diff --git a/src/daft-connect/src/execute.rs b/src/daft-connect/src/execute.rs
index 59af957b22..23caca66b9 100644
--- a/src/daft-connect/src/execute.rs
+++ b/src/daft-connect/src/execute.rs
@@ -265,14 +265,14 @@ impl Session {
 
         {
             let catalog = self.catalog.read().unwrap();
-            if !replace && catalog.contains_named_table(&name) {
+            if !replace && catalog.contains_table(&name) {
                 return Err(Status::internal("Dataframe view already exists"));
             }
         }
 
         let mut catalog = self.catalog.write().unwrap();
 
-        catalog.register_named_table(&name, input).map_err(|e| {
+        catalog.register_table(&name, input).map_err(|e| {
             Status::internal(textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"))
         })?;
 
diff --git a/src/daft-connect/src/session.rs b/src/daft-connect/src/session.rs
index bb07556db0..25a8024f76 100644
--- a/src/daft-connect/src/session.rs
+++ b/src/daft-connect/src/session.rs
@@ -3,7 +3,7 @@ use std::{
     sync::{Arc, RwLock},
 };
 
-use daft_catalog::DaftMetaCatalog;
+use daft_catalog::DaftCatalog;
 use daft_micropartition::partitioning::InMemoryPartitionSetCache;
 use uuid::Uuid;
 
@@ -19,7 +19,7 @@ pub struct Session {
     /// MicroPartitionSet associated with this session
     /// this will be filled up as the user runs queries
     pub(crate) psets: Arc<InMemoryPartitionSetCache>,
-    pub(crate) catalog: Arc<RwLock<DaftMetaCatalog>>,
+    pub(crate) catalog: Arc<RwLock<DaftCatalog>>,
 }
 
 impl Session {
@@ -39,7 +39,7 @@ impl Session {
             id,
             server_side_session_id,
             psets: Arc::new(InMemoryPartitionSetCache::empty()),
-            catalog: Arc::new(RwLock::new(DaftMetaCatalog::default())),
+            catalog: Arc::new(RwLock::new(DaftCatalog::default())),
         }
     }
 
diff --git a/src/daft-sql/src/catalog.rs b/src/daft-sql/src/catalog.rs
deleted file mode 100644
index 42b8d4b725..0000000000
--- a/src/daft-sql/src/catalog.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use daft_logical_plan::{LogicalPlan, LogicalPlanRef};
-
-/// A simple map of table names to logical plans
-#[derive(Debug, Clone)]
-pub struct InMemoryCatalog {
-    tables: HashMap<String, Arc<LogicalPlan>>,
-}
-
-impl InMemoryCatalog {
-    /// Create an empty catalog
-    #[must_use]
-    pub fn new() -> Self {
-        Self {
-            tables: HashMap::new(),
-        }
-    }
-
-    /// Register a table with the catalog
-    pub fn register_table(&mut self, name: &str, plan: LogicalPlanRef) {
-        self.tables.insert(name.to_string(), plan);
-    }
-
-    /// Get a table from the catalog
-    #[must_use]
-    pub fn get_table(&self, name: &str) -> Option<LogicalPlanRef> {
-        self.tables.get(name).cloned()
-    }
-
-    /// Copy from another catalog, using tables from other in case of conflict
-    pub fn copy_from(&mut self, other: &Self) {
-        for (name, plan) in &other.tables {
-            self.tables.insert(name.clone(), plan.clone());
-        }
-    }
-}
-
-impl Default for InMemoryCatalog {
-    fn default() -> Self {
-        Self::new()
-    }
-}
diff --git a/src/daft-sql/src/lib.rs b/src/daft-sql/src/lib.rs
index 2615d0fe4d..82c4cc6f93 100644
--- a/src/daft-sql/src/lib.rs
+++ b/src/daft-sql/src/lib.rs
@@ -27,7 +27,7 @@ pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
 mod tests {
     use std::sync::Arc;
 
-    use daft_catalog::DaftMetaCatalog;
+    use daft_catalog::DaftCatalog;
     use daft_core::prelude::*;
     use daft_dsl::{col, lit, Expr, OuterReferenceColumn, Subquery};
     use daft_logical_plan::{
@@ -112,11 +112,11 @@ mod tests {
 
     #[fixture]
     fn planner() -> SQLPlanner<'static> {
-        let mut catalog = DaftMetaCatalog::default();
+        let mut catalog = DaftCatalog::default();
 
-        catalog.register_named_table("tbl1", tbl_1());
-        catalog.register_named_table("tbl2", tbl_2());
-        catalog.register_named_table("tbl3", tbl_3());
+        catalog.register_table("tbl1", tbl_1());
+        catalog.register_table("tbl2", tbl_2());
+        catalog.register_table("tbl3", tbl_3());
 
         SQLPlanner::new(catalog)
     }
diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs
index 348e0a0cf8..87cdece093 100644
--- a/src/daft-sql/src/planner.rs
+++ b/src/daft-sql/src/planner.rs
@@ -8,7 +8,7 @@ use std::{
 
 use common_error::{DaftError, DaftResult};
 use daft_algebra::boolean::combine_conjunction;
-use daft_catalog::DaftMetaCatalog;
+use daft_catalog::DaftCatalog;
 use daft_core::prelude::*;
 use daft_dsl::{
     col,
@@ -75,7 +75,7 @@ impl Relation {
 /// Context that is shared across a query and its subqueries
 #[derive(Default)]
 struct PlannerContext {
-    catalog: DaftMetaCatalog,
+    catalog: DaftCatalog,
     cte_map: HashMap<String, Relation>,
 }
 
@@ -92,7 +92,7 @@ pub struct SQLPlanner<'a> {
 }
 
 impl<'a> SQLPlanner<'a> {
-    pub fn new(catalog: DaftMetaCatalog) -> Self {
+    pub fn new(catalog: DaftCatalog) -> Self {
         let context = Rc::new(RefCell::new(PlannerContext {
             catalog,
             ..Default::default()
@@ -138,7 +138,7 @@ impl<'a> SQLPlanner<'a> {
         Ref::map(self.context.borrow(), |i| &i.cte_map)
     }
 
-    fn catalog(&self) -> Ref<'_, DaftMetaCatalog> {
+    fn catalog(&self) -> Ref<'_, DaftCatalog> {
         Ref::map(self.context.borrow(), |i| &i.catalog)
     }
 
diff --git a/src/daft-sql/src/python.rs b/src/daft-sql/src/python.rs
index d83fb9b943..32a9d8a46c 100644
--- a/src/daft-sql/src/python.rs
+++ b/src/daft-sql/src/python.rs
@@ -1,5 +1,5 @@
 use common_daft_config::PyDaftPlanningConfig;
-use daft_catalog::DaftMetaCatalog;
+use daft_catalog::DaftCatalog;
 use daft_dsl::python::PyExpr;
 use daft_logical_plan::{LogicalPlanBuilder, PyLogicalPlanBuilder};
 use pyo3::prelude::*;
@@ -69,7 +69,7 @@ pub fn list_sql_functions() -> Vec<SQLFunctionStub> {
 #[pyclass(module = "daft.daft")]
 #[derive(Debug, Clone)]
 pub struct PyCatalog {
-    catalog: DaftMetaCatalog,
+    catalog: DaftCatalog,
 }
 
 #[pymethods]
@@ -78,7 +78,7 @@ impl PyCatalog {
     #[staticmethod]
     pub fn new() -> Self {
         Self {
-            catalog: DaftMetaCatalog::default(),
+            catalog: DaftCatalog::default(),
         }
     }
 
@@ -89,7 +89,7 @@ impl PyCatalog {
         dataframe: &mut PyLogicalPlanBuilder,
     ) -> PyResult<()> {
         let plan = dataframe.builder.build();
-        self.catalog.register_named_table(name, plan)?;
+        self.catalog.register_table(name, plan)?;
         Ok(())
     }
 
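One design point that survives the renames in this commit: both `execute_sql_command` and `SparkAnalyzer::sql` clone the catalog out of the session's `RwLock` and plan against that snapshot, so the lock is held only for the duration of the clone, never across planning. A compact illustration of the pattern (stand-in `Catalog` type, not daft's):

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, Default)]
struct Catalog {
    tables: Vec<String>,
}

fn plan_sql(catalog: &Catalog, query: &str) -> String {
    // Planning only ever sees the snapshot taken at clone time.
    format!("plan `{query}` against {:?}", catalog.tables)
}

fn main() {
    let shared = Arc::new(RwLock::new(Catalog::default()));
    shared.write().unwrap().tables.push("test_view".to_string());

    // The read guard is a temporary dropped at the end of this statement,
    // so the (possibly slow) planning below never blocks writers that are
    // concurrently registering new views.
    let snapshot = shared.read().unwrap().clone();

    println!("{}", plan_sql(&snapshot, "SELECT * FROM test_view"));
}
```

The trade-off is that a view registered after the clone is invisible to the in-flight query, which matches the usual session semantics for temp views.
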
From 20704df5f1beb04d871f3ef538c31772b366ecb3 Mon Sep 17 00:00:00 2001
From: universalmind303
Date: Thu, 16 Jan 2025 15:54:52 -0600
Subject: [PATCH 5/5] fix bad merge

---
 src/daft-connect/src/connect_service.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/daft-connect/src/connect_service.rs b/src/daft-connect/src/connect_service.rs
index 8b71d65e86..6cf907ff73 100644
--- a/src/daft-connect/src/connect_service.rs
+++ b/src/daft-connect/src/connect_service.rs
@@ -84,7 +84,6 @@ impl SparkConnectService for DaftSparkConnectService {
                         let result = session.execute_write_operation(op, rb).await?;
                         Ok(Response::new(result))
                     }
-                    CommandType::RegisterFunction(_) => todo!(),
                     CommandType::CreateDataframeView(create_dataframe) => {
                         let result = session
                             .execute_create_dataframe_view(create_dataframe, rb)
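Context for this last fix: the `CommandType::RegisterFunction(_) => todo!()` arm that slipped in with the second commit would panic at runtime as soon as a client issued that command, whereas letting it fall through to the catch-all arm returns a clean not-yet-implemented error instead. In miniature:

```rust
#[derive(Debug)]
enum CommandType {
    WriteOperation,
    RegisterFunction,
}

fn handle(command: CommandType) -> Result<&'static str, String> {
    match command {
        CommandType::WriteOperation => Ok("executed"),
        // CommandType::RegisterFunction => todo!(), // would panic the handler
        other => Err(format!("not yet implemented: {other:?}")),
    }
}

fn main() {
    assert_eq!(handle(CommandType::WriteOperation), Ok("executed"));
    // With the todo!() arm removed, unsupported commands surface as errors
    // the server can report to the client rather than as panics.
    assert!(handle(CommandType::RegisterFunction).is_err());
}
```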