feat: Overwrite partitions mode #3687

Merged · 3 commits · Jan 17, 2025
daft/dataframe/dataframe.py (32 changes: 22 additions & 10 deletions)
@@ -552,7 +552,7 @@
self,
root_dir: Union[str, pathlib.Path],
compression: str = "snappy",
-write_mode: Literal["append", "overwrite"] = "append",
+write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
partition_cols: Optional[List[ColumnInputType]] = None,
io_config: Optional[IOConfig] = None,
) -> "DataFrame":
@@ -566,7 +566,7 @@
Args:
root_dir (str): root file path to write parquet files to.
compression (str, optional): compression algorithm. Defaults to "snappy".
-write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. `overwrite-partitions` will overwrite only the partitions that are being written. Defaults to "append".
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.

@@ -576,8 +576,12 @@
.. NOTE::
This call is **blocking** and will execute the DataFrame when called
"""
-if write_mode not in ["append", "overwrite"]:
-    raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+    raise ValueError(
+        f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+    )
+if write_mode == "overwrite-partitions" and partition_cols is None:
+    raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

Check warning on line 584 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L584

Added line #L584 was not covered by tests

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

@@ -598,7 +602,9 @@
assert write_df._result is not None

if write_mode == "overwrite":
-    overwrite_files(write_df, root_dir, io_config)
+    overwrite_files(write_df, root_dir, io_config, False)
+elif write_mode == "overwrite-partitions":
+    overwrite_files(write_df, root_dir, io_config, True)

if len(write_df) > 0:
# Populate and return a new disconnected DataFrame
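For context, here is a minimal usage sketch of the new mode. The path and column names are hypothetical, not taken from the PR:

```python
import daft

# Hypothetical data partitioned by date.
df = daft.from_pydict({"date": ["2025-01-16", "2025-01-17"], "amount": [10, 20]})

# `overwrite-partitions` requires partition_cols. Only the date=2025-01-16
# and date=2025-01-17 directories under the root are rewritten; any other
# existing partition directories are left untouched.
df.write_parquet(
    "data/sales",
    write_mode="overwrite-partitions",
    partition_cols=["date"],
)
```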
@@ -624,7 +630,7 @@
def write_csv(
self,
root_dir: Union[str, pathlib.Path],
-write_mode: Literal["append", "overwrite"] = "append",
+write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
partition_cols: Optional[List[ColumnInputType]] = None,
io_config: Optional[IOConfig] = None,
) -> "DataFrame":
@@ -637,15 +643,19 @@

Args:
root_dir (str): root file path to write parquet files to.
-write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. `overwrite-partitions` will overwrite only the partitions that are being written. Defaults to "append".
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.

Returns:
DataFrame: The filenames that were written out as strings.
"""
-if write_mode not in ["append", "overwrite"]:
-    raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+    raise ValueError(
+        f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+    )
+if write_mode == "overwrite-partitions" and partition_cols is None:
+    raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

@@ -665,7 +675,9 @@
assert write_df._result is not None

if write_mode == "overwrite":
-    overwrite_files(write_df, root_dir, io_config)
+    overwrite_files(write_df, root_dir, io_config, False)
+elif write_mode == "overwrite-partitions":
+    overwrite_files(write_df, root_dir, io_config, True)

if len(write_df) > 0:
# Populate and return a new disconnected DataFrame
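The CSV writer gains identical semantics. Reusing the hypothetical `df` from the parquet sketch above:

```python
# Same behavior for CSV: only the partitions present in `df` are replaced.
df.write_csv(
    "data/sales_csv",
    write_mode="overwrite-partitions",
    partition_cols=["date"],
)
```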
daft/filesystem.py (20 changes: 18 additions & 2 deletions)
@@ -11,6 +11,7 @@

from daft.convert import from_pydict
from daft.daft import FileFormat, FileInfos, IOConfig, io_glob
+from daft.datatype import DataType
from daft.dependencies import fsspec, pafs
from daft.expressions.expressions import col, lit
from daft.table import MicroPartition
@@ -368,6 +369,7 @@ def overwrite_files(
manifest: DataFrame,
root_dir: str | pathlib.Path,
io_config: IOConfig | None,
+overwrite_partitions: bool,
) -> None:
[resolved_path], fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
file_selector = pafs.FileSelector(resolved_path, recursive=True)
@@ -376,12 +378,26 @@
except FileNotFoundError:
# The root directory does not exist, so there are no files to delete.
return

all_file_paths_df = from_pydict({"path": paths})

assert manifest._result is not None
written_file_paths = manifest._result._get_merged_micropartition().get_column("path")
-to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))
+if overwrite_partitions:
+    # Extract directories of written files, i.e. the overwritten partitions
+    written_dirs = set(str(pathlib.Path(path).parent) for path in written_file_paths.to_pylist())
+
+    # Filter existing files to only those in directories where we wrote new files
+    to_delete = all_file_paths_df.where(
+        (~(col("path").is_in(lit(written_file_paths))))
+        & (
+            col("path")
+            .apply(lambda x: str(pathlib.Path(x).parent), return_dtype=DataType.string())
+            .is_in(list(written_dirs))
+        )
+    )
+else:
+    # If we are not overwriting partitions, delete every existing file that was not just written
+    to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))

# TODO: Look into parallelizing this
for entry in to_delete:
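To make the new deletion rule concrete, here is a standalone sketch of the same filtering in plain Python with hypothetical paths (the real implementation uses daft expressions, as shown in the diff above):

```python
import pathlib

# Hypothetical state: files already under the table root, and files just written.
existing = [
    "root/date=2025-01-16/old-a.parquet",
    "root/date=2025-01-17/old-b.parquet",
    "root/date=2025-01-18/old-c.parquet",
]
written = [
    "root/date=2025-01-16/new-a.parquet",
    "root/date=2025-01-17/new-b.parquet",
]

# Directories that received new files, i.e. the overwritten partitions.
written_dirs = {str(pathlib.Path(p).parent) for p in written}

# With overwrite_partitions=True, delete only stale files inside overwritten
# partitions; date=2025-01-18 is untouched because nothing was written there.
to_delete = [
    p
    for p in existing
    if p not in written and str(pathlib.Path(p).parent) in written_dirs
]
print(to_delete)
# ['root/date=2025-01-16/old-a.parquet', 'root/date=2025-01-17/old-b.parquet']
```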