-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: initial subquery support #37
Changes from all commits
b45d0b4
4d231af
9a0d36a
53888a2
6d3b75e
5d6899d
9308c0b
78a9c42
79934df
46ccc69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,16 +4,50 @@ use std::{ | |
sync::Arc, | ||
}; | ||
|
||
use datafusion::optimizer::analyzer::Analyzer; | ||
use datafusion::{ | ||
execution::context::{SessionContext, SessionState}, | ||
optimizer::{optimizer::Optimizer, OptimizerRule}, | ||
}; | ||
|
||
mod analyzer; | ||
pub use analyzer::*; | ||
mod optimizer; | ||
pub use optimizer::*; | ||
mod table_provider; | ||
pub use table_provider::*; | ||
|
||
mod plan_node; | ||
pub use plan_node::*; | ||
|
||
pub fn default_session_state() -> SessionState { | ||
let df_state = SessionContext::new().state(); | ||
|
||
let rules = default_optimizer_rules(); | ||
df_state | ||
.with_optimizer_rules(rules) | ||
.with_query_planner(Arc::new(FederatedQueryPlanner::new())) | ||
} | ||
|
||
pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> { | ||
// Get the default optimizer | ||
let df_default = Optimizer::new(); | ||
let mut default_rules = df_default.rules; | ||
|
||
// Insert the FederationOptimizerRule after the ScalarSubqueryToJoin. | ||
// This ensures ScalarSubquery are replaced before we try to federate. | ||
let Some(pos) = default_rules | ||
.iter() | ||
.position(|x| x.name() == "scalar_subquery_to_join") | ||
else { | ||
panic!("Could not locate ScalarSubqueryToJoin"); | ||
}; | ||
|
||
// TODO: check if we should allow other optimizers to run before the federation rule. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest that we take the time to think through the implications of having the optimizers run first and land it in separate PRs. Aside from that there are also federation-specific optimizations we can do (hinted at in #23). We may want to make different choices in those than in the local execution case. |
||
|
||
let federation_rule = Arc::new(FederationOptimizerRule::new()); | ||
default_rules.insert(pos + 1, federation_rule); | ||
|
||
default_rules | ||
} | ||
|
||
pub type FederationProviderRef = Arc<dyn FederationProvider>; | ||
pub trait FederationProvider: Send + Sync { | ||
// Returns the name of the provider, used for comparison. | ||
|
@@ -23,9 +57,9 @@ pub trait FederationProvider: Send + Sync { | |
// will execute a query. For example: database instance & catalog. | ||
fn compute_context(&self) -> Option<String>; | ||
|
||
// Returns an analyzer that can cut out part of the plan | ||
// Returns an optimizer that can cut out part of the plan | ||
// to federate it. | ||
fn analyzer(&self) -> Option<Arc<Analyzer>>; | ||
fn optimizer(&self) -> Option<Arc<Optimizer>>; | ||
} | ||
|
||
impl fmt::Display for dyn FederationProvider { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍