
feat(table): implement list_unique and Set aggregation #3710

Open · wants to merge 20 commits into main from nishant-set-agg
Conversation

f4t4nt
Contributor

@f4t4nt f4t4nt commented Jan 17, 2025

Implements the agg_set aggregation expression requested in #3661. This aggregation is similar to agg_list but keeps only the distinct elements in the resulting list.

Key features:

  1. agg_set(): Creates a list of distinct elements in a group

    • Similar interface to existing agg_list()
    • Automatically deduplicates non-null elements
    • Preserves null lists (when entire list is null)
  2. Implementation details:

    • Built on top of agg_list() and new list_unique() operation
    • Handles both regular and fixed-size lists
    • Maintains order of first occurrence for each unique element

Example usage:

# Similar to agg_list() but with distinct elements
df.groupby("key").agg([
    df["values"].agg_set()  # Creates list of unique elements per group
])

# Before: agg_list()
df.agg([col("values").agg_list()])
# {"values": [[1, None, 2, 2, 1]]}

# After: agg_set()
df.agg([col("values").agg_set()])
# {"values": [[1, 2]]}  # Nulls within lists are excluded
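The semantics described above (keep the first occurrence of each non-null element) can be sketched in plain Python; `dedup_preserve_order` is a hypothetical helper name, not part of the Daft API:

```python
def dedup_preserve_order(values):
    """Drop nulls (None) and keep only the first occurrence of each element."""
    seen = set()
    out = []
    for v in values:
        if v is None or v in seen:
            continue
        seen.add(v)
        out.append(v)
    return out

print(dedup_preserve_order([1, None, 2, 2, 1]))  # [1, 2]
```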

@github-actions github-actions bot added the feat label Jan 17, 2025
@f4t4nt f4t4nt linked an issue Jan 17, 2025 that may be closed by this pull request

codspeed-hq bot commented Jan 17, 2025

CodSpeed Performance Report

Merging #3710 will not alter performance

Comparing nishant-set-agg (aa31d66) with main (63ffd5e)

Summary

✅ 27 untouched benchmarks


codecov bot commented Jan 17, 2025

Codecov Report

Attention: Patch coverage is 89.16155% with 53 lines in your changes missing coverage. Please review.

Project coverage is 77.79%. Comparing base (a8d63dd) to head (aa31d66).
Report is 14 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-core/src/array/ops/set_agg.rs 93.04% 16 Missing ⚠️
src/daft-functions/src/list/unique.rs 85.43% 15 Missing ⚠️
daft/dataframe/dataframe.py 46.15% 7 Missing ⚠️
src/daft-core/src/series/ops/hash.rs 55.55% 4 Missing ⚠️
src/daft-core/src/python/series.rs 0.00% 3 Missing ⚠️
src/daft-logical-plan/src/ops/project.rs 0.00% 3 Missing ⚠️
daft/expressions/expressions.py 88.88% 1 Missing ⚠️
...rc/daft-core/src/series/array_impl/nested_array.rs 90.00% 1 Missing ⚠️
src/daft-core/src/series/mod.rs 97.87% 1 Missing ⚠️
src/daft-dsl/src/expr/mod.rs 88.88% 1 Missing ⚠️
... and 1 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3710      +/-   ##
==========================================
+ Coverage   77.58%   77.79%   +0.20%     
==========================================
  Files         729      735       +6     
  Lines       92035    93060    +1025     
==========================================
+ Hits        71409    72398     +989     
- Misses      20626    20662      +36     
Files with missing lines Coverage Δ
src/daft-core/src/series/array_impl/data_array.rs 96.33% <100.00%> (+0.37%) ⬆️
...c/daft-core/src/series/array_impl/logical_array.rs 95.16% <100.00%> (+0.71%) ⬆️
src/daft-core/src/series/ops/agg.rs 78.02% <100.00%> (+0.36%) ⬆️
src/daft-dsl/src/python.rs 91.23% <100.00%> (+0.16%) ⬆️
src/daft-functions/src/python/list.rs 100.00% <100.00%> (ø)
src/daft-functions/src/python/mod.rs 100.00% <100.00%> (ø)
...ft-physical-plan/src/physical_planner/translate.rs 94.18% <100.00%> (+0.16%) ⬆️
src/daft-table/src/lib.rs 84.27% <100.00%> (+0.12%) ⬆️
daft/expressions/expressions.py 93.96% <88.88%> (+0.32%) ⬆️
...rc/daft-core/src/series/array_impl/nested_array.rs 70.09% <90.00%> (+5.14%) ⬆️
... and 9 more

... and 69 files with indirect coverage changes

@f4t4nt f4t4nt requested a review from kevinzwang January 29, 2025 22:27
Member

@kevinzwang kevinzwang left a comment


Looks good so far. Two general notes in addition to my comments:

  1. Should list_unique be called list_distinct instead? That aligns better with the naming of the DataFrame.distinct function. The naming in other engines varies, but PySpark and DuckDB both call it distinct.
  2. Does it make sense to have ignore_nulls at all? The behavior of ignore_nulls=False is ambiguous; I'm not sure it is sensible to keep a single null value when there are any nulls. It might be better to say that these functions simply discard all nulls and let the user deal with nulls themselves. Apologies for bringing this up after you've done all this work already.
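The ambiguity the reviewer raises can be made concrete. A hypothetical sketch (not the actual Daft implementation) of one plausible reading, where ignore_nulls=False keeps a single null:

```python
def agg_set_sketch(values, ignore_nulls=True):
    """Hypothetical sketch of agg_set null policies.

    ignore_nulls=True  -> nulls are dropped entirely.
    ignore_nulls=False -> at most one null survives, appended at the end.
    """
    seen = set()
    out = []
    has_null = False
    for v in values:
        if v is None:
            has_null = True
            continue
        if v not in seen:
            seen.add(v)
            out.append(v)
    if not ignore_nulls and has_null:
        out.append(None)
    return out

print(agg_set_sketch([1, None, 2, 2, 1], ignore_nulls=True))   # [1, 2]
print(agg_set_sketch([1, None, 2, 2, 1], ignore_nulls=False))  # [1, 2, None]
```

Whether the surviving null should sit at its first-occurrence position instead is exactly the kind of underspecified detail that motivates dropping the flag.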

@@ -982,6 +983,11 @@ def agg_list(self) -> Expression:
expr = self._expr.agg_list()
return Expression._from_pyexpr(expr)

def agg_set(self, ignore_nulls: bool = True) -> Expression:
"""Aggregates the values in the expression into a set."""
Add some examples here, especially with behavior on nulls

(Showing first 3 of 3 rows)

Args:
ignore_nulls: Whether to ignore null values in the result. Defaults to True.
Maybe add some examples here too. Does ignore_nulls=True mean nulls are considered the same or unique? I'm okay with either but good to clarify in the docs

docs/sphinx/source/expressions.rst (comment resolved)
src/daft-core/src/array/ops/set_agg.rs (outdated, comment resolved)
Comment on lines +53 to +56
if ignore_nulls {
let not_null_mask = DaftNotNull::not_null(self)?.into_series();
child_series = child_series.filter(not_null_mask.bool()?)?;
}
An alternative to this is to let deduplicate_series handle the null filtering. It's going through all of the elements anyway.

}
}

fn evaluate(&self, inputs: &[Series]) -> DaftResult<Series> {
I would move the implementation of evaluate (the stuff inside the match) out of this file and into maybe daft-core/src/array/ops to match the other functions. This file is kind of like the definition (naming, typing, etc) of the function, instead of the actual implementation.

Comment on lines +60 to +71
let fixed_list = input.fixed_size_list()?;
let arrow_array = fixed_list.to_arrow();
let list_type = ArrowDataType::List(Box::new(arrow2::datatypes::Field::new(
"item",
inner_type.as_ref().to_arrow()?,
true,
)));
let list_array = cast(arrow_array.as_ref(), &list_type, Default::default())?;
Series::try_from_field_and_arrow_array(
Field::new(input.name(), DataType::List(inner_type.clone())),
list_array,
)?
I believe you can straight up cast a FixedSizeList type series into a List type instead of implementing this
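Conceptually, casting a FixedSizeList into a List just regroups the flat child buffer by a constant width, which is what the manual arrow2 conversion above does by hand. A plain-Python sketch of that regrouping (hypothetical helper name):

```python
def fixed_size_list_to_list(flat_values, size):
    """Regroup a flat child buffer into rows of a constant width.

    This mirrors what a FixedSizeList -> List cast amounts to: the values
    are unchanged; only the offsets become explicit.
    """
    assert len(flat_values) % size == 0, "buffer must be a multiple of the width"
    return [flat_values[i:i + size] for i in range(0, len(flat_values), size)]

print(fixed_size_list_to_list([1, 2, 3, 4, 5, 6], 3))  # [[1, 2, 3], [4, 5, 6]]
```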

Comment on lines +133 to +135
for (i, series) in result.iter().enumerate() {
growable.extend(i, 0, series.len());
}
The way you're using the growable here still requires two copies of the data, once to construct result and once at growable.build(). Happy to chat about how you could do this more efficiently!
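The double-copy point can be illustrated in plain Python (hypothetical names; the real code operates on Series and a Growable):

```python
def concat_two_copies(groups):
    # mirrors the reviewed pattern: materialize each group ("result"),
    # then copy everything again when building the final output
    result = [list(g) for g in groups]   # first copy
    flat = []
    for g in result:
        flat.extend(g)                   # second copy
    return flat

def concat_one_copy(groups):
    # alternative: size the output up front and write each element exactly once
    total = sum(len(g) for g in groups)
    out = [None] * total
    i = 0
    for g in groups:
        for v in g:
            out[i] = v
            i += 1
    return out
```

Both produce the same result; the second avoids materializing the intermediate collection.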

Comment on lines +1102 to +1111
let list_agg_id =
add_to_stage(AggExpr::List, e.clone(), schema, &mut first_stage_aggs);
let list_concat_id = add_to_stage(
AggExpr::Concat,
col(list_agg_id.clone()),
schema,
&mut second_stage_aggs,
);
let result = unique(col(list_concat_id.clone()), *ignore_nulls).alias(output_name);
final_exprs.push(result);
Come to think of it, since the translation here converts it into a two-stage aggregation that uses list and then concat+unique, are we ever actually using Series::agg_set during an execution of agg_set? If we don't intend to, there's actually no need to implement that at all.
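The two-stage lowering described here can be sketched over in-memory partitions, assuming the semantics shown elsewhere in this PR (first-occurrence order, nulls dropped); this is an illustration, not the planner's actual code:

```python
def two_stage_agg_set(partitions):
    # stage 1: each partition aggregates its values into a list (AggExpr::List)
    stage1 = [list(p) for p in partitions]
    # stage 2: concatenate the per-partition lists (AggExpr::Concat)...
    concatenated = [v for part in stage1 for v in part]
    # ...then apply unique as a final projection over the concatenated list
    seen, out = set(), []
    for v in concatenated:
        if v is None or v in seen:
            continue
        seen.add(v)
        out.append(v)
    return out

print(two_stage_agg_set([[1, None, 2], [2, 1, 3]]))  # [1, 2, 3]
```

Since unique runs only after the concat stage, the single-stage Series::agg_set path is indeed never exercised by this plan.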

It is pretty confusing how the expression that is called is not always the one that we actually execute in an agg. Let me know if you need me to explain any of this.

Comment on lines +526 to +530
// AggExpr::Set(expr) => {
// let list = self.eval_expression(expr)?.agg_list(groups)?;
// let unique_expr = unique(col(list.name()), false);
// self.eval_expression(&unique_expr)
// }
remove

Successfully merging this pull request may close these issues.

agg_set aggregation expression