Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Row-Wise Mode Functionality (axis=1) and Improve Metadata Handling in _collection.py #1137

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ bench/shakespeare.txt
.idea/
.ipynb_checkpoints/
coverage.xml
venv/
40 changes: 33 additions & 7 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3654,13 +3654,39 @@ def query(self, expr, **kwargs):
return new_collection(Query(self, expr, kwargs))

@derived_from(pd.DataFrame)
def mode(self, dropna=True, split_every=False, numeric_only=False):
    """Column-wise mode: compute the mode of each column (optionally only
    the numeric ones) and lay the per-column results out side by side."""
    # Skip non-numeric columns only when the caller asked for numeric_only.
    selected = (
        col
        for _, col in self.items()
        if not numeric_only or pd.api.types.is_numeric_dtype(col.dtype)
    )
    modes = [col.mode(dropna=dropna, split_every=split_every) for col in selected]
    return concat(modes, axis=1)
# GH#11389 - Dask issue related to adding row-wise mode functionality
# GH#1136 - Dask-Expr specific implementation for row-wise mode functionality
# Contributor: @thyripian
def mode(self, axis=0, numeric_only=False, dropna=True, split_every=False):
    """Return the mode(s) along the requested axis.

    Parameters
    ----------
    axis : {0 or "index", 1 or "columns"}, default 0
        ``0``/``"index"``: column-wise modes, one result column per input
        column. ``1``/``"columns"``: row-wise modes, computed partition-wise.
    numeric_only : bool, default False
        If True, restrict the computation to numeric columns.
    dropna : bool, default True
        Exclude missing values when counting.
    split_every : bool or int, default False
        Forwarded to the per-column reduction for axis=0; not used for axis=1.

    Raises
    ------
    ValueError
        If ``axis`` is not a recognized axis specifier.
    """
    # Accept pandas-style string aliases in addition to the integer axes.
    if axis in (0, "index"):
        # Column-wise: reduce each column independently, then align results.
        modes = []
        for _, col in self.items():
            if numeric_only and not pd.api.types.is_numeric_dtype(col.dtype):
                continue
            modes.append(col.mode(dropna=dropna, split_every=split_every))
        return concat(modes, axis=1)
    elif axis in (1, "columns"):
        # Row-wise mode is computed independently per partition, but the
        # number of result columns varies with how many ties each row has;
        # pad every partition (and the meta) to the worst case — one mode
        # per input column — so partition metadata stays consistent.
        meta = self._meta_nonempty.mode(
            axis=1, numeric_only=numeric_only, dropna=dropna
        )
        max_modes = len(self.columns)
        meta = meta.reindex(columns=range(max_modes))
        return self.map_partitions(
            lambda df: df.mode(
                axis=1, numeric_only=numeric_only, dropna=dropna
            ).reindex(columns=range(max_modes)),
            meta=meta,
        )
    else:
        # Use the class name (not the full type repr) to match pandas'
        # wording: "No axis named 2 for object type DataFrame".
        raise ValueError(
            f"No axis named {axis} for object type {type(self).__name__}"
        )

@derived_from(pd.DataFrame)
def add_prefix(self, prefix):
Expand Down
32 changes: 20 additions & 12 deletions dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1269,9 +1269,11 @@ def _simplify_up(self, parent, dependents):
columns = _convert_to_list(columns)
frame_columns = set(self.frame.columns)
columns = [
reverse_mapping[col]
if col in reverse_mapping and reverse_mapping[col] in frame_columns
else col
(
reverse_mapping[col]
if col in reverse_mapping and reverse_mapping[col] in frame_columns
else col
)
for col in columns
]
columns = [col for col in self.frame.columns if col in columns]
Expand Down Expand Up @@ -2342,9 +2344,11 @@ class AddPrefix(Elemwise):
@functools.cached_property
def unique_partition_mapping_columns_from_shuffle(self):
    """Shuffle-relevant column names of the underlying frame with this
    expression's prefix applied (tuple labels are prefixed level-wise)."""

    def _prefixed(label):
        # MultiIndex labels arrive as tuples; prefix every level of those.
        if isinstance(label, tuple):
            return tuple(self.prefix + part for part in label)
        return f"{self.prefix}{label}"

    return {
        _prefixed(label)
        for label in self.frame.unique_partition_mapping_columns_from_shuffle
    }

Expand Down Expand Up @@ -2373,9 +2377,11 @@ class AddSuffix(AddPrefix):
@functools.cached_property
def unique_partition_mapping_columns_from_shuffle(self):
    """Shuffle-relevant column names of the underlying frame with this
    expression's suffix appended (tuple labels are suffixed level-wise)."""

    def _suffixed(label):
        # MultiIndex labels arrive as tuples; suffix every level of those.
        if isinstance(label, tuple):
            return tuple(part + self.suffix for part in label)
        return f"{label}{self.suffix}"

    return {
        _suffixed(label)
        for label in self.frame.unique_partition_mapping_columns_from_shuffle
    }

Expand Down Expand Up @@ -2421,9 +2427,11 @@ def _task(self, index: int):
def _simplify_down(self):
    # Push this head operation through an elementwise frame: wrap every
    # Expr operand (except delayed expressions, which must stay intact) in
    # a Head of the same size, then rebuild the elementwise op on top of
    # the truncated inputs. Presumably valid because elementwise ops act
    # row-by-row — TODO(review): confirm against the Head/Elemwise contract.
    if isinstance(self.frame, Elemwise):
        operands = [
            (
                Head(op, self.n, self.operand("npartitions"))
                if isinstance(op, Expr) and not isinstance(op, _DelayedExpr)
                else op
            )
            for op in self.frame.operands
        ]
        # Reconstruct the same elementwise expression over the new operands.
        return type(self.frame)(*operands)
Expand Down
Loading