overwrite partitions
Colin Ho authored and Colin Ho committed Jan 15, 2025
1 parent 5702720 commit 9bef3ae
Showing 3 changed files with 196 additions and 86 deletions.
32 changes: 22 additions & 10 deletions daft/dataframe/dataframe.py
```diff
@@ -552,7 +552,7 @@ def write_parquet(
         self,
         root_dir: Union[str, pathlib.Path],
         compression: str = "snappy",
-        write_mode: Literal["append", "overwrite"] = "append",
+        write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
         partition_cols: Optional[List[ColumnInputType]] = None,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
@@ -566,7 +566,7 @@ def write_parquet(
         Args:
             root_dir (str): root file path to write parquet files to.
             compression (str, optional): compression algorithm. Defaults to "snappy".
-            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
+            write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. `overwrite-partitions` will overwrite only the partitions that are being written. Defaults to "append".
             partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
             io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
@@ -576,8 +576,12 @@ def write_parquet(
         .. NOTE::
             This call is **blocking** and will execute the DataFrame when called
         """
-        if write_mode not in ["append", "overwrite"]:
-            raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
+        if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
+            raise ValueError(
+                f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
+            )
+        if write_mode == "overwrite-partitions" and partition_cols is None:
+            raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

         io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
@@ -598,7 +602,9 @@ def write_parquet(
         assert write_df._result is not None

         if write_mode == "overwrite":
-            overwrite_files(write_df, root_dir, io_config)
+            overwrite_files(write_df, root_dir, io_config, False)
+        elif write_mode == "overwrite-partitions":
+            overwrite_files(write_df, root_dir, io_config, True)

         if len(write_df) > 0:
             # Populate and return a new disconnected DataFrame
```
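A minimal usage sketch of the new Parquet mode; the bucket path and the `date` partition column below are illustrative assumptions, not part of the commit:

```python
import daft

# Hypothetical partitioned table: files land under one directory per `date` value.
df = daft.from_pydict({"date": ["2025-01-14", "2025-01-15"], "value": [1, 2]})

# `overwrite-partitions` requires partition_cols; only partition directories
# that receive new files have their stale files deleted.
df.write_parquet(
    "s3://my-bucket/my-table/",  # hypothetical root dir
    write_mode="overwrite-partitions",
    partition_cols=["date"],
)
```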
Expand All @@ -624,7 +630,7 @@ def write_parquet(
def write_csv(
self,
root_dir: Union[str, pathlib.Path],
write_mode: Literal["append", "overwrite"] = "append",
write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
partition_cols: Optional[List[ColumnInputType]] = None,
io_config: Optional[IOConfig] = None,
) -> "DataFrame":
Expand All @@ -637,15 +643,19 @@ def write_csv(
Args:
root_dir (str): root file path to write parquet files to.
write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. Defaults to "append".
write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data. `overwrite-partitions` will overwrite only the partitions that are being written. Defaults to "append".
partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Defaults to None.
io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.
Returns:
DataFrame: The filenames that were written out as strings.
"""
if write_mode not in ["append", "overwrite"]:
raise ValueError(f"Only support `append` or `overwrite` mode. {write_mode} is unsupported")
if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
raise ValueError(

Check warning on line 654 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L654

Added line #L654 was not covered by tests
f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
)
if write_mode == "overwrite-partitions" and partition_cols is None:
raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

Check warning on line 658 in daft/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

daft/dataframe/dataframe.py#L658

Added line #L658 was not covered by tests

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

Expand All @@ -665,7 +675,9 @@ def write_csv(
assert write_df._result is not None

if write_mode == "overwrite":
overwrite_files(write_df, root_dir, io_config)
overwrite_files(write_df, root_dir, io_config, False)
elif write_mode == "overwrite-partitions":
overwrite_files(write_df, root_dir, io_config, True)

if len(write_df) > 0:
# Populate and return a new disconnected DataFrame
Expand Down
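CSV writes follow the same semantics; continuing the sketch above (paths are again hypothetical):

```python
# Same behavior for CSV: only partitions that receive new files are replaced,
# untouched partitions are left alone.
df.write_csv(
    "s3://my-bucket/my-csv-table/",  # hypothetical root dir
    write_mode="overwrite-partitions",
    partition_cols=["date"],
)
```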
20 changes: 18 additions & 2 deletions daft/filesystem.py
```diff
@@ -11,6 +11,7 @@

 from daft.convert import from_pydict
 from daft.daft import FileFormat, FileInfos, IOConfig, io_glob
+from daft.datatype import DataType
 from daft.dependencies import fsspec, pafs
 from daft.expressions.expressions import col, lit
 from daft.table import MicroPartition
@@ -368,6 +369,7 @@ def overwrite_files(
     manifest: DataFrame,
     root_dir: str | pathlib.Path,
     io_config: IOConfig | None,
+    overwrite_partitions: bool,
 ) -> None:
     [resolved_path], fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
     file_selector = pafs.FileSelector(resolved_path, recursive=True)
@@ -376,12 +378,26 @@ def overwrite_files(
     except FileNotFoundError:
         # The root directory does not exist, so there are no files to delete.
         return

     all_file_paths_df = from_pydict({"path": paths})

     assert manifest._result is not None
     written_file_paths = manifest._result._get_merged_micropartition().get_column("path")
-    to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))
+    if overwrite_partitions:
+        # Extract directories of written files
+        written_dirs = set(str(pathlib.Path(path).parent) for path in written_file_paths.to_pylist())
+
+        # Filter existing files to only those in directories where we wrote new files
+        to_delete = all_file_paths_df.where(
+            (~(col("path").is_in(lit(written_file_paths))))
+            & (
+                col("path")
+                .apply(lambda x: str(pathlib.Path(x).parent), return_dtype=DataType.string())
+                .is_in(list(written_dirs))
+            )
+        )
+    else:
+        # If we are not overwriting partitions, delete every existing file that was not just written
+        to_delete = all_file_paths_df.where(~(col("path").is_in(lit(written_file_paths))))

     # TODO: Look into parallelizing this
     for entry in to_delete:
```
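A standalone sketch (plain Python, no Daft) of the deletion rule this change implements: an existing file is deleted only if it was not just written and it lives in a directory that received new files. The paths and helper name are hypothetical, for illustration only:

```python
import pathlib


def files_to_delete(existing: list[str], written: list[str]) -> list[str]:
    # Files written by this operation must survive.
    written_set = set(written)
    # Only directories (partitions) that received new files are cleaned.
    written_dirs = {str(pathlib.Path(p).parent) for p in written}
    return [
        p
        for p in existing
        if p not in written_set and str(pathlib.Path(p).parent) in written_dirs
    ]


# Only the stale file inside the rewritten `date=2025-01-15` partition is
# deleted; the `date=2025-01-14` partition is untouched.
existing = [
    "table/date=2025-01-14/old-a.parquet",
    "table/date=2025-01-15/old-b.parquet",
]
written = ["table/date=2025-01-15/new-c.parquet"]
assert files_to_delete(existing, written) == ["table/date=2025-01-15/old-b.parquet"]
```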