diff --git a/datafusion-federation/src/analyzer.rs b/datafusion-federation/src/analyzer.rs deleted file mode 100644 index 644b034..0000000 --- a/datafusion-federation/src/analyzer.rs +++ /dev/null @@ -1,210 +0,0 @@ -use std::sync::Arc; - -use datafusion::{ - common::{tree_node::TreeNode, Column}, - config::ConfigOptions, - datasource::source_as_provider, - error::Result, - logical_expr::{Expr, LogicalPlan, Projection, TableScan, TableSource}, - optimizer::analyzer::AnalyzerRule, - sql::TableReference, -}; - -use crate::{ - optimize::Optimizer, FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef, -}; - -#[derive(Default, Debug)] -pub struct FederationAnalyzerRule { - optimizer: Optimizer, -} - -impl AnalyzerRule for FederationAnalyzerRule { - // Walk over the plan, look for the largest subtrees that only have - // TableScans from the same FederationProvider. - // There 'largest sub-trees' are passed to their respective FederationProvider.optimizer. - fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result { - if !contains_federated_table(&plan)? { - return Ok(plan); - } - - let plan = self.optimizer.optimize_plan(plan)?; - - let (optimized, _) = self.optimize_recursively(&plan, None, config)?; - if let Some(result) = optimized { - return Ok(result); - } - Ok(plan.clone()) - } - - /// A human readable name for this optimizer rule - fn name(&self) -> &str { - "federation_optimizer_rule" - } -} - -fn contains_federated_table(plan: &LogicalPlan) -> Result { - let federated_table_exists = plan.exists(|x| { - if let Some(provider) = get_federation_provider(x)? { - // federated table provider should have an analyzer - return Ok(provider.analyzer().is_some()); - } - Ok(false) - })?; - - Ok(federated_table_exists) -} - -impl FederationAnalyzerRule { - pub fn new() -> Self { - Self::default() - } - - // optimize_recursively recursively finds the largest sub-plans that can be federated - // to a single FederationProvider. - // Returns a plan if a sub-tree was federated, otherwise None. - // Returns a FederationProvider if it covers the entire sub-tree, otherwise None. - fn optimize_recursively( - &self, - plan: &LogicalPlan, - parent: Option<&LogicalPlan>, - _config: &ConfigOptions, - ) -> Result<(Option, Option)> { - // Check if this node determines the FederationProvider - let sole_provider = get_federation_provider(plan)?; - if sole_provider.is_some() { - return Ok((None, sole_provider)); - } - - // optimize_inputs - let inputs = plan.inputs(); - if inputs.is_empty() { - return Ok((None, None)); - } - - let (new_inputs, providers): (Vec<_>, Vec<_>) = inputs - .iter() - .map(|i| self.optimize_recursively(i, Some(plan), _config)) - .collect::>>()? - .into_iter() - .unzip(); - - // Note: assumes provider is None if ambiguous - let first_provider = providers.first().unwrap(); - let is_singular = providers.iter().all(|p| p.is_some() && p == first_provider); - - if is_singular { - if parent.is_none() { - // federate the entire plan - if let Some(provider) = first_provider { - if let Some(optimizer) = provider.analyzer() { - let optimized = - optimizer.execute_and_check(plan.clone(), _config, |_, _| {})?; - return Ok((Some(optimized), None)); - } - return Ok((None, None)); - } - return Ok((None, None)); - } - // The largest sub-plan is higher up. - return Ok((None, first_provider.clone())); - } - - // The plan is ambiguous, any inputs that are not federated and - // have a sole provider, should be federated. - let new_inputs = new_inputs - .into_iter() - .enumerate() - .map(|(i, new_sub_plan)| { - if let Some(sub_plan) = new_sub_plan { - // Already federated - return Ok(sub_plan); - } - let sub_plan = inputs.get(i).unwrap(); - // Check if the input has a sole provider and can be federated. - if let Some(provider) = providers.get(i).unwrap() { - if let Some(optimizer) = provider.analyzer() { - let wrapped = wrap_projection((*sub_plan).clone())?; - - let optimized = optimizer.execute_and_check(wrapped, _config, |_, _| {})?; - return Ok(optimized); - } - // No federation for this sub-plan (no analyzer) - return Ok((*sub_plan).clone()); - } - // No federation for this sub-plan (no provider) - Ok((*sub_plan).clone()) - }) - .collect::>>()?; - - let new_plan = match plan { - // Unnest returns columns to unnest as `expressions` but does not support passing them back to `with_new_exprs`. - // Instead, it uses data from its internal representation to create a new plan. - LogicalPlan::Unnest(_) => plan.with_new_exprs(vec![], new_inputs)?, - _ => plan.with_new_exprs(plan.expressions(), new_inputs)?, - }; - - Ok((Some(new_plan), None)) - } -} - -fn get_federation_provider(plan: &LogicalPlan) -> Result> { - match plan { - LogicalPlan::TableScan(TableScan { ref source, .. }) => { - let Some(federated_source) = get_table_source(source)? else { - return Ok(None); - }; - let provider = federated_source.federation_provider(); - Ok(Some(provider)) - } - _ => Ok(None), - } -} - -fn wrap_projection(plan: LogicalPlan) -> Result { - // TODO: minimize requested columns - match plan { - LogicalPlan::Projection(_) => Ok(plan), - _ => { - let expr = plan - .schema() - .fields() - .iter() - .enumerate() - .map(|(i, f)| { - Expr::Column(Column::from_qualified_name(format!( - "{}.{}", - plan.schema() - .qualified_field(i) - .0 - .map(TableReference::table) - .unwrap_or_default(), - f.name() - ))) - }) - .collect::>(); - Ok(LogicalPlan::Projection(Projection::try_new( - expr, - Arc::new(plan), - )?)) - } - } -} - -pub fn get_table_source( - source: &Arc, -) -> Result>> { - // Unwrap TableSource - let source = source_as_provider(source)?; - - // Get FederatedTableProviderAdaptor - let Some(wrapper) = source - .as_any() - .downcast_ref::() - else { - return Ok(None); - }; - - // Return original FederatedTableSource - Ok(Some(Arc::clone(&wrapper.source))) -} diff --git a/datafusion-federation/src/analyzer/mod.rs b/datafusion-federation/src/analyzer/mod.rs new file mode 100644 index 0000000..74d954a --- /dev/null +++ b/datafusion-federation/src/analyzer/mod.rs @@ -0,0 +1,514 @@ +mod scan_result; + +use crate::FederationProvider; +use crate::{ + optimize::Optimizer, FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef, +}; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder}; +use datafusion::{ + common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}, + config::ConfigOptions, + datasource::source_as_provider, + error::Result, + logical_expr::{Expr, Extension, LogicalPlan, Projection, TableScan, TableSource}, + optimizer::analyzer::AnalyzerRule, + sql::TableReference, +}; +use scan_result::ScanResult; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; + +#[derive(Debug)] +pub struct FederationAnalyzerRule { + optimizer: Optimizer, + provider_map: Arc>>, +} + +impl Default for FederationAnalyzerRule { + fn default() -> Self { + Self { + optimizer: Optimizer::default(), + provider_map: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl AnalyzerRule for FederationAnalyzerRule { + // Walk over the plan, look for the largest subtrees that only have + // TableScans from the same FederationProvider. + // There 'largest sub-trees' are passed to their respective FederationProvider.optimizer. + fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result { + if !contains_federated_table(&plan)? { + return Ok(plan); + } + // Run selected optimizer rules before federation + let plan = self.optimizer.optimize_plan(plan)?; + + // Find all federation providers for TableReference appeared in the plan + let providers = get_plan_provider_recursively(&plan)?; + let mut write_map = self.provider_map.write().map_err(|_| { + DataFusionError::External( + "Failed to create federated plan: failed to find all federated providers.".into(), + ) + })?; + write_map.extend(providers); + drop(write_map); + + match self.optimize_plan_recursively(&plan, true, config)? { + (Some(optimized_plan), _) => Ok(optimized_plan), + (None, _) => Ok(plan), + } + } + + /// A human readable name for this optimizer rule + fn name(&self) -> &str { + "federation_optimizer_rule" + } +} + +impl FederationAnalyzerRule { + pub fn new() -> Self { + Self::default() + } + + /// Scans a plan to see if it belongs to a single [`FederationProvider`]. + fn scan_plan_recursively(&self, plan: &LogicalPlan) -> Result { + let mut sole_provider: ScanResult = ScanResult::None; + + plan.apply(&mut |p: &LogicalPlan| -> Result { + let exprs_provider = self.scan_plan_exprs(p)?; + sole_provider.merge(exprs_provider); + + if sole_provider.is_ambiguous() { + return Ok(TreeNodeRecursion::Stop); + } + + let (sub_provider, _) = get_leaf_provider(p)?; + sole_provider.add(sub_provider); + + Ok(sole_provider.check_recursion()) + })?; + + Ok(sole_provider) + } + + /// Scans a plan's expressions to see if it belongs to a single [`FederationProvider`]. + fn scan_plan_exprs(&self, plan: &LogicalPlan) -> Result { + let mut sole_provider: ScanResult = ScanResult::None; + + let exprs = plan.expressions(); + for expr in &exprs { + let expr_result = self.scan_expr_recursively(expr)?; + sole_provider.merge(expr_result); + + if sole_provider.is_ambiguous() { + return Ok(sole_provider); + } + } + + Ok(sole_provider) + } + + /// scans an expression to see if it belongs to a single [`FederationProvider`] + fn scan_expr_recursively(&self, expr: &Expr) -> Result { + let mut sole_provider: ScanResult = ScanResult::None; + + expr.apply(&mut |e: &Expr| -> Result { + // TODO: Support other types of sub-queries + match e { + Expr::ScalarSubquery(ref subquery) => { + let plan_result = self.scan_plan_recursively(&subquery.subquery)?; + + sole_provider.merge(plan_result); + Ok(sole_provider.check_recursion()) + } + Expr::InSubquery(ref insubquery) => { + let plan_result = self.scan_plan_recursively(&insubquery.subquery.subquery)?; + + sole_provider.merge(plan_result); + Ok(sole_provider.check_recursion()) + } + Expr::OuterReferenceColumn(_, ref col) => { + if let Some(table) = &col.relation { + let map = self.provider_map.read().map_err(|_| { + DataFusionError::External( + "Failed to create federated plan: failed to obtain a read lock on federated providers.".into(), + ) + })?; + if let Some(plan_result) = map.get(table) { + sole_provider.merge(plan_result.clone()); + return Ok(sole_provider.check_recursion()); + } + } + // Subqueries that reference outer columns are not supported + // for now. We handle this here as ambiguity to force + // federation lower in the plan tree. + sole_provider = ScanResult::Ambiguous; + Ok(TreeNodeRecursion::Stop) + } + _ => Ok(TreeNodeRecursion::Continue), + } + })?; + + Ok(sole_provider) + } + + /// Recursively finds the largest sub-plans that can be federated + /// to a single FederationProvider. + /// + /// Returns a plan if a sub-tree was federated, otherwise None. + /// + /// Returns a ScanResult of all FederationProviders in the subtree. + fn optimize_plan_recursively( + &self, + plan: &LogicalPlan, + is_root: bool, + _config: &ConfigOptions, + ) -> Result<(Option, ScanResult)> { + let mut sole_provider: ScanResult = ScanResult::None; + + if let LogicalPlan::Extension(Extension { ref node }) = plan { + if node.name() == "Federated" { + // Avoid attempting double federation + return Ok((None, ScanResult::Ambiguous)); + } + } + + // Check if this plan node is a leaf that determines the FederationProvider + let (leaf_provider, _) = get_leaf_provider(plan)?; + + // Check if the expressions contain, a potentially different, FederationProvider + let exprs_result = self.scan_plan_exprs(plan)?; + + // Return early if this is a leaf and there is no ambiguity with the expressions. + if leaf_provider.is_some() && (exprs_result.is_none() || exprs_result == leaf_provider) { + return Ok((None, leaf_provider.into())); + } + // Aggregate leaf & expression providers + sole_provider.add(leaf_provider); + sole_provider.merge(exprs_result.clone()); + + let inputs = plan.inputs(); + // Return early if there are no sources. + if inputs.is_empty() && sole_provider.is_none() { + return Ok((None, ScanResult::None)); + } + + // Recursively optimize inputs + let input_results = inputs + .iter() + .map(|i| self.optimize_plan_recursively(i, false, _config)) + .collect::>>()?; + + // Aggregate the input providers + input_results.iter().for_each(|(_, scan_result)| { + sole_provider.merge(scan_result.clone()); + }); + + if sole_provider.is_none() { + // No providers found + // TODO: Is/should this be reachable? + return Ok((None, ScanResult::None)); + } + + // Federate Exprs when Exprs provider is ambiguous or Exprs provider differs from the sole_provider of current plan + // When Exprs provider is the same as sole_provider and non-ambiguous, the larger sub-plan is higher up + let optimize_expressions = exprs_result.is_some() + && (!(sole_provider == exprs_result) || exprs_result.is_ambiguous()); + + // If all sources are federated to the same provider + if let ScanResult::Distinct(provider) = sole_provider { + if !is_root { + // The largest sub-plan is higher up. + return Ok((None, ScanResult::Distinct(provider))); + } + + let Some(optimizer) = provider.analyzer() else { + // No optimizer provided + return Ok((None, ScanResult::None)); + }; + + // If this is the root plan node; federate the entire plan + let optimized = optimizer.execute_and_check(plan.clone(), _config, |_, _| {})?; + return Ok((Some(optimized), ScanResult::None)); + } + + // The plan is ambiguous; any input that is not yet optimized and has a + // sole provider represents a largest sub-plan and should be federated. + // + // We loop over the input optimization results, federate where needed and + // return a complete list of new inputs for the optimized plan. + let new_inputs = input_results + .into_iter() + .enumerate() + .map(|(i, (input_plan, input_result))| { + if let Some(federated_plan) = input_plan { + // Already federated deeper in the plan tree + return Ok(federated_plan); + } + + let original_input = (*inputs.get(i).unwrap()).clone(); + if input_result.is_ambiguous() { + // Can happen if the input is already federated, so use + // the original input. + return Ok(original_input); + } + + let provider = input_result.unwrap()?; + let Some(provider) = provider else { + // No provider for this input; use the original input. + return Ok(original_input); + }; + + let Some(optimizer) = provider.analyzer() else { + // No optimizer for this input; use the original input. + return Ok(original_input); + }; + + // Replace the input with the federated counterpart + let wrapped = wrap_projection(original_input)?; + let optimized = optimizer.execute_and_check(wrapped, _config, |_, _| {})?; + + Ok(optimized) + }) + .collect::>>()?; + + // Optimize expressions if needed + let new_expressions = if optimize_expressions { + self.optimize_plan_exprs(plan, _config)? + } else { + plan.expressions() + }; + + // Construct the optimized plan + let new_plan = plan.with_new_exprs(new_expressions, new_inputs)?; + + // Return the federated plan + Ok((Some(new_plan), ScanResult::Ambiguous)) + } + + /// Optimizes all exprs of a plan + fn optimize_plan_exprs( + &self, + plan: &LogicalPlan, + _config: &ConfigOptions, + ) -> Result> { + plan.expressions() + .iter() + .map(|expr| { + let transformed = expr + .clone() + .transform(&|e| self.optimize_expr_recursively(e, _config))?; + Ok(transformed.data) + }) + .collect::>>() + } + + /// recursively optimize expressions + /// Current logic: individually federate every sub-query. + fn optimize_expr_recursively( + &self, + expr: Expr, + _config: &ConfigOptions, + ) -> Result> { + match expr { + Expr::ScalarSubquery(ref subquery) => { + // Optimize as root to force federating the sub-query + let (new_subquery, _) = + self.optimize_plan_recursively(&subquery.subquery, true, _config)?; + let Some(new_subquery) = new_subquery else { + return Ok(Transformed::no(expr)); + }; + + // ScalarSubqueryToJoin optimizer rule doesn't support federated node (LogicalPlan::Extension(_)) as subquery + // Wrap a `non-op` Projection LogicalPlan outside the federated node to facilitate ScalarSubqueryToJoin optimization + if matches!(new_subquery, LogicalPlan::Extension(_)) { + let all_columns = new_subquery + .schema() + .fields() + .iter() + .map(|field| col(field.name())) + .collect::>(); + + let projection_plan = LogicalPlanBuilder::from(new_subquery) + .project(all_columns)? + .build()?; + + return Ok(Transformed::yes(Expr::ScalarSubquery( + subquery.with_plan(projection_plan.into()), + ))); + } + + Ok(Transformed::yes(Expr::ScalarSubquery( + subquery.with_plan(new_subquery.into()), + ))) + } + Expr::InSubquery(ref in_subquery) => { + let (new_subquery, _) = + self.optimize_plan_recursively(&in_subquery.subquery.subquery, true, _config)?; + let Some(new_subquery) = new_subquery else { + return Ok(Transformed::no(expr)); + }; + + // DecorrelatePredicateSubquery optimizer rule doesn't support federated node (LogicalPlan::Extension(_)) as subquery + // Wrap a `non-op` Projection LogicalPlan outside the federated node to facilitate DecorrelatePredicateSubquery optimization + if matches!(new_subquery, LogicalPlan::Extension(_)) { + let all_columns = new_subquery + .schema() + .fields() + .iter() + .map(|field| col(field.name())) + .collect::>(); + + let projection_plan = LogicalPlanBuilder::from(new_subquery) + .project(all_columns)? + .build()?; + + return Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( + in_subquery.expr.clone(), + in_subquery.subquery.with_plan(projection_plan.into()), + in_subquery.negated, + )))); + } + + Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( + in_subquery.expr.clone(), + in_subquery.subquery.with_plan(new_subquery.into()), + in_subquery.negated, + )))) + } + _ => Ok(Transformed::no(expr)), + } + } +} + +/// NopFederationProvider is used to represent tables that are not federated, but +/// are resolved by DataFusion. This simplifies the logic of the optimizer rule. +struct NopFederationProvider {} + +impl FederationProvider for NopFederationProvider { + fn name(&self) -> &str { + "nop" + } + + fn compute_context(&self) -> Option { + None + } + + fn analyzer(&self) -> Option> { + None + } +} + +/// Recursively find the [`FederationProvider`] for all [`TableReference`] instances in the plan. +/// This information is used to resolve the federation provider for [`Expr::OuterReferenceColumn`]. +fn get_plan_provider_recursively( + plan: &LogicalPlan, +) -> Result> { + let mut providers: HashMap = HashMap::new(); + + plan.apply(&mut |p: &LogicalPlan| -> Result { + // LogicalPlan::SubqueryAlias can also be referred by OuterReferenceColumn + // Get the federation provider for TableReference representing LogicalPlan::SubqueryAlias + if let LogicalPlan::SubqueryAlias(a) = p { + let subquery_alias_providers = get_plan_provider_recursively(&Arc::clone(&a.input))?; + let mut provider: ScanResult = ScanResult::None; + for (_, i) in subquery_alias_providers { + provider.merge(i); + } + providers.insert(a.alias.clone(), provider); + } + + let (federation_provider, table_reference) = get_leaf_provider(p)?; + if let Some(table_reference) = table_reference { + providers.insert(table_reference, federation_provider.into()); + } + + let _ = p.apply_subqueries(|sub_query| { + let subquery_providers = get_plan_provider_recursively(sub_query)?; + providers.extend(subquery_providers); + Ok(TreeNodeRecursion::Continue) + }); + + Ok(TreeNodeRecursion::Continue) + })?; + + Ok(providers) +} + +fn wrap_projection(plan: LogicalPlan) -> Result { + // TODO: minimize requested columns + match plan { + LogicalPlan::Projection(_) => Ok(plan), + _ => { + let expr = plan + .schema() + .columns() + .iter() + .map(|c| Expr::Column(c.clone())) + .collect::>(); + Ok(LogicalPlan::Projection(Projection::try_new( + expr, + Arc::new(plan), + )?)) + } + } +} + +fn contains_federated_table(plan: &LogicalPlan) -> Result { + let federated_table_exists = plan.exists(|x| { + if let (Some(provider), _) = get_leaf_provider(x)? { + // federated table provider should have an analyzer + return Ok(provider.analyzer().is_some()); + } + Ok(false) + })?; + + Ok(federated_table_exists) +} + +fn get_leaf_provider( + plan: &LogicalPlan, +) -> Result<(Option, Option)> { + match plan { + LogicalPlan::TableScan(TableScan { + ref table_name, + ref source, + .. + }) => { + let table_reference = table_name.clone(); + let Some(federated_source) = get_table_source(source)? else { + // Table is not federated but provided by a standard table provider. + // We use a placeholder federation provider to simplify the logic. + return Ok(( + Some(Arc::new(NopFederationProvider {})), + Some(table_reference), + )); + }; + let provider = federated_source.federation_provider(); + Ok((Some(provider), Some(table_reference))) + } + _ => Ok((None, None)), + } +} + +#[allow(clippy::missing_errors_doc)] +pub fn get_table_source( + source: &Arc, +) -> Result>> { + // Unwrap TableSource + let source = source_as_provider(source)?; + + // Get FederatedTableProviderAdaptor + let Some(wrapper) = source + .as_any() + .downcast_ref::() + else { + return Ok(None); + }; + + // Return original FederatedTableSource + Ok(Some(Arc::clone(&wrapper.source))) +} diff --git a/datafusion-federation/src/analyzer/scan_result.rs b/datafusion-federation/src/analyzer/scan_result.rs new file mode 100644 index 0000000..dc01906 --- /dev/null +++ b/datafusion-federation/src/analyzer/scan_result.rs @@ -0,0 +1,113 @@ +use crate::FederationProviderRef; +use datafusion::common::tree_node::TreeNodeRecursion; +use datafusion::error::{DataFusionError, Result}; + +/// Used to track if all sources, including tableScan, plan inputs and +/// expressions, represents an un-ambiguous, none or a sole' [`crate::FederationProvider`]. +pub enum ScanResult { + None, + Distinct(FederationProviderRef), + Ambiguous, +} + +impl ScanResult { + pub fn merge(&mut self, other: Self) { + match (&self, &other) { + (_, ScanResult::None) => {} + (ScanResult::None, _) => *self = other, + (ScanResult::Ambiguous, _) | (_, ScanResult::Ambiguous) => { + *self = ScanResult::Ambiguous; + } + (ScanResult::Distinct(provider), ScanResult::Distinct(other_provider)) => { + if provider != other_provider { + *self = ScanResult::Ambiguous; + } + } + } + } + + pub fn add(&mut self, provider: Option) { + self.merge(ScanResult::from(provider)) + } + + pub fn is_ambiguous(&self) -> bool { + matches!(self, ScanResult::Ambiguous) + } + + pub fn is_none(&self) -> bool { + matches!(self, ScanResult::None) + } + pub fn is_some(&self) -> bool { + !self.is_none() + } + + pub fn unwrap(self) -> Result> { + match self { + ScanResult::None => Ok(None), + ScanResult::Distinct(provider) => Ok(Some(provider)), + ScanResult::Ambiguous => Err(DataFusionError::External( + "called `ScanResult::unwrap()` on a `Ambiguous` value".into(), + )), + } + } + + pub fn check_recursion(&self) -> TreeNodeRecursion { + if self.is_ambiguous() { + TreeNodeRecursion::Stop + } else { + TreeNodeRecursion::Continue + } + } +} + +impl From> for ScanResult { + fn from(provider: Option) -> Self { + match provider { + Some(provider) => ScanResult::Distinct(provider), + None => ScanResult::None, + } + } +} + +impl PartialEq> for ScanResult { + fn eq(&self, other: &Option) -> bool { + match (self, other) { + (ScanResult::None, None) => true, + (ScanResult::Distinct(provider), Some(other_provider)) => provider == other_provider, + _ => false, + } + } +} + +impl PartialEq for ScanResult { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ScanResult::None, ScanResult::None) => true, + (ScanResult::Distinct(provider1), ScanResult::Distinct(provider2)) => { + provider1 == provider2 + } + (ScanResult::Ambiguous, ScanResult::Ambiguous) => true, + _ => false, + } + } +} + +impl std::fmt::Debug for ScanResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::None => write!(f, "ScanResult::None"), + Self::Distinct(provider) => write!(f, "ScanResult::Distinct({})", provider.name()), + Self::Ambiguous => write!(f, "ScanResult::Ambiguous"), + } + } +} + +impl Clone for ScanResult { + fn clone(&self) -> Self { + match self { + ScanResult::None => ScanResult::None, + ScanResult::Distinct(provider) => ScanResult::Distinct(provider.clone()), + ScanResult::Ambiguous => ScanResult::Ambiguous, + } + } +} diff --git a/datafusion-federation/src/optimize.rs b/datafusion-federation/src/optimize.rs index b669210..5f543ae 100644 --- a/datafusion-federation/src/optimize.rs +++ b/datafusion-federation/src/optimize.rs @@ -3,7 +3,8 @@ use datafusion::{ error::Result, logical_expr::LogicalPlan, optimizer::{ - optimizer::ApplyOrder, push_down_filter::PushDownFilter, OptimizerConfig, OptimizerContext, OptimizerRule + optimizer::ApplyOrder, push_down_filter::PushDownFilter, OptimizerConfig, OptimizerContext, + OptimizerRule, }, }; use optimize_projections::OptimizeProjections; diff --git a/datafusion-federation/src/schema_cast.rs b/datafusion-federation/src/schema_cast.rs index 502d4b8..5b1ba5a 100644 --- a/datafusion-federation/src/schema_cast.rs +++ b/datafusion-federation/src/schema_cast.rs @@ -4,8 +4,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; use futures::StreamExt; use std::any::Any; @@ -15,8 +14,8 @@ use std::sync::Arc; mod intervals_cast; mod lists_cast; -mod struct_cast; pub mod record_convert; +mod struct_cast; #[derive(Debug)] #[allow(clippy::module_name_repetitions)] @@ -70,7 +69,7 @@ impl ExecutionPlan for SchemaCastScanExec { vec![&self.input] } - /// Prevents the introduction of additional `RepartitionExec` and processing input in parallel. + /// Prevents the introduction of additional `RepartitionExec` and processing input in parallel. /// This guarantees that the input is processed as a single stream, preserving the order of the data. fn benefits_from_input_partitioning(&self) -> Vec { vec![false]