Skip to content

Commit

Permalink
[PERF] Speed Up MicroPartition Ops when we know the result is empty (#1604)
Browse files Browse the repository at this point in the history

Return empty MicroPartitions when we know the result is going to be
empty. This allows us to avoid I/O in places where it is not needed.
  • Loading branch information
samster25 authored Nov 14, 2023
1 parent 321f78e commit aadeb69
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/daft-core/src/series/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ impl Series {
}

/// Slices this series by delegating to `self.inner.slice`.
///
/// NOTE(review): this is a diff view without +/- markers, so the body below
/// shows both the pre-change and post-change code (the file header reports
/// "2 additions & 1 deletion"). Only one of the two delegating calls exists
/// in the real file at any given revision.
pub fn slice(&self, start: usize, end: usize) -> DaftResult<Series> {
// Presumably the removed pre-change line: bounds are forwarded unmodified.
self.inner.slice(start, end)
// Presumably the added lines: both bounds are clamped to the series length
// before delegating, so neither value passed on exceeds `self.len()`.
let l = self.len();
self.inner.slice(start.min(l), end.min(l))
}

pub fn take(&self, idx: &Series) -> DaftResult<Series> {
Expand Down
7 changes: 6 additions & 1 deletion src/daft-micropartition/src/ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ use daft_stats::TruthValue;

impl MicroPartition {
pub fn join(&self, right: &Self, left_on: &[Expr], right_on: &[Expr]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::join");
let join_schema = infer_join_schema(&self.schema, &right.schema, left_on, right_on)?;

if self.len() == 0 || right.len() == 0 {
return Ok(Self::empty(Some(join_schema.into())));
}

let tv = match (&self.statistics, &right.statistics) {
(_, None) => TruthValue::Maybe,
(None, _) => TruthValue::Maybe,
Expand All @@ -34,7 +40,6 @@ impl MicroPartition {
if let TruthValue::False = tv {
return Ok(Self::empty(Some(join_schema.into())));
}
let io_stats = IOStatsContext::new("MicroPartition::join");

let lt = self.concat_or_get(io_stats.clone())?;
let rt = right.concat_or_get(io_stats)?;
Expand Down
13 changes: 11 additions & 2 deletions src/daft-micropartition/src/ops/slice.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common_error::DaftResult;
use common_error::{DaftError, DaftResult};
use daft_io::IOStatsContext;

use crate::micropartition::MicroPartition;
Expand All @@ -7,9 +7,18 @@ impl MicroPartition {
pub fn slice(&self, start: usize, end: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}"));

let mut rows_needed = end.min(self.len()) - start.min(self.len());

if start > end {
return Err(DaftError::ValueError(format!(
"Trying to slice MicroPartition with negative length, start: {start} vs end: {end}"
)));
} else if rows_needed == 0 {
return Ok(Self::empty(Some(self.schema.clone())));
}

let tables = self.tables_or_read(io_stats)?;
let mut slices_tables = vec![];
let mut rows_needed = (end - start).max(0);
let mut offset_so_far = start;

for tab in tables.iter() {
Expand Down
12 changes: 12 additions & 0 deletions src/daft-micropartition/src/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ impl MicroPartition {
pub fn take(&self, idx: &Series) -> DaftResult<Self> {
let io_stats = IOStatsContext::new("MicroPartition::take");

if idx.is_empty() {
return Ok(Self::empty(Some(self.schema.clone())));
}

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
// Fallback onto `[empty_table]` behavior
Expand Down Expand Up @@ -38,6 +42,10 @@ impl MicroPartition {
pub fn sample(&self, num: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})"));

if num == 0 {
return Ok(Self::empty(Some(self.schema.clone())));
}

let tables = self.concat_or_get(io_stats)?;

match tables.as_slice() {
Expand All @@ -57,6 +65,10 @@ impl MicroPartition {
pub fn quantiles(&self, num: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})"));

if num <= 1 {
return Ok(Self::empty(Some(self.schema.clone())));
}

let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
Expand Down

0 comments on commit aadeb69

Please sign in to comment.