Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial subquery support #37

Merged
merged 10 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 0 additions & 170 deletions datafusion-federation/src/analyzer.rs

This file was deleted.

44 changes: 39 additions & 5 deletions datafusion-federation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,50 @@ use std::{
sync::Arc,
};

use datafusion::optimizer::analyzer::Analyzer;
use datafusion::{
execution::context::{SessionContext, SessionState},
optimizer::{optimizer::Optimizer, OptimizerRule},
};

mod analyzer;
pub use analyzer::*;
mod optimizer;
pub use optimizer::*;
mod table_provider;
pub use table_provider::*;

mod plan_node;
pub use plan_node::*;

pub fn default_session_state() -> SessionState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let df_state = SessionContext::new().state();

let rules = default_optimizer_rules();
df_state
.with_optimizer_rules(rules)
.with_query_planner(Arc::new(FederatedQueryPlanner::new()))
}

pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
// Get the default optimizer
let df_default = Optimizer::new();
let mut default_rules = df_default.rules;

// Insert the FederationOptimizerRule after the ScalarSubqueryToJoin.
// This ensures ScalarSubquery are replaced before we try to federate.
let Some(pos) = default_rules
.iter()
.position(|x| x.name() == "scalar_subquery_to_join")
else {
panic!("Could not locate ScalarSubqueryToJoin");
};

// TODO: check if we should allow other optimizers to run before the federation rule.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ScalarSubqueryToJoin is fairly early on in the optimizer list. I wonder if we could just add this to the end? PushDownLimit and PushDownFilter seem beneficial for the federation case.
Screenshot 2024-05-15 at 8 45 36 PM

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that we take the time to think through the implications of having the optimizers run first and land it in separate PRs. Aside from that there are also federation-specific optimizations we can do (hinted at in #23). We may want to make different choices in those than in the local execution case.


let federation_rule = Arc::new(FederationOptimizerRule::new());
default_rules.insert(pos + 1, federation_rule);

default_rules
}

pub type FederationProviderRef = Arc<dyn FederationProvider>;
pub trait FederationProvider: Send + Sync {
// Returns the name of the provider, used for comparison.
Expand All @@ -23,9 +57,9 @@ pub trait FederationProvider: Send + Sync {
// will execute a query. For example: database instance & catalog.
fn compute_context(&self) -> Option<String>;

// Returns an analyzer that can cut out part of the plan
// Returns an optimizer that can cut out part of the plan
// to federate it.
fn analyzer(&self) -> Option<Arc<Analyzer>>;
fn optimizer(&self) -> Option<Arc<Optimizer>>;
}

impl fmt::Display for dyn FederationProvider {
Expand Down
Loading
Loading