
feat: Overwrite partitions mode (#3687)
Closes #1768

Overwrite-partitions mode overwrites only the files in the partition
directories that were written to as part of the write operation. E.g.,
partition "A" is overwritten if and only if partition "A" was
written to.
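
For example (an illustrative sketch — the column name, values, and output path below are hypothetical):

```python
import daft
from daft import col

df = daft.from_pydict({"A": [1, 1, 2], "value": ["x", "y", "z"]})

# Only files under the partition directories written by this call
# (here A=1 and A=2) are replaced; any other existing partition
# directories under "data/" are left untouched.
df.write_parquet(
    "data/",
    write_mode="overwrite-partitions",
    partition_cols=[col("A")],
)
```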

This PR also refactors the test code a bit.

---------

Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Jan 17, 2025
1 parent a7dd7ef commit 412cef4
Showing 3 changed files with 208 additions and 92 deletions.
32 changes: 22 additions & 10 deletions daft/dataframe/dataframe.py
@@ -552,7 +552,7 @@ def write_parquet(
         self,
         root_dir: Union[str, pathlib.Path],
         compression: str = "snappy",
-        write_mode: Literal["append", "overwrite"] = "append",
+        write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
         partition_cols: Optional[List[ColumnInputType]] = None,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
@@ -566,7 +566,7 @@ def write_parquet(
         Args:
             root_dir (str): root file path to write parquet files to.
             compression (str, optional): compression algorithm. Defaults to "snappy".
-            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
             partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
             io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
@@ -576,8 +576,12 @@ def write_parquet(
         .. NOTE::
             This call is **blocking** and will execute the DataFrame when called
         """
-        if write_mode not in ["append", "overwrite"]:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+        if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+            raise ValueError(
+                f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+            )
+        if write_mode == "overwrite-partitions" and partition_cols is None:
+            raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

@@ -598,7 +602,9 @@ def write_parquet(
         assert write_df._result is not None

         if write_mode == "overwrite":
-            overwrite_files(write_df, root_dir, io_config)
+            overwrite_files(write_df, root_dir, io_config, False)
+        elif write_mode == "overwrite-partitions":
+            overwrite_files(write_df, root_dir, io_config, True)

         if len(write_df) > 0:
             # Populate and return a new disconnected DataFrame
@@ -624,7 +630,7 @@ def write_parquet(
     def write_csv(
         self,
         root_dir: Union[str, pathlib.Path],
-        write_mode: Literal["append", "overwrite"] = "append",
+        write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
         partition_cols: Optional[List[ColumnInputType]] = None,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
@@ -637,15 +643,19 @@ def write_csv(
         Args:
             root_dir (str): root file path to write parquet files to.
-            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
             partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
             io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.

         Returns:
             DataFrame: The filenames that were written out as strings.
         """
-        if write_mode not in ["append", "overwrite"]:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+        if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+            raise ValueError(
+                f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+            )
+        if write_mode == "overwrite-partitions" and partition_cols is None:
+            raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

@@ -665,7 +675,9 @@ def write_csv(
         assert write_df._result is not None

         if write_mode == "overwrite":
-            overwrite_files(write_df, root_dir, io_config)
+            overwrite_files(write_df, root_dir, io_config, False)
+        elif write_mode == "overwrite-partitions":
+            overwrite_files(write_df, root_dir, io_config, True)

         if len(write_df) > 0:
             # Populate and return a new disconnected DataFrame
38 changes: 30 additions & 8 deletions daft/filesystem.py
@@ -368,19 +368,41 @@ def overwrite_files(
     manifest: DataFrame,
     root_dir: str | pathlib.Path,
     io_config: IOConfig | None,
+    overwrite_partitions: bool,
 ) -> None:
     [resolved_path], fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
-    file_selector = pafs.FileSelector(resolved_path, recursive=True)
-    try:
-        paths = [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
-    except FileNotFoundError:
-        # The root directory does not exist, so there are no files to delete.
-        return
-
-    all_file_paths_df = from_pydict({"path": paths})

     assert manifest._result is not None
     written_file_paths = manifest._result._get_merged_micropartition().get_column("path")

+    all_file_paths = []
+    if overwrite_partitions:
+        # Get all files in ONLY the directories that were written to.
+        written_dirs = set(str(pathlib.Path(path).parent) for path in written_file_paths.to_pylist())
+        for dir in written_dirs:
+            file_selector = pafs.FileSelector(dir, recursive=True)
+            try:
+                all_file_paths.extend(
+                    [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+                )
+            except FileNotFoundError:
+                continue
+    else:
+        # Get all files in the root directory.
+        file_selector = pafs.FileSelector(resolved_path, recursive=True)
+        try:
+            all_file_paths.extend(
+                [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+            )
+        except FileNotFoundError:
+            # The root directory does not exist, so there are no files to delete.
+            return
+
+    all_file_paths_df = from_pydict({"path": all_file_paths})

     # Find the files that were not written to in this run and delete them.
     to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))

     # TODO: Look into parallelizing this
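
In effect, the delete step is a set difference: scan a set of directories for existing files, subtract the files this write just produced, and delete the remainder. A minimal standalone sketch of that selection logic (hypothetical paths; not the actual Daft implementation):

```python
import pathlib

# Existing files on disk and files produced by this write (hypothetical paths).
existing = {
    "data/A=1/old-1.parquet",
    "data/A=2/old-2.parquet",
    "data/A=3/old-3.parquet",
}
written = {"data/A=1/new-1.parquet", "data/A=2/new-2.parquet"}

# overwrite-partitions: only consider directories that received new files,
# so data/A=3 is never scanned and its files survive.
written_dirs = {str(pathlib.Path(p).parent) for p in written}
candidates = {p for p in existing if str(pathlib.Path(p).parent) in written_dirs}

# Delete whatever was scanned but not written by this run.
to_delete = candidates - written
print(sorted(to_delete))  # ['data/A=1/old-1.parquet', 'data/A=2/old-2.parquet']
```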
