Skip to content

Commit

Permalink
feat: initial subquery support (#37)
Browse files Browse the repository at this point in the history
Co-authored-by: Suriya Kandaswamy <[email protected]>
Co-authored-by: Suriya Kandaswamy <[email protected]>
  • Loading branch information
3 people authored May 22, 2024
1 parent 50fb769 commit 0567c02
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 222 deletions.
170 changes: 0 additions & 170 deletions datafusion-federation/src/analyzer.rs

This file was deleted.

44 changes: 39 additions & 5 deletions datafusion-federation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,50 @@ use std::{
sync::Arc,
};

use datafusion::optimizer::analyzer::Analyzer;
use datafusion::{
execution::context::{SessionContext, SessionState},
optimizer::{optimizer::Optimizer, OptimizerRule},
};

mod analyzer;
pub use analyzer::*;
mod optimizer;
pub use optimizer::*;
mod table_provider;
pub use table_provider::*;

mod plan_node;
pub use plan_node::*;

/// Builds a [`SessionState`] pre-configured for federation.
///
/// Starts from the state of a fresh [`SessionContext`], replaces its
/// optimizer rules with the list returned by [`default_optimizer_rules`],
/// and installs a [`FederatedQueryPlanner`] as the query planner.
///
/// # Panics
///
/// Panics whenever [`default_optimizer_rules`] panics.
pub fn default_session_state() -> SessionState {
    SessionContext::new()
        .state()
        .with_optimizer_rules(default_optimizer_rules())
        .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
}

/// Returns the rule list of a default [`Optimizer`] with a
/// [`FederationOptimizerRule`] inserted directly after the rule named
/// `scalar_subquery_to_join`.
///
/// # Panics
///
/// Panics if the default rule list contains no rule named
/// `scalar_subquery_to_join`.
pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
    // Start from the rules of a stock optimizer.
    let mut rules = Optimizer::new().rules;

    // The federation rule is placed right behind ScalarSubqueryToJoin so that
    // scalar subqueries have already been replaced before we try to federate.
    let anchor = rules
        .iter()
        .position(|rule| rule.name() == "scalar_subquery_to_join")
        .expect("Could not locate ScalarSubqueryToJoin");

    // TODO: check if we should allow other optimizers to run before the federation rule.
    rules.insert(anchor + 1, Arc::new(FederationOptimizerRule::new()));

    rules
}

pub type FederationProviderRef = Arc<dyn FederationProvider>;
pub trait FederationProvider: Send + Sync {
// Returns the name of the provider, used for comparison.
Expand All @@ -23,9 +57,9 @@ pub trait FederationProvider: Send + Sync {
// will execute a query. For example: database instance & catalog.
fn compute_context(&self) -> Option<String>;

// Returns an analyzer that can cut out part of the plan
// Returns an optimizer that can cut out part of the plan
// to federate it.
fn analyzer(&self) -> Option<Arc<Analyzer>>;
fn optimizer(&self) -> Option<Arc<Optimizer>>;
}

impl fmt::Display for dyn FederationProvider {
Expand Down
Loading

0 comments on commit 0567c02

Please sign in to comment.