Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): Implement greedy join reordering #3538

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/common/scan-info/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use crate::{PartitionField, Pushdowns, ScanOperator, ScanTaskLike, ScanTaskLikeR
struct DummyScanTask {
pub schema: SchemaRef,
pub pushdowns: Pushdowns,
pub in_memory_size: Option<usize>,
}

#[derive(Debug)]
pub struct DummyScanOperator {
pub schema: SchemaRef,
pub num_scan_tasks: u32,
pub in_memory_size_per_task: Option<usize>,
}

#[typetag::serde]
Expand Down Expand Up @@ -67,7 +69,7 @@ impl ScanTaskLike for DummyScanTask {
}

fn estimate_in_memory_size_bytes(&self, _: Option<&DaftExecutionConfig>) -> Option<usize> {
None
self.in_memory_size
}

fn file_format_config(&self) -> Arc<FileFormatConfig> {
Expand Down Expand Up @@ -136,6 +138,7 @@ impl ScanOperator for DummyScanOperator {
let scan_task = Arc::new(DummyScanTask {
schema: self.schema.clone(),
pushdowns,
in_memory_size: self.in_memory_size_per_task,
});

Ok((0..self.num_scan_tasks)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::{collections::HashMap, sync::Arc};

use common_error::DaftResult;
use daft_dsl::{col, ExprRef};

use super::join_graph::{JoinCondition, JoinGraph};
use crate::{LogicalPlanBuilder, LogicalPlanRef};

// This is an implementation of the Greedy Operator Ordering algorithm (GOO) [1] for join selection. This algorithm
// selects join edges greedily by picking the edge with the smallest cost at each step. This is similar to Kruskal's
// minimum spanning tree algorithm, with the caveat that edge costs update at each step, due to changing cardinalities
// and selectivities between join nodes.
//
// Compared to DP-based algorithms, GOO is not always optimal. However, GOO has a complexity of O(n^3) and is more viable
// than DP-based algorithms when performing join ordering on many relations. DP Connected subgraph Complement Pairs (DPccp) [2]
// is the DP-based algorithm widely used in database systems today and has a O(3^n) complexity, although the latest
// literature does offer a super-polynomially faster DP-algorithm but that still has a O(2^n) to O(2^n * n^3) complexity [3].
//
// For this reason, we maintain a greedy-based join ordering algorithm to use when the number of relations is large, and default
// to DP-based algorithms otherwise.
//
// [1]: Fegaras, L. (1998). A New Heuristic for Optimizing Large Queries. International Conference on Database and Expert Systems Applications.
// [2]: Moerkotte, G., & Neumann, T. (2006). Analysis of two existing and one new dynamic programming algorithm for the generation of optimal bushy join trees without cross products. Very Large Data Bases Conference.
// [3]: Stoian, M., & Kipf, A. (2024). DPconv: Super-Polynomially Faster Join Ordering. ArXiv, abs/2409.08013.
pub(crate) struct GreedyJoinOrderer {}

impl GreedyJoinOrderer {
/// Consumes the join graph and transforms it into a logical plan with joins reordered.
pub(crate) fn compute_join_order(join_graph: &mut JoinGraph) -> DaftResult<LogicalPlanRef> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is doing too much (reordering and creating the logical plan) and isn't really that modular.

Instead what I would recommend is the following:

Trait JoinReorderer {
  fn reorder(&self, &mut graph: JoinGraph)
}

LogicalPlan -> JoinGraphBuilder -> JoinGraph -> JoinReorderer.reorder(JoinGraph) -> JoinGraph -> LogicalPlan

This way you get two things:

  1. must easier to test that the JoinGraph roundtrip is working correctly
  2. much easier to plug in a JoinReorderer (just implement that Trait)

// While the join graph consists of more than one join node, select the edge that has the smallest cost,
// then join the left and right nodes connected by this edge.
while join_graph.adj_list.0.len() > 1 {
let selected_pair = GreedyJoinOrderer::find_minimum_cost_join(&join_graph.adj_list.0);
if let Some((left, right, join_conds)) = selected_pair {
// Join the left and right relations using the given join conditions.
let (left_on, right_on) = join_conds
.iter()
.map(|join_cond| {
(
col(join_cond.left_on.clone()),
col(join_cond.right_on.clone()),
)
})
.collect::<(Vec<ExprRef>, Vec<ExprRef>)>();
let left_builder = LogicalPlanBuilder::from(left.clone());
let join = left_builder
.inner_join(right.clone(), left_on, right_on)?
.build();
let join = Arc::new(Arc::unwrap_or_clone(join).with_materialized_stats());

// Add the new node into the adjacency list.
let left_neighbors = join_graph.adj_list.0.remove(&left).unwrap();
let right_neighbors = join_graph.adj_list.0.remove(&right).unwrap();
let mut new_join_edges = HashMap::new();

// Helper function that takes in neighbors to the left and right nodes, then combines edges that point
// back to the left and/or right nodes into edges that point to the new join node.
let mut update_neighbors =
|neighbors: HashMap<LogicalPlanRef, Vec<JoinCondition>>| {
for (neighbor, _) in neighbors {
if neighbor == right || neighbor == left {
// Skip the nodes that we just joined.
continue;
}
let mut join_conditions = Vec::new();
// If this neighbor was connected to left or right nodes, collect the join conditions.
let neighbor_edges = join_graph
.adj_list
.0
.get_mut(&neighbor)
.expect("The neighbor should still be in the join graph");
if let Some(left_conds) = neighbor_edges.remove(&left) {
join_conditions.extend(left_conds);
}
if let Some(right_conds) = neighbor_edges.remove(&right) {
join_conditions.extend(right_conds);
}
// If this neighbor had any connections to left or right, create a new edge to the new join node.
if !join_conditions.is_empty() {
neighbor_edges.insert(join.clone(), join_conditions.clone());
new_join_edges.insert(
neighbor.clone(),
join_conditions.iter().map(|cond| cond.flip()).collect(),
);
}
}
};

// Process all neighbors from both the left and right sides.
update_neighbors(left_neighbors);
update_neighbors(right_neighbors);

// Add the new join node and its edges to the graph.
join_graph.adj_list.0.insert(join, new_join_edges);
} else {
panic!(
"No valid join edge selected despite join graph containing more than one relation"
);

Check warning on line 98 in src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs#L96-L98

Added lines #L96 - L98 were not covered by tests
}
}
// Apply projections and filters on top of the fully joined plan.
if let Some(joined_plan) = join_graph.adj_list.0.drain().map(|(plan, _)| plan).last() {
join_graph.apply_projections_and_filters_to_plan(joined_plan)
} else {
panic!("No valid logical plan after join reordering")

Check warning on line 105 in src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs#L105

Added line #L105 was not covered by tests
}
}

/// Helper functions that finds the next join edge in the adjacency list that has the smallest cost.
/// Currently cost is determined based on the max size in bytes of the candidate left and right relations.
fn find_minimum_cost_join(
adj_list: &HashMap<LogicalPlanRef, HashMap<LogicalPlanRef, Vec<JoinCondition>>>,
) -> Option<(LogicalPlanRef, LogicalPlanRef, Vec<JoinCondition>)> {
let mut min_cost = None;
let mut selected_pair = None;

for (candidate_left, neighbors) in adj_list {
for (candidate_right, join_conds) in neighbors {
let left_stats = candidate_left.materialized_stats();
let right_stats = candidate_right.materialized_stats();

// Assume primary key foreign key join which would have a size bounded by the foreign key relation,
// which is typically larger.
let cur_cost = left_stats
.approx_stats
.upper_bound_bytes
.max(right_stats.approx_stats.upper_bound_bytes);

if let Some(existing_min) = min_cost {
if let Some(current) = cur_cost {
if current < existing_min {
min_cost = Some(current);
selected_pair = Some((
candidate_left.clone(),
candidate_right.clone(),
join_conds.clone(),
));
}
}

Check warning on line 139 in src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-logical-plan/src/optimization/rules/reorder_joins/greedy_join_order.rs#L139

Added line #L139 was not covered by tests
} else {
min_cost = cur_cost;
selected_pair = Some((
candidate_left.clone(),
candidate_right.clone(),
join_conds.clone(),
));
}
}
}

selected_pair
}
}
Loading
Loading