From 9b13f2e393da0bb339a209c6dba26020b4100bb8 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 14 May 2024 15:30:03 +0900 Subject: [PATCH 1/2] Add fallback TableProvider to FederatedTableProviderAdaptor --- datafusion-federation/src/table_provider.rs | 72 +++++++++++++++++++-- 1 file changed, 67 insertions(+), 5 deletions(-) diff --git a/datafusion-federation/src/table_provider.rs b/datafusion-federation/src/table_provider.rs index 6a9afaa..c01ec9b 100644 --- a/datafusion-federation/src/table_provider.rs +++ b/datafusion-federation/src/table_provider.rs @@ -17,11 +17,28 @@ use crate::FederationProvider; // from a TableScan. This wrapper may be avoidable. pub struct FederatedTableProviderAdaptor { pub source: Arc, + pub table_provider: Option>, } impl FederatedTableProviderAdaptor { pub fn new(source: Arc) -> Self { - Self { source } + Self { + source, + table_provider: None, + } + } + + /// Creates a new FederatedTableProviderAdaptor that falls back to the + /// provided TableProvider. This is useful if used within a DataFusion + /// context without the federation optimizer. + pub fn new_with_provider( + source: Arc, + table_provider: Arc, + ) -> Self { + Self { + source, + table_provider: Some(table_provider), + } } } @@ -31,18 +48,44 @@ impl TableProvider for FederatedTableProviderAdaptor { self } fn schema(&self) -> SchemaRef { + if let Some(table_provider) = &self.table_provider { + return table_provider.schema(); + } + self.source.schema() } fn constraints(&self) -> Option<&Constraints> { + if let Some(table_provider) = &self.table_provider { + return table_provider + .constraints() + .or_else(|| self.source.constraints()); + } + self.source.constraints() } fn table_type(&self) -> TableType { + if let Some(table_provider) = &self.table_provider { + return table_provider.table_type(); + } + self.source.table_type() } fn get_logical_plan(&self) -> Option<&LogicalPlan> { + if let Some(table_provider) = &self.table_provider { + return table_provider + .get_logical_plan() + .or_else(|| self.source.get_logical_plan()); + } + self.source.get_logical_plan() } fn get_column_default(&self, column: &str) -> Option<&Expr> { + if let Some(table_provider) = &self.table_provider { + return table_provider + .get_column_default(column) + .or_else(|| self.source.get_column_default(column)); + } + self.source.get_column_default(column) } @@ -50,15 +93,34 @@ impl TableProvider for FederatedTableProviderAdaptor { // with a virtual TableProvider that provides federation for a sub-plan. async fn scan( &self, - _state: &SessionState, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, ) -> Result> { + if let Some(table_provider) = &self.table_provider { + return table_provider.scan(state, projection, filters, limit).await; + } + Err(DataFusionError::NotImplemented( "FederatedTableProviderAdaptor cannot scan".to_string(), )) } + + async fn insert_into( + &self, + _state: &SessionState, + input: Arc, + overwrite: bool, + ) -> Result> { + if let Some(table_provider) = &self.table_provider { + return table_provider.insert_into(_state, input, overwrite).await; + } + + Err(DataFusionError::NotImplemented( + "FederatedTableProviderAdaptor cannot insert_into".to_string(), + )) + } } // FederatedTableProvider extends DataFusion's TableProvider trait From f29942867ec0050a6851f3b16986e37f2a87e638 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Sat, 25 May 2024 00:04:18 +0900 Subject: [PATCH 2/2] Fix the fallback to the table provider for supports_filters_pushdown --- datafusion-federation/src/table_provider.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/datafusion-federation/src/table_provider.rs b/datafusion-federation/src/table_provider.rs index c01ec9b..a1acc30 100644 --- a/datafusion-federation/src/table_provider.rs +++ b/datafusion-federation/src/table_provider.rs @@ -7,7 +7,7 @@ use datafusion::{ datasource::TableProvider, error::{DataFusionError, Result}, execution::context::SessionState, - logical_expr::{Expr, LogicalPlan, TableSource, TableType}, + logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType}, physical_plan::ExecutionPlan, }; @@ -88,6 +88,19 @@ impl TableProvider for FederatedTableProviderAdaptor { self.source.get_column_default(column) } + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + if let Some(table_provider) = &self.table_provider { + return table_provider.supports_filters_pushdown(filters); + } + + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } // Scan is not supported; the adaptor should be replaced // with a virtual TableProvider that provides federation for a sub-plan.