diff --git a/.gitignore b/.gitignore
index 12e933d2b..b32bb6a90 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ bench/shakespeare.txt
 .idea/
 .ipynb_checkpoints/
 coverage.xml
+venv/
diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index 326f1f496..9230b2903 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -3654,13 +3654,40 @@ def query(self, expr, **kwargs):
         return new_collection(Query(self, expr, kwargs))
 
     @derived_from(pd.DataFrame)
-    def mode(self, dropna=True, split_every=False, numeric_only=False):
-        modes = []
-        for _, col in self.items():
-            if numeric_only and not pd.api.types.is_numeric_dtype(col.dtype):
-                continue
-            modes.append(col.mode(dropna=dropna, split_every=split_every))
-        return concat(modes, axis=1)
+    def mode(self, axis=0, numeric_only=False, dropna=True, split_every=False):
+        # GH#11389 (Dask) / GH#1136 (Dask-Expr): row-wise mode (axis=1) support;
+        # axis=0 keeps the pre-existing column-wise behaviour.
+        # Accept the pandas string aliases as well as the integer axis codes.
+        if isinstance(axis, str):
+            axis = {"index": 0, "columns": 1}.get(axis, axis)
+
+        if axis == 0:
+            # Take the mode of each (optionally numeric-only) column on its
+            # own, then place the per-column results side by side.
+            modes = [
+                col.mode(dropna=dropna, split_every=split_every)
+                for _, col in self.items()
+                if not numeric_only or pd.api.types.is_numeric_dtype(col.dtype)
+            ]
+            return concat(modes, axis=1)
+
+        if axis == 1:
+            # A row cannot have more modes than there are columns, so pad every
+            # partition's result to that width; this keeps the output columns
+            # identical across partitions and consistent with ``meta``.
+            # NOTE(review): with numeric_only=True this bound still counts the
+            # non-numeric columns, leaving extra all-NaN columns -- confirm
+            # that this is intended.
+            out_columns = range(len(self.columns))
+
+            def row_mode(df):
+                result = df.mode(axis=1, numeric_only=numeric_only, dropna=dropna)
+                return result.reindex(columns=out_columns)
+
+            # Build meta with the same function that runs on each partition.
+            return self.map_partitions(row_mode, meta=row_mode(self._meta_nonempty))
+
+        raise ValueError(f"No axis named {axis} for object type {type(self).__name__}")
 
     @derived_from(pd.DataFrame)
     def add_prefix(self, prefix):
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index aa807eabd..ab2b8b74c 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -1269,9 +1269,11 @@ def _simplify_up(self, parent, dependents):
             columns = _convert_to_list(columns)
             frame_columns = set(self.frame.columns)
             columns = [
-                reverse_mapping[col]
-                if col in reverse_mapping and reverse_mapping[col] in frame_columns
-                else col
+                (
+                    reverse_mapping[col]
+                    if col in reverse_mapping and reverse_mapping[col] in frame_columns
+                    else col
+                )
                 for col in columns
             ]
             columns = [col for col in self.frame.columns if col in columns]
@@ -2342,9 +2344,11 @@ class AddPrefix(Elemwise):
     @functools.cached_property
     def unique_partition_mapping_columns_from_shuffle(self):
         return {
-            f"{self.prefix}{c}"
-            if not isinstance(c, tuple)
-            else tuple(self.prefix + t for t in c)
+            (
+                f"{self.prefix}{c}"
+                if not isinstance(c, tuple)
+                else tuple(self.prefix + t for t in c)
+            )
             for c in self.frame.unique_partition_mapping_columns_from_shuffle
         }
 
@@ -2373,9 +2377,11 @@ class AddSuffix(AddPrefix):
     @functools.cached_property
     def unique_partition_mapping_columns_from_shuffle(self):
         return {
-            f"{c}{self.suffix}"
-            if not isinstance(c, tuple)
-            else tuple(t + self.suffix for t in c)
+            (
+                f"{c}{self.suffix}"
+                if not isinstance(c, tuple)
+                else tuple(t + self.suffix for t in c)
+            )
             for c in self.frame.unique_partition_mapping_columns_from_shuffle
         }
 
@@ -2421,9 +2427,11 @@ def _task(self, index: int):
     def _simplify_down(self):
         if isinstance(self.frame, Elemwise):
             operands = [
-                Head(op, self.n, self.operand("npartitions"))
-                if isinstance(op, Expr) and not isinstance(op, _DelayedExpr)
-                else op
+                (
+                    Head(op, self.n, self.operand("npartitions"))
+                    if isinstance(op, Expr) and not isinstance(op, _DelayedExpr)
+                    else op
+                )
                 for op in self.frame.operands
             ]
             return type(self.frame)(*operands)