Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT]: expr simplifier #3393

Merged
merged 15 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions daft/delta_lake/delta_lake_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __init__(
# Returns the schema object stored on this operator in `_schema`.
def schema(self) -> Schema:
return self._schema

# Returns the fixed operator-type label. Unlike display_name(), the result
# does not include the table name, so it is the same for every instance.
def name(self) -> str:
return "DeltaLakeScanOperator"

# Returns a label of the form "DeltaLakeScanOperator(<name>)", where <name>
# is read from the table's metadata.
def display_name(self) -> str:
return f"DeltaLakeScanOperator({self._table.metadata().name})"

Expand Down
3 changes: 3 additions & 0 deletions daft/hudi/hudi_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
# Returns the schema object stored on this operator in `_schema`.
def schema(self) -> Schema:
return self._schema

# Returns the fixed operator-type label. Unlike display_name(), the result
# does not include the table name, so it is the same for every instance.
def name(self) -> str:
return "HudiScanOperator"

# Returns a label of the form "HudiScanOperator(<name>)", where <name> is
# read from the table's props.
def display_name(self) -> str:
return f"HudiScanOperator({self._table.props.name})"

Expand Down
3 changes: 3 additions & 0 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def __init__(self, iceberg_table: Table, snapshot_id: int | None, storage_config
# Returns the schema object stored on this operator in `_schema`.
def schema(self) -> Schema:
return self._schema

# Returns the fixed operator-type label. Unlike display_name(), the result
# does not include the table name, so it is the same for every instance.
def name(self) -> str:
return "IcebergScanOperator"

# Returns a label of the form "IcebergScanOperator(<a.b.c>)": the parts
# returned by the table's name() are joined with dots.
def display_name(self) -> str:
return f"IcebergScanOperator({'.'.join(self._table.name())})"

Expand Down
3 changes: 3 additions & 0 deletions daft/io/_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def __init__(
self._generators = generators
self._schema = schema

# Returns the operator name by delegating to display_name(), which for this
# operator is the constant "GeneratorScanOperator" (no per-instance detail).
def name(self) -> str:
return self.display_name()

# Returns the constant label "GeneratorScanOperator"; nothing instance-specific
# is appended.
def display_name(self) -> str:
return "GeneratorScanOperator"

Expand Down
3 changes: 3 additions & 0 deletions daft/io/_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class LanceDBScanOperator(ScanOperator):
# Stores the given Lance dataset on the instance as `_ds`.
def __init__(self, ds: "lance.LanceDataset"):
self._ds = ds

# Returns the fixed operator-type label. Unlike display_name(), the result
# does not include the dataset URI, so it is the same for every instance.
def name(self) -> str:
return "LanceDBScanOperator"

# Returns a label of the form "LanceDBScanOperator(<uri>)", where <uri> is
# the `uri` attribute of the stored dataset.
def display_name(self) -> str:
return f"LanceDBScanOperator({self._ds.uri})"

Expand Down
3 changes: 3 additions & 0 deletions daft/sql/sql_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def __init__(
# Returns the schema object stored on this operator in `_schema`.
def schema(self) -> Schema:
return self._schema

# Returns the fixed operator-type label. Unlike display_name(), the result
# does not include the query or connection, so it is the same for every
# instance.
def name(self) -> str:
return "SQLScanOperator"

# Returns a label of the form "SQLScanOperator(sql=<sql>, conn=<conn>)" built
# from the instance's `sql` and `conn` attributes.
# NOTE(review): if `conn` can be a connection URL/string it may carry
# credentials — confirm this label is safe to print or log.
def display_name(self) -> str:
return f"SQLScanOperator(sql={self.sql}, conn={self.conn})"

Expand Down
2 changes: 2 additions & 0 deletions src/common/scan-info/src/scan_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use daft_schema::schema::SchemaRef;
use crate::{PartitionField, Pushdowns, ScanTaskLikeRef};

pub trait ScanOperator: Send + Sync + Debug {
/// Returns the operator's name as a borrowed string.
///
/// NOTE(review): the Python operators in this change return a fixed
/// operator-type label here (without the per-instance detail that
/// `display_name` adds) — presumably implementors should do the same; confirm.
fn name(&self) -> &str;

/// Returns the schema of this operator as a `SchemaRef`.
fn schema(&self) -> SchemaRef;
/// Returns a slice of `PartitionField`s — presumably the fields the scanned
/// data is partitioned by; confirm against implementors.
fn partitioning_keys(&self) -> &[PartitionField];
/// Returns an optional column name — presumably the column that carries the
/// source file path, when one is configured; TODO confirm.
fn file_path_column(&self) -> Option<&str>;
Expand Down
3 changes: 3 additions & 0 deletions src/common/scan-info/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ Pushdowns: {pushdowns}
}

impl ScanOperator for DummyScanOperator {
/// Returns the constant name `"dummy"` for this test operator.
fn name(&self) -> &'static str {
"dummy"
}
/// Returns a clone of the `schema` field stored on this operator.
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
13 changes: 12 additions & 1 deletion src/daft-logical-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use super::{
logical_plan_tracker::LogicalPlanTracker,
rules::{
DropRepartition, EliminateCrossJoin, EnrichWithStats, LiftProjectFromAgg, MaterializeScans,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SplitActorPoolProjects,
OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, SimplifyExpressionsRule,
SplitActorPoolProjects,
},
};
use crate::LogicalPlan;
Expand Down Expand Up @@ -97,6 +98,11 @@ impl Optimizer {
],
RuleExecutionStrategy::Once,
),
// Simplify expressions first so that the rule batches that follow operate on simpler expressions.
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
// --- Bulk of our rules ---
RuleBatch::new(
vec![
Expand Down Expand Up @@ -129,6 +135,11 @@ impl Optimizer {
vec![Box::new(EnrichWithStats::new())],
RuleExecutionStrategy::Once,
),
// Simplify expressions again, since the preceding rules may have introduced new expressions.
RuleBatch::new(
vec![Box::new(SimplifyExpressionsRule::new())],
RuleExecutionStrategy::FixedPoint(Some(3)),
),
];

Self::with_rule_batches(rule_batches, config)
Expand Down
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/optimization/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod push_down_filter;
mod push_down_limit;
mod push_down_projection;
mod rule;
mod simplify_expressions;
mod split_actor_pool_projects;

pub use drop_repartition::DropRepartition;
Expand All @@ -18,4 +19,5 @@ pub use push_down_filter::PushDownFilter;
pub use push_down_limit::PushDownLimit;
pub use push_down_projection::PushDownProjection;
pub use rule::OptimizerRule;
pub use simplify_expressions::SimplifyExpressionsRule;
pub use split_actor_pool_projects::SplitActorPoolProjects;
Loading
Loading