Skip to content

Commit

Permalink
Move scan task splitting and merging to physical translation time
Browse files Browse the repository at this point in the history
  • Loading branch information
desmondcheongzx committed Nov 27, 2024
1 parent f34e59c commit 72d6f34
Show file tree
Hide file tree
Showing 24 changed files with 349 additions and 376 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,7 @@ class LogicalPlanBuilder:
kwargs: dict[str, Any] | None = None,
) -> LogicalPlanBuilder: ...
def schema(self) -> PySchema: ...
def optimize(self, execution_config: PyDaftExecutionConfig | None) -> LogicalPlanBuilder: ...
def optimize(self) -> LogicalPlanBuilder: ...
def to_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> PhysicalPlanScheduler: ...
def to_adaptive_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> AdaptivePhysicalPlanScheduler: ...
def repr_ascii(self, simple: bool) -> str: ...
Expand Down
10 changes: 3 additions & 7 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,11 @@ def explain(
print_to_file("== Unoptimized Logical Plan ==\n")
print_to_file(builder.pretty_print(simple, format=format))
if show_all:
execution_config = get_context().daft_execution_config
print_to_file("\n== Optimized Logical Plan ==\n")
builder = builder.optimize(execution_config)
builder = builder.optimize()
print_to_file(builder.pretty_print(simple))
print_to_file("\n== Physical Plan ==\n")
physical_plan_scheduler = builder.to_physical_plan_scheduler(execution_config)
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
print_to_file(physical_plan_scheduler.pretty_print(simple, format=format))
else:
print_to_file(
Expand All @@ -186,12 +185,9 @@ def explain(
return None

def num_partitions(self) -> int:
daft_execution_config = get_context().daft_execution_config
# We need to run the optimizer since that could change the number of partitions
return (
self.__builder.optimize(daft_execution_config)
.to_physical_plan_scheduler(daft_execution_config)
.num_partitions()
self.__builder.optimize().to_physical_plan_scheduler(get_context().daft_execution_config).num_partitions()
)

@DataframePublicAPI
Expand Down
4 changes: 2 additions & 2 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str:
def __repr__(self) -> str:
return self._builder.repr_ascii(simple=False)

def optimize(self, execution_config: PyDaftExecutionConfig | None) -> LogicalPlanBuilder:
def optimize(self) -> LogicalPlanBuilder:
"""
Optimize the underlying logical plan.
"""
builder = self._builder.optimize(execution_config)
builder = self._builder.optimize()
return LogicalPlanBuilder(builder)

@classmethod
Expand Down
4 changes: 1 addition & 3 deletions daft/runners/native_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def run_iter(
daft_execution_config = get_context().daft_execution_config

# Optimize the logical plan.
# TODO(desmond): Currently we don't provide the execution config because this triggers
# scan task merging, but the native executor expects one source per scan task.
builder = builder.optimize(None)
builder = builder.optimize()
executor = NativeExecutor.from_logical_plan_builder(builder)
results_gen = executor.run(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def run_iter(
execution_id = str(uuid.uuid4())

# Optimize the logical plan.
builder = builder.optimize(daft_execution_config)
builder = builder.optimize()

if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ def run_iter(
daft_execution_config = get_context().daft_execution_config

# Optimize the logical plan.
builder = builder.optimize(daft_execution_config)
builder = builder.optimize()

if daft_execution_config.enable_aqe:
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
Expand Down
7 changes: 1 addition & 6 deletions src/common/scan-info/src/scan_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
sync::Arc,
};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_schema::schema::SchemaRef;

Expand Down Expand Up @@ -33,11 +32,7 @@ pub trait ScanOperator: Send + Sync + Debug {

/// Produces the scan tasks for this operator, given the pushdowns to apply.
/// Scan task splitting and merging is not done here; it is performed at physical translation time.
fn to_scan_tasks(
&self,
pushdowns: Pushdowns,
config: Option<&DaftExecutionConfig>,
) -> DaftResult<Vec<ScanTaskLikeRef>>;
fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult<Vec<ScanTaskLikeRef>>;
}

impl Display for dyn ScanOperator {
Expand Down
8 changes: 8 additions & 0 deletions src/common/scan-info/src/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ use crate::Pushdowns;

#[typetag::serde(tag = "type")]
pub trait ScanTaskLike: Debug + DisplayAs + Send + Sync {
/// Whether this scan task is a dummy. Defaults to `false`; implementors that are
/// dummies override this to return `true`.
///
/// Physical plan translation skips scan task splitting and merging when any scan
/// task reports `true` here.
fn is_dummy(&self) -> bool {
false
}
fn as_any(&self) -> &dyn Any;
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
fn dyn_eq(&self, other: &dyn ScanTaskLike) -> bool;
fn dyn_hash(&self, state: &mut dyn Hasher);
/// Consumes this scan task and returns the scan tasks that take its place.
///
/// NOTE(review): going by the names, this presumably splits the task along row-group
/// boundaries, with `min_size_bytes` / `max_size_bytes` bounding which tasks are split;
/// the exact policy is left to each implementor — confirm against the implementations.
fn split_by_row_groups(
self: Arc<Self>,
min_size_bytes: usize,
max_size_bytes: usize,
) -> DaftResult<Vec<Arc<dyn ScanTaskLike>>>;
#[must_use]
fn materialized_schema(&self) -> SchemaRef;
#[must_use]
Expand Down
18 changes: 13 additions & 5 deletions src/common/scan-info/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub struct DummyScanOperator {

#[typetag::serde]
impl ScanTaskLike for DummyScanTask {
/// Always `true`: marks this scan task as a dummy.
fn is_dummy(&self) -> bool {
true
}

fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -46,6 +50,14 @@ impl ScanTaskLike for DummyScanTask {
self.hash(&mut state);
}

/// Dummy scan tasks are never split: both size arguments are ignored and the task
/// itself is returned as the sole element.
fn split_by_row_groups(
self: Arc<Self>,
_: usize,
_: usize,
) -> DaftResult<Vec<Arc<dyn ScanTaskLike>>> {
Ok(vec![self])
}

fn materialized_schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down Expand Up @@ -129,11 +141,7 @@ impl ScanOperator for DummyScanOperator {
vec!["DummyScanOperator".to_string()]
}

fn to_scan_tasks(
&self,
pushdowns: Pushdowns,
_: Option<&DaftExecutionConfig>,
) -> DaftResult<Vec<ScanTaskLikeRef>> {
fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult<Vec<ScanTaskLikeRef>> {
let scan_task = Arc::new(DummyScanTask {
schema: self.schema.clone(),
pushdowns,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ impl Session {
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
let cfg = Arc::new(DaftExecutionConfig::default());
let optimized_plan = plan.optimize(Some(cfg.clone()))?;
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg, None)?
Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-plan/src/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
// We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story
let scan_tasks = match &info.scan_state {
ScanState::Operator(scan_op) => {
Arc::new(scan_op.0.to_scan_tasks(info.pushdowns.clone(), None)?)
Arc::new(scan_op.0.to_scan_tasks(info.pushdowns.clone())?)
}
ScanState::Tasks(scan_tasks) => scan_tasks.clone(),
};
Expand Down
17 changes: 6 additions & 11 deletions src/daft-logical-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::Arc,
};

use common_daft_config::{DaftExecutionConfig, DaftPlanningConfig};
use common_daft_config::DaftPlanningConfig;
use common_display::mermaid::MermaidDisplayOptions;
use common_error::{DaftError, DaftResult};
use common_file_formats::FileFormat;
Expand All @@ -16,7 +16,7 @@ use daft_schema::schema::{Schema, SchemaRef};
use {
crate::sink_info::{CatalogInfo, IcebergCatalogInfo},
crate::source_info::InMemoryInfo,
common_daft_config::{PyDaftExecutionConfig, PyDaftPlanningConfig},
common_daft_config::PyDaftPlanningConfig,
daft_dsl::python::PyExpr,
// daft_scan::python::pylib::ScanOperatorHandle,
daft_schema::python::schema::PySchema,
Expand Down Expand Up @@ -589,7 +589,7 @@ impl LogicalPlanBuilder {
Ok(self.with_new_plan(logical_plan))
}

pub fn optimize(&self, execution_config: Option<Arc<DaftExecutionConfig>>) -> DaftResult<Self> {
pub fn optimize(&self) -> DaftResult<Self> {
let default_optimizer_config: OptimizerConfig = Default::default();
let optimizer_config = OptimizerConfig {
enable_actor_pool_projections: self
Expand All @@ -599,7 +599,7 @@ impl LogicalPlanBuilder {
.unwrap_or(default_optimizer_config.enable_actor_pool_projections),
..default_optimizer_config
};
let optimizer = Optimizer::new(optimizer_config, execution_config);
let optimizer = Optimizer::new(optimizer_config);

// Run LogicalPlan optimizations
let unoptimized_plan = self.build();
Expand Down Expand Up @@ -957,13 +957,8 @@ impl PyLogicalPlanBuilder {
}

/// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan.
pub fn optimize(
&self,
py: Python,
execution_config: Option<PyDaftExecutionConfig>,
) -> PyResult<Self> {
let execution_config = execution_config.map(|cfg| cfg.config);
py.allow_threads(|| Ok(self.builder.optimize(execution_config)?.into()))
pub fn optimize(&self, py: Python) -> PyResult<Self> {
py.allow_threads(|| Ok(self.builder.optimize()?.into()))
}

pub fn repr_ascii(&self, simple: bool) -> PyResult<String> {
Expand Down
8 changes: 2 additions & 6 deletions src/daft-logical-plan/src/ops/source.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use common_scan_info::{PhysicalScanInfo, ScanState};
use daft_schema::schema::SchemaRef;
Expand Down Expand Up @@ -34,16 +33,13 @@ impl Source {
// Should only be called if a Source node's source info contains PhysicalScanInfo. The PhysicalScanInfo
// should also hold a ScanState::Operator and not a ScanState::Tasks (which would indicate that we're
// materializing this physical scan node multiple times).
pub(crate) fn build_materialized_scan_source(
mut self,
execution_config: Option<&DaftExecutionConfig>,
) -> DaftResult<Self> {
pub(crate) fn build_materialized_scan_source(mut self) -> DaftResult<Self> {
let new_physical_scan_info = match Arc::unwrap_or_clone(self.source_info) {
SourceInfo::Physical(mut physical_scan_info) => {
let scan_tasks = match &physical_scan_info.scan_state {
ScanState::Operator(scan_op) => scan_op
.0
.to_scan_tasks(physical_scan_info.pushdowns.clone(), execution_config)?,
.to_scan_tasks(physical_scan_info.pushdowns.clone())?,
ScanState::Tasks(_) => {
panic!("Physical scan nodes are being materialized more than once");
}
Expand Down
8 changes: 2 additions & 6 deletions src/daft-logical-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{ops::ControlFlow, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use common_treenode::Transformed;

Expand Down Expand Up @@ -91,10 +90,7 @@ pub struct Optimizer {
}

impl Optimizer {
pub fn new(
config: OptimizerConfig,
execution_config: Option<Arc<DaftExecutionConfig>>,
) -> Self {
pub fn new(config: OptimizerConfig) -> Self {
let mut rule_batches = Vec::new();

// --- Split ActorPoolProjection nodes from Project nodes ---
Expand Down Expand Up @@ -142,7 +138,7 @@ impl Optimizer {

// --- Materialize scan nodes ---
rule_batches.push(RuleBatch::new(
vec![Box::new(MaterializeScans::new(execution_config))],
vec![Box::new(MaterializeScans::new())],
RuleExecutionStrategy::Once,
));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#[derive(Default, Debug)]
pub struct MaterializeScans {
execution_config: Option<Arc<DaftExecutionConfig>>,
}
pub struct MaterializeScans {}

impl MaterializeScans {
pub fn new(execution_config: Option<Arc<DaftExecutionConfig>>) -> Self {
Self { execution_config }
pub fn new() -> Self {
Self {}
}
}
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use common_treenode::{Transformed, TreeNode};

Expand All @@ -36,9 +33,7 @@ impl MaterializeScans {
let source_plan = Arc::unwrap_or_clone(plan);
if let LogicalPlan::Source(source) = source_plan {
Ok(Transformed::yes(
source
.build_materialized_scan_source(self.execution_config.as_deref())?
.into(),
source.build_materialized_scan_source()?.into(),
))
} else {
unreachable!("This logical plan was already matched as a Source node")
Expand Down
1 change: 1 addition & 0 deletions src/daft-physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-functions = {path = "../daft-functions", default-features = false}
daft-logical-plan = {path = "../daft-logical-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
itertools = {workspace = true}
log = {workspace = true}
Expand Down
2 changes: 1 addition & 1 deletion src/daft-physical-plan/src/physical_planner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl AdaptivePlanner {

self.logical_plan = result.data;

let optimizer = Optimizer::new(Default::default(), Some(self.cfg.clone()));
let optimizer = Optimizer::new(Default::default());

self.logical_plan = optimizer.optimize(
self.logical_plan.clone(),
Expand Down
16 changes: 15 additions & 1 deletion src/daft-physical-plan/src/physical_planner/translate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use daft_logical_plan::{
sink_info::{OutputFileInfo, SinkInfo},
source_info::{PlaceHolderInfo, SourceInfo},
};
use daft_scan::scan_task_iters::{merge_by_sizes, split_by_row_groups};

use crate::{ops::*, PhysicalPlan, PhysicalPlanRef};

Expand All @@ -49,7 +50,7 @@ pub(super) fn translate_single_logical_node(
let scan_tasks = {
match scan_state {
ScanState::Operator(scan_op) => {
Arc::new(scan_op.0.to_scan_tasks(pushdowns.clone(), Some(cfg))?)
Arc::new(scan_op.0.to_scan_tasks(pushdowns.clone())?)
}
ScanState::Tasks(scan_tasks) => scan_tasks.clone(),
}
Expand All @@ -65,6 +66,19 @@ pub(super) fn translate_single_logical_node(
))
.arced())
} else {
// Perform scan task splitting and merging if there are no dummy scan tasks.
let scan_tasks = if !scan_tasks.iter().any(|st| st.is_dummy()) {
let split_tasks = split_by_row_groups(
scan_tasks,
cfg.parquet_split_row_groups_max_files,
cfg.scan_tasks_min_size_bytes,
cfg.scan_tasks_max_size_bytes,
)?;
merge_by_sizes(split_tasks, pushdowns, cfg)?
} else {
scan_tasks
};

let clustering_spec = Arc::new(ClusteringSpec::Unknown(
UnknownClusteringConfig::new(scan_tasks.len()),
));
Expand Down
Loading

0 comments on commit 72d6f34

Please sign in to comment.