feat: Overwrite partitions mode #3687

Merged · 3 commits · Jan 17, 2025
32 changes: 22 additions & 10 deletions daft/dataframe/dataframe.py
```diff
@@ -552,7 +552,7 @@
         self,
         root_dir: Union[str, pathlib.Path],
         compression: str = "snappy",
-        write_mode: Literal["append", "overwrite"] = "append",
+        write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
         partition_cols: Optional[List[ColumnInputType]] = None,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
@@ -566,7 +566,7 @@
         Args:
             root_dir (str): root file path to write parquet files to.
             compression (str, optional): compression algorithm. Defaults to "snappy".
-            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
            partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
            io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
```
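For orientation, a minimal usage sketch of the new mode as described in the updated docstring (the bucket path and the `region` column are hypothetical, not taken from this PR):

```python
import daft

df = daft.from_pydict({"region": ["us", "eu"], "value": [1, 2]})

# Only the partitions present in `df` (region=us and region=eu) are
# replaced; any other partition directories under the root are kept.
df.write_parquet(
    "s3://my-bucket/data",  # hypothetical root_dir
    write_mode="overwrite-partitions",
    partition_cols=["region"],
)
```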

```diff
@@ -576,8 +576,12 @@
         .. NOTE::
             This call is **blocking** and will execute the DataFrame when called
         """
-        if write_mode not in ["append", "overwrite"]:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+        if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+            raise ValueError(
+                f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+            )
+        if write_mode == "overwrite-partitions" and partition_cols is None:
+            raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")
 
         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
```

Codecov (codecov/patch): added lines #L580 and #L584 in daft/dataframe/dataframe.py were not covered by tests.
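A quick sketch of the guard introduced above: `overwrite-partitions` without `partition_cols` has no partition directories to scope the deletes to, so the call is rejected (local path hypothetical):

```python
import daft

df = daft.from_pydict({"a": [1, 2, 3]})

try:
    # No partition_cols supplied, so the new validation rejects the call.
    df.write_parquet("/tmp/data", write_mode="overwrite-partitions")
except ValueError as err:
    # Partition columns must be specified to use `overwrite-partitions` mode.
    print(err)
```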

```diff
@@ -598,7 +602,9 @@
         assert write_df._result is not None
 
         if write_mode == "overwrite":
-            overwrite_files(write_df, root_dir, io_config)
+            overwrite_files(write_df, root_dir, io_config, False)
+        elif write_mode == "overwrite-partitions":
+            overwrite_files(write_df, root_dir, io_config, True)
 
         if len(write_df) > 0:
             # Populate and return a new disconnected DataFrame
@@ -624,7 +630,7 @@
     def write_csv(
         self,
         root_dir: Union[str, pathlib.Path],
-        write_mode: Literal["append", "overwrite"] = "append",
+        write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
         partition_cols: Optional[List[ColumnInputType]] = None,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
@@ -637,15 +643,19 @@
 
         Args:
             root_dir (str): root file path to write parquet files to.
-            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
             partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
             io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
 
         Returns:
             DataFrame: The filenames that were written out as strings.
         """
-        if write_mode not in ["append", "overwrite"]:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+        if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+            raise ValueError(
+                f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+            )
+        if write_mode == "overwrite-partitions" and partition_cols is None:
+            raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")
 
         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
@@ -665,7 +675,9 @@
         assert write_df._result is not None
 
         if write_mode == "overwrite":
-            overwrite_files(write_df, root_dir, io_config)
+            overwrite_files(write_df, root_dir, io_config, False)
+        elif write_mode == "overwrite-partitions":
+            overwrite_files(write_df, root_dir, io_config, True)
 
         if len(write_df) > 0:
             # Populate and return a new disconnected DataFrame
```

Codecov (codecov/patch): added lines #L654 and #L658 in daft/dataframe/dataframe.py were not covered by tests.
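The `write_csv` changes mirror `write_parquet` exactly, so the same call shape applies (hypothetical path and column):

```python
df.write_csv("/tmp/csv_data", write_mode="overwrite-partitions", partition_cols=["region"])
```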
38 changes: 30 additions & 8 deletions daft/filesystem.py
```diff
@@ -368,19 +368,41 @@
     manifest: DataFrame,
     root_dir: str | pathlib.Path,
     io_config: IOConfig | None,
+    overwrite_partitions: bool,
 ) -> None:
     [resolved_path], fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
-    file_selector = pafs.FileSelector(resolved_path, recursive=True)
-    try:
-        paths = [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
-    except FileNotFoundError:
-        # The root directory does not exist, so there are no files to delete.
-        return
-
-    all_file_paths_df = from_pydict({"path": paths})
 
     assert manifest._result is not None
     written_file_paths = manifest._result._get_merged_micropartition().get_column("path")
 
+    all_file_paths = []
+    if overwrite_partitions:
+        # Get all files in ONLY the directories that were written to.
+        written_dirs = set(str(pathlib.Path(path).parent) for path in written_file_paths.to_pylist())
+        for dir in written_dirs:
+            file_selector = pafs.FileSelector(dir, recursive=True)
+            try:
+                all_file_paths.extend(
+                    [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+                )
+            except FileNotFoundError:
+                continue
+    else:
+        # Get all files in the root directory.
+        file_selector = pafs.FileSelector(resolved_path, recursive=True)
+        try:
+            all_file_paths.extend(
+                [info.path for info in fs.get_file_info(file_selector) if info.type == pafs.FileType.File]
+            )
+        except FileNotFoundError:
+            # The root directory does not exist, so there are no files to delete.
+            return
+
+    all_file_paths_df = from_pydict({"path": all_file_paths})
 
     # Find the files that were not written to in this run and delete them.
     to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))
 
     # TODO: Look into parallelizing this
```

Codecov (codecov/patch): added lines #L389-L390 in daft/filesystem.py were not covered by tests.

> **Review comment on lines +378 to +390** (Contributor Author): @desmondcheongzx new implementation ready for review, where we only look for files to delete IF they are in the partition directories.
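To make the scoping concrete, here is a minimal standalone sketch of the `overwrite-partitions` cleanup, using plain `pathlib` on local paths rather than the pyarrow filesystem the real code uses: collect the parent directories of the files written in this run, list only those directories, and anything found there that this run did not write is stale.

```python
import pathlib


def stale_files(written_file_paths: list[str]) -> set[str]:
    """Files to delete under `overwrite-partitions` semantics.

    Only directories that received new files are scanned, so partitions
    untouched by this write are never considered for deletion.
    """
    written = set(written_file_paths)
    written_dirs = {str(pathlib.Path(p).parent) for p in written}

    candidates: set[str] = set()
    for d in written_dirs:
        # Mirrors the `except FileNotFoundError: continue` in the PR.
        if not pathlib.Path(d).exists():
            continue
        # Recursive listing, like pafs.FileSelector(d, recursive=True).
        candidates.update(str(p) for p in pathlib.Path(d).rglob("*") if p.is_file())

    # Everything in the written partitions minus this run's output.
    return candidates - written
```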