From aadeb69b32db8f0d2bc16a1f0ea8a0033b7d3920 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 14 Nov 2023 08:19:27 +0530 Subject: [PATCH] [PERF] Speed Up MicroPartition Ops when we know the result is empty (#1604) Return empty micropartitions when we know the result is going to be empty. This allows us to avoid IO in places where it is not needed --- src/daft-core/src/series/ops/take.rs | 3 ++- src/daft-micropartition/src/ops/join.rs | 7 ++++++- src/daft-micropartition/src/ops/slice.rs | 13 +++++++++++-- src/daft-micropartition/src/ops/take.rs | 12 ++++++++++++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/daft-core/src/series/ops/take.rs b/src/daft-core/src/series/ops/take.rs index c54f695c7e..0383db2e80 100644 --- a/src/daft-core/src/series/ops/take.rs +++ b/src/daft-core/src/series/ops/take.rs @@ -11,7 +11,8 @@ impl Series { } pub fn slice(&self, start: usize, end: usize) -> DaftResult { - self.inner.slice(start, end) + let l = self.len(); + self.inner.slice(start.min(l), end.min(l)) } pub fn take(&self, idx: &Series) -> DaftResult { diff --git a/src/daft-micropartition/src/ops/join.rs b/src/daft-micropartition/src/ops/join.rs index 235c9c44ae..f8d5efbe5f 100644 --- a/src/daft-micropartition/src/ops/join.rs +++ b/src/daft-micropartition/src/ops/join.rs @@ -10,7 +10,13 @@ use daft_stats::TruthValue; impl MicroPartition { pub fn join(&self, right: &Self, left_on: &[Expr], right_on: &[Expr]) -> DaftResult { + let io_stats = IOStatsContext::new("MicroPartition::join"); let join_schema = infer_join_schema(&self.schema, &right.schema, left_on, right_on)?; + + if self.len() == 0 || right.len() == 0 { + return Ok(Self::empty(Some(join_schema.into()))); + } + let tv = match (&self.statistics, &right.statistics) { (_, None) => TruthValue::Maybe, (None, _) => TruthValue::Maybe, @@ -34,7 +40,6 @@ impl MicroPartition { if let TruthValue::False = tv { return Ok(Self::empty(Some(join_schema.into()))); } - let io_stats = IOStatsContext::new("MicroPartition::join"); let lt = self.concat_or_get(io_stats.clone())?; let rt = right.concat_or_get(io_stats)?; diff --git a/src/daft-micropartition/src/ops/slice.rs b/src/daft-micropartition/src/ops/slice.rs index b16d5e7846..e9e6fa2161 100644 --- a/src/daft-micropartition/src/ops/slice.rs +++ b/src/daft-micropartition/src/ops/slice.rs @@ -1,4 +1,4 @@ -use common_error::DaftResult; +use common_error::{DaftError, DaftResult}; use daft_io::IOStatsContext; use crate::micropartition::MicroPartition; @@ -7,9 +7,18 @@ impl MicroPartition { pub fn slice(&self, start: usize, end: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}")); + let mut rows_needed = end.min(self.len()) - start.min(self.len()); + + if start > end { + return Err(DaftError::ValueError(format!( + "Trying to slice MicroPartition with negative length, start: {start} vs end: {end}" + ))); + } else if rows_needed == 0 { + return Ok(Self::empty(Some(self.schema.clone()))); + } + let tables = self.tables_or_read(io_stats)?; let mut slices_tables = vec![]; - let mut rows_needed = (end - start).max(0); let mut offset_so_far = start; for tab in tables.iter() { diff --git a/src/daft-micropartition/src/ops/take.rs b/src/daft-micropartition/src/ops/take.rs index bc412b42c7..8bff2dcff4 100644 --- a/src/daft-micropartition/src/ops/take.rs +++ b/src/daft-micropartition/src/ops/take.rs @@ -11,6 +11,10 @@ impl MicroPartition { pub fn take(&self, idx: &Series) -> DaftResult { let io_stats = IOStatsContext::new("MicroPartition::take"); + if idx.is_empty() { + return Ok(Self::empty(Some(self.schema.clone()))); + } + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { // Fallback onto `[empty_table]` behavior @@ -38,6 +42,10 @@ impl MicroPartition { pub fn sample(&self, num: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})")); + if num == 0 { + return Ok(Self::empty(Some(self.schema.clone()))); + } + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { @@ -57,6 +65,10 @@ impl MicroPartition { pub fn quantiles(&self, num: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})")); + if num <= 1 { + return Ok(Self::empty(Some(self.schema.clone()))); + } + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))),