From aec20d44192afbc5ef76539059e25977b9047f6b Mon Sep 17 00:00:00 2001 From: hozan23 <119854621+hozan23@users.noreply.github.com> Date: Sun, 10 Nov 2024 15:36:26 +0100 Subject: [PATCH] update dependencies & use datafusion 43 (#75) --- Cargo.toml | 18 +++++++++--------- datafusion-federation/src/optimizer/mod.rs | 2 +- datafusion-federation/src/plan_node.rs | 15 +++++++++++++-- datafusion-federation/src/sql/mod.rs | 6 ++++-- datafusion-federation/src/sql/schema.rs | 3 +++ datafusion-federation/src/table_provider.rs | 19 ++++++++++++++++--- datafusion-flight-sql-server/src/service.rs | 4 ++-- 7 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e1dd689..49a9ae9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,17 +15,17 @@ readme = "README.md" repository = "https://github.com/datafusion-contrib/datafusion-federation" [workspace.dependencies] -arrow = "53.0.0" -arrow-flight = { version = "53.0.0", features = ["flight-sql-experimental"] } -arrow-json = "53.0.0" +arrow = "53.2" +arrow-flight = { version = "53.2", features = ["flight-sql-experimental"] } +arrow-json = "53.2" async-stream = "0.3.5" -async-trait = "0.1.81" -datafusion = "42.0.0" +async-trait = "0.1.83" +datafusion = "43.0.0" datafusion-federation = { path = "./datafusion-federation", version = "0.3.0" } -datafusion-substrait = "42.0.0" -futures = "0.3.30" -tokio = { version = "1.39.3", features = ["full"] } -tonic = { version = "0.12.1", features = [ +datafusion-substrait = "43.0.0" +futures = "0.3.31" +tokio = { version = "1.41", features = ["full"] } +tonic = { version = "0.12", features = [ "tls", "transport", "codegen", diff --git a/datafusion-federation/src/optimizer/mod.rs b/datafusion-federation/src/optimizer/mod.rs index 6aa2e40..9c16dfc 100644 --- a/datafusion-federation/src/optimizer/mod.rs +++ b/datafusion-federation/src/optimizer/mod.rs @@ -22,7 +22,7 @@ use scan_result::ScanResult; /// The optimizer logic walks over the plan, look for the largest subtrees that only have /// TableScans from the same [`FederationProvider`]. There 'largest sub-trees' are passed to their /// respective [`FederationProvider::optimizer`]. -#[derive(Default)] +#[derive(Default, Debug)] pub struct FederationOptimizerRule {} impl OptimizerRule for FederationOptimizerRule { diff --git a/datafusion-federation/src/plan_node.rs b/datafusion-federation/src/plan_node.rs index 6473aca..0647f9d 100644 --- a/datafusion-federation/src/plan_node.rs +++ b/datafusion-federation/src/plan_node.rs @@ -72,8 +72,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode { } } -#[derive(Default)] - +#[derive(Default, Debug)] pub struct FederatedQueryPlanner {} impl FederatedQueryPlanner { @@ -110,6 +109,12 @@ pub trait FederationPlanner: Send + Sync { ) -> Result>; } +impl std::fmt::Debug for dyn FederationPlanner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "FederationPlanner") + } +} + impl PartialEq for FederatedPlanNode { /// Comparing name, args and return_type fn eq(&self, other: &FederatedPlanNode) -> bool { @@ -117,6 +122,12 @@ impl PartialEq for FederatedPlanNode { } } +impl PartialOrd for FederatedPlanNode { + fn partial_cmp(&self, other: &FederatedPlanNode) -> Option { + self.plan.partial_cmp(&other.plan) + } +} + impl Eq for FederatedPlanNode {} impl Hash for FederatedPlanNode { diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 63d933f..d63d52d 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -40,6 +40,7 @@ use crate::{ // extern crate derive_builder; // SQLFederationProvider provides federation to SQL DMBSs. +#[derive(Debug)] pub struct SQLFederationProvider { optimizer: Arc, executor: Arc, @@ -70,6 +71,7 @@ impl FederationProvider for SQLFederationProvider { } } +#[derive(Debug)] struct SQLFederationOptimizerRule { planner: Arc, } @@ -926,8 +928,8 @@ mod tests { ), // different tables in single aggregation expression ( - "SELECT COUNT(CASE WHEN app_table.a > 0 THEN app_table.a ELSE foo.df_table.a END) FROM app_table, foo.df_table", - r#"SELECT count(CASE WHEN (remote_table.a > 0) THEN remote_table.a ELSE remote_table.a END) FROM remote_table JOIN remote_table ON true"#, + "SELECT COUNT(CASE WHEN appt.a > 0 THEN appt.a ELSE dft.a END) FROM app_table as appt, foo.df_table as dft", + "SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt JOIN remote_table AS dft" ), ]; diff --git a/datafusion-federation/src/sql/schema.rs b/datafusion-federation/src/sql/schema.rs index 8e9cf25..1961226 100644 --- a/datafusion-federation/src/sql/schema.rs +++ b/datafusion-federation/src/sql/schema.rs @@ -12,6 +12,7 @@ use crate::{ FederationProvider, }; +#[derive(Debug)] pub struct SQLSchemaProvider { // provider: Arc, tables: Vec>, @@ -73,6 +74,7 @@ impl SchemaProvider for SQLSchemaProvider { } } +#[derive(Debug)] pub struct MultiSchemaProvider { children: Vec>, } @@ -107,6 +109,7 @@ impl SchemaProvider for MultiSchemaProvider { } } +#[derive(Debug)] pub struct SQLTableSource { provider: Arc, table_name: String, diff --git a/datafusion-federation/src/table_provider.rs b/datafusion-federation/src/table_provider.rs index ee1ab75..6da1eb1 100644 --- a/datafusion-federation/src/table_provider.rs +++ b/datafusion-federation/src/table_provider.rs @@ -7,7 +7,9 @@ use datafusion::{ common::Constraints, datasource::TableProvider, error::{DataFusionError, Result}, - logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType}, + logical_expr::{ + dml::InsertOp, Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType, + }, physical_plan::ExecutionPlan, }; @@ -15,6 +17,7 @@ use crate::FederationProvider; // FederatedTableSourceWrapper helps to recover the FederatedTableSource // from a TableScan. This wrapper may be avoidable. +#[derive(Debug)] pub struct FederatedTableProviderAdaptor { pub source: Arc, pub table_provider: Option>, @@ -124,10 +127,10 @@ impl TableProvider for FederatedTableProviderAdaptor { &self, _state: &dyn Session, input: Arc, - overwrite: bool, + insert_op: InsertOp, ) -> Result> { if let Some(table_provider) = &self.table_provider { - return table_provider.insert_into(_state, input, overwrite).await; + return table_provider.insert_into(_state, input, insert_op).await; } Err(DataFusionError::NotImplemented( @@ -143,3 +146,13 @@ pub trait FederatedTableSource: TableSource { // Return the FederationProvider associated with this Table fn federation_provider(&self) -> Arc; } + +impl std::fmt::Debug for dyn FederatedTableSource { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "FederatedTableSource: {:?}", + self.federation_provider().name() + ) + } +} diff --git a/datafusion-flight-sql-server/src/service.rs b/datafusion-flight-sql-server/src/service.rs index 734eff4..632da68 100644 --- a/datafusion-flight-sql-server/src/service.rs +++ b/datafusion-flight-sql-server/src/service.rs @@ -75,9 +75,9 @@ impl FlightSqlService { /// Replaces the sql_options with the provided options. /// These options are used to verify all SQL queries. /// When None the default [`SQLOptions`] are used. - pub fn with_sql_options(self, sql_options: Option) -> Self { + pub fn with_sql_options(self, sql_options: SQLOptions) -> Self { Self { - sql_options, + sql_options: Some(sql_options), ..self } }