feat(table): implement list_unique and Set aggregation #3710
base: main
CodSpeed Performance Report: Merging #3710 will not alter performance.
71f24f1 to a232665
…e.rs and cleanup style
- Add hash-based deduplication in set_agg.rs
- Add proper null value handling in set operations
- Add debug logging for set operations
- Improve error messaging for unhashable types
- Update tests to verify set aggregation behavior
- Remove daft-functions from daft-table dependencies
- Remove daft-functions/python from python feature
a232665 to 7512dd4
Looks good so far. Two general notes in addition to my comments:
- Should list_unique be called list_distinct instead? It aligns better with the naming of the DataFrame.distinct function. The naming in other engines varies, but PySpark and DuckDB do call it distinct.
- Does it make sense to have ignore_nulls at all? I feel like the behavior of ignore_nulls=False is ambiguous; I'm not sure it is sensible to keep a single null value if there are any nulls anyway. It might be better to say that these functions simply discard all the nulls and have the user deal with nulls themselves. Apologies for bringing this up after you've done all this work already.
@@ -982,6 +983,11 @@ def agg_list(self) -> Expression:
        expr = self._expr.agg_list()
        return Expression._from_pyexpr(expr)

    def agg_set(self, ignore_nulls: bool = True) -> Expression:
        """Aggregates the values in the expression into a set."""
Add some examples here, especially with behavior on nulls
        (Showing first 3 of 3 rows)

        Args:
            ignore_nulls: Whether to ignore null values in the result. Defaults to True.
Maybe add some examples here too. Does ignore_nulls=True mean nulls are considered the same or unique? I'm okay with either but good to clarify in the docs
if ignore_nulls {
    let not_null_mask = DaftNotNull::not_null(self)?.into_series();
    child_series = child_series.filter(not_null_mask.bool()?)?;
}
An alternative to this is to let deduplicate_series handle the null filtering. It's going through all of the elements anyway.
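A minimal Python sketch of that suggestion, not the Rust code in this PR: one hash-based pass that skips or keeps nulls as it goes, so no separate filter step is needed. The helper name `dedup_one_pass` is hypothetical, `None` stands in for a null, and both the first-seen ordering and the "keep exactly one null when `ignore_nulls=False`" behavior are assumptions taken from the discussion above.

```python
def dedup_one_pass(values, ignore_nulls=True):
    """Return distinct values in first-seen order, handling nulls in the same pass.

    Hypothetical helper for illustration only; it is not Daft's deduplicate_series.
    """
    seen = set()       # hashes of the non-null values emitted so far
    saw_null = False   # whether a null has already been emitted
    out = []
    for v in values:
        if v is None:
            # Null handling happens inline instead of in a prior filter pass.
            if not ignore_nulls and not saw_null:
                saw_null = True
                out.append(None)
            continue
        if v not in seen:
            seen.add(v)
            out.append(v)
    return out


print(dedup_one_pass([1, None, 2, 1, None]))
print(dedup_one_pass([1, None, 2, 1, None], ignore_nulls=False))
```

With `ignore_nulls=True` every null is dropped; with `ignore_nulls=False` this sketch treats all nulls as equal and keeps the first one.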
}
}

fn evaluate(&self, inputs: &[Series]) -> DaftResult<Series> {
I would move the implementation of evaluate (the stuff inside the match) out of this file and into maybe daft-core/src/array/ops to match the other functions. This file is kind of like the definition (naming, typing, etc.) of the function, instead of the actual implementation.
let fixed_list = input.fixed_size_list()?;
let arrow_array = fixed_list.to_arrow();
let list_type = ArrowDataType::List(Box::new(arrow2::datatypes::Field::new(
    "item",
    inner_type.as_ref().to_arrow()?,
    true,
)));
let list_array = cast(arrow_array.as_ref(), &list_type, Default::default())?;
Series::try_from_field_and_arrow_array(
    Field::new(input.name(), DataType::List(inner_type.clone())),
    list_array,
)?
I believe you can straight up cast a FixedSizeList type series into a List type instead of implementing this.
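For context on why a plain cast is enough: in the Arrow columnar layout a fixed-size list stores only its flat child values, while a variable-size list adds an offsets buffer, so the conversion amounts to generating evenly spaced offsets over the same values. The Python sketch below illustrates that idea only; the function names are hypothetical, the validity (null) bitmap is left out, and it is not how Daft's cast is implemented.

```python
def fixed_size_list_to_list(flat_values, size):
    """Describe a fixed-size-list child buffer as (offsets, values) of a List.

    Hypothetical illustration of the layout change; null rows are not modeled.
    """
    if size <= 0 or len(flat_values) % size != 0:
        raise ValueError("flat_values length must be a positive multiple of size")
    num_rows = len(flat_values) // size
    # Row i of the List spans values[offsets[i]:offsets[i + 1]].
    offsets = [i * size for i in range(num_rows + 1)]
    return offsets, flat_values


def list_rows(offsets, values):
    """Materialize the rows of a List from its offsets and flat values."""
    return [values[offsets[i]:offsets[i + 1]] for i in range(len(offsets) - 1)]


offsets, values = fixed_size_list_to_list([1, 2, 3, 4, 5, 6], size=2)
print(offsets, list_rows(offsets, values))
```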
for (i, series) in result.iter().enumerate() {
    growable.extend(i, 0, series.len());
}
The way you're using the growable here still requires two copies of the data, once to construct result and once at growable.build(). Happy to chat about how you could do this more efficiently!
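The reviewer does not say which approach they have in mind. One possibility, sketched here in Python under that caveat, is to append each distinct value straight into the output's flat values while recording row offsets, so no per-row intermediate results are built and then copied again. The name `list_unique_flat` is hypothetical, the input is modeled as Arrow-style offsets plus flat values, and nulls (`None`) are always dropped for brevity.

```python
def list_unique_flat(offsets, values):
    """Deduplicate each list row, writing directly into one output buffer.

    Hypothetical sketch: the input row i is values[offsets[i]:offsets[i + 1]].
    Returns (out_offsets, out_values) in the same layout.
    """
    out_values = []
    out_offsets = [0]
    for i in range(len(offsets) - 1):
        seen = set()  # reset per row: uniqueness is within a row only
        for j in range(offsets[i], offsets[i + 1]):
            v = values[j]
            if v is None or v in seen:
                continue
            seen.add(v)
            out_values.append(v)  # the only copy of this element
        out_offsets.append(len(out_values))
    return out_offsets, out_values


print(list_unique_flat([0, 3, 3, 6], [1, 1, 2, 3, None, 3]))
```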
let list_agg_id =
    add_to_stage(AggExpr::List, e.clone(), schema, &mut first_stage_aggs);
let list_concat_id = add_to_stage(
    AggExpr::Concat,
    col(list_agg_id.clone()),
    schema,
    &mut second_stage_aggs,
);
let result = unique(col(list_concat_id.clone()), *ignore_nulls).alias(output_name);
final_exprs.push(result);
Come to think of it, since the translation here converts it into a two-stage aggregation that uses list and then concat+unique, are we ever actually using Series::agg_set during an execution of agg_set? If we don't intend to, there's actually no need to implement that at all.
It is pretty confusing how the expression that is called is not always the one that we actually execute in an agg. Let me know if you need me to explain any of this.
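To make the two-stage translation concrete, here is a rough Python model of it: each partition first aggregates values into per-group lists, the partial lists are then concatenated per group, and a final projection deduplicates. All function names are hypothetical stand-ins for AggExpr::List, AggExpr::Concat, and unique; partitions are modeled as lists of (key, value) pairs, and nothing here reflects Daft's actual execution code.

```python
from collections import defaultdict


def stage1_agg_list(partition):
    """First stage (per partition): collect values into a list per group key."""
    groups = defaultdict(list)
    for key, value in partition:
        groups[key].append(value)
    return dict(groups)


def stage2_concat(partials):
    """Second stage: concatenate the partial lists of each group."""
    merged = defaultdict(list)
    for partial in partials:
        for key, vals in partial.items():
            merged[key].extend(vals)
    return dict(merged)


def final_unique(merged, ignore_nulls=True):
    """Final projection: deduplicate each concatenated list, first-seen order."""
    result = {}
    for key, vals in merged.items():
        if ignore_nulls:
            vals = [v for v in vals if v is not None]
        # dict.fromkeys keeps the first occurrence of each value.
        result[key] = list(dict.fromkeys(vals))
    return result


def agg_set(partitions, ignore_nulls=True):
    partials = [stage1_agg_list(p) for p in partitions]
    return final_unique(stage2_concat(partials), ignore_nulls)


partitions = [
    [("a", 1), ("a", 2), ("b", None)],
    [("a", 1), ("b", 3), ("b", 3)],
]
print(agg_set(partitions))
```

In this model a dedicated set aggregation is never called: the result falls out of list, concat, and unique alone.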
// AggExpr::Set(expr) => {
//     let list = self.eval_expression(expr)?.agg_list(groups)?;
//     let unique_expr = unique(col(list.name()), false);
//     self.eval_expression(&unique_expr)
// }
remove
Implements the agg_set aggregation expression requested in #3661. This aggregation is similar to agg_list but only keeps distinct elements in the resulting list.

Key features:
- agg_set(): Creates a list of distinct elements in a group
- agg_list()

Implementation details:
- agg_list() and new list_unique() operation

Example usage: