Feat/merge datasets #221

Merged · 21 commits · Jan 13, 2025
Changes from 7 commits
276 changes: 252 additions & 24 deletions luxonis_ml/data/datasets/luxonis_dataset.py
@@ -4,6 +4,7 @@
import shutil
import tempfile
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from functools import cached_property
from pathlib import Path
@@ -221,6 +222,222 @@
self.dataset_name,
)

def _save_df_offline(self, pl_df: pl.DataFrame) -> None:
"""Saves the given Polars DataFrame into multiple Parquet files
using ParquetFileManager. Ensures the same structure as the
original dataset.

@type pl_df: pl.DataFrame
@param pl_df: The Polars DataFrame to save.
"""
annotations_path = Path(self.annotations_path)

for old_file in annotations_path.glob("*.parquet"):
old_file.unlink()

rows = pl_df.to_dicts()

with ParquetFileManager(annotations_path, num_rows=100_000_000) as pfm:
for row in rows:
uuid_val = row.get("uuid")
if uuid_val is None:
raise ValueError("Missing 'uuid' in row!")

data_dict = dict(row)
data_dict.pop("uuid", None)

pfm.write(uuid_val, data_dict)

logger.info(
f"Saved merged DataFrame to Parquet files in '{annotations_path}'."
)

def _merge_metadata_with(self, other: "LuxonisDataset") -> None:
"""Merges relevant metadata from `other` into `self`."""
for key, value in other.metadata.items():
if key not in self.metadata:
self.metadata[key] = value
else:
existing_val = self.metadata[key]

if isinstance(existing_val, dict) and isinstance(value, dict):
if key == "classes":
for task_name, class_list in value.items():
if task_name not in existing_val:
existing_val[task_name] = class_list
else:
existing_val[task_name] = list(
set(existing_val[task_name]).union(
class_list
)
)
else:
existing_val.update(value)

elif (
key == "tasks"
and isinstance(existing_val, list)
and isinstance(value, list)
):
combined = set(existing_val).union(value)
self.metadata[key] = list(combined)
else:
self.metadata[key] = value
self._write_metadata()

def clone(
self, new_dataset_name: str, push_to_cloud: bool = True
) -> "LuxonisDataset":
"""Create a new LuxonisDataset that is a local copy of the
current dataset.

@type new_dataset_name: str
@param new_dataset_name: Name of the newly created dataset.
@type push_to_cloud: bool
@param push_to_cloud: Whether to push the new dataset to the
cloud. Only applies if the current dataset is remote.
"""

new_dataset = LuxonisDataset(
dataset_name=new_dataset_name,
team_id=self.team_id,
bucket_type=self.bucket_type,
bucket_storage=self.bucket_storage,
delete_existing=True,
delete_remote=True,
)

if self.is_remote:
self.sync_from_cloud()

new_dataset_path = Path(new_dataset.local_path)
new_dataset_path.mkdir(parents=True, exist_ok=True)
shutil.copytree(
self.local_path, new_dataset.local_path, dirs_exist_ok=True
)

new_dataset._init_paths()
new_dataset.metadata = defaultdict(dict, self._get_metadata())

new_dataset.metadata["original_dataset"] = self.dataset_name

if self.is_remote and push_to_cloud:
new_dataset.sync_to_cloud()

path = new_dataset.metadata_path / "metadata.json"
path.write_text(json.dumps(new_dataset.metadata, indent=4))

return new_dataset

def sync_to_cloud(self) -> None:
"""Uploads data to a remote cloud bucket."""
if not self.is_remote:
logger.warning("This is a local dataset! Cannot sync")
return

logger.info("Syncing to cloud...")
self.fs.put_dir(
local_paths=self.local_path, remote_dir="", copy_contents=True
)

def merge_with(
self,
other: "LuxonisDataset",
inplace: bool = True,
new_dataset_name: Optional[str] = None,
) -> "LuxonisDataset":
"""Merge all data from `other` LuxonisDataset into the current
dataset (in-place or in a new dataset).

@type other: LuxonisDataset
@param other: The dataset to merge into the current dataset.
@type inplace: bool
@param inplace: Whether to merge into the current dataset (True)
or create a new dataset (False).
@type new_dataset_name: Optional[str]
@param new_dataset_name: The name of the new dataset to create
if inplace is False.
"""
if not inplace:
if not new_dataset_name:
raise ValueError(
"You must specify a name for the new dataset when inplace is False."
)
new_dataset = self.clone(new_dataset_name, push_to_cloud=False)
new_dataset.merge_with(other, inplace=True)
return new_dataset

if other.is_remote != self.is_remote:
raise ValueError(
"Merging is only supported for datasets with the same bucket storage."
)

if self.is_remote:
other.sync_from_cloud(update_mode=UpdateMode.ALWAYS)
self.sync_from_cloud(update_mode=UpdateMode.IF_EMPTY)

df_self = self._load_df_offline()
df_other = other._load_df_offline()
duplicate_uuids = set(df_self["uuid"]).intersection(df_other["uuid"])
if duplicate_uuids: # skip duplicate uuids
df_other = df_other.filter(
~df_other["uuid"].is_in(duplicate_uuids)
)
df_merged = pl.concat([df_self, df_other])

file_index_self = self._get_file_index()
file_index_other = other._get_file_index()
file_index_duplicates = set(file_index_self["uuid"]).intersection(
file_index_other["uuid"]
)
if file_index_duplicates: # skip duplicate uuids
file_index_other = file_index_other.filter(
~file_index_other["uuid"].is_in(file_index_duplicates)
)
merged_file_index = pl.concat([file_index_self, file_index_other])

if merged_file_index is not None:
file_index_path = self.metadata_path / "file_index.parquet"
merged_file_index.write_parquet(file_index_path)

splits_self = self._load_splits(self.metadata_path)
splits_other = self._load_splits(other.metadata_path)
self._merge_splits(splits_self, splits_other)

self._save_df_offline(df_merged)
self._save_splits(splits_self)

if self.is_remote:
shutil.copytree(
other.media_path, self.media_path, dirs_exist_ok=True
)
self.sync_to_cloud()

self._merge_metadata_with(other)

return self
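
For readers less familiar with Polars, the duplicate handling above is a plain filter-and-concat pattern: rows whose uuid already exists in the current dataset are dropped from the other dataset before concatenation. A minimal sketch with toy frames (illustrative only, not part of this diff):

import polars as pl

df_self = pl.DataFrame({"uuid": ["a", "b"], "file": ["1.jpg", "2.jpg"]})
df_other = pl.DataFrame({"uuid": ["b", "c"], "file": ["2.jpg", "3.jpg"]})

# UUIDs present in both frames are kept only once, taken from df_self.
duplicate_uuids = set(df_self["uuid"]).intersection(df_other["uuid"])  # {"b"}
if duplicate_uuids:
    df_other = df_other.filter(~pl.col("uuid").is_in(list(duplicate_uuids)))

df_merged = pl.concat([df_self, df_other])  # uuids: a, b, c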

def _load_splits(self, path: Path) -> Dict[str, List[str]]:
splits_path = path / "splits.json"
with open(splits_path, "r") as f:
return json.load(f)

def _merge_splits(
self,
splits_self: Dict[str, List[str]],
splits_other: Dict[str, List[str]],
) -> None:
for split_name, uuids_other in splits_other.items():
if split_name not in splits_self:
splits_self[split_name] = []
combined_uuids = set(splits_self[split_name]).union(uuids_other)
splits_self[split_name] = list(combined_uuids)

def _save_splits(self, splits: Dict[str, List[str]]) -> None:
splits_path_self = self.metadata_path / "splits.json"
with open(splits_path_self, "w") as f:
json.dump(splits, f, indent=4)
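
For reference, the splits handled by the three helpers above are plain JSON maps from split name to lists of UUIDs, and merging is a per-split set union. A toy illustration (split names and UUIDs are made up):

splits_self = {"train": ["a", "b"], "val": ["c"]}
splits_other = {"train": ["b", "d"], "test": ["e"]}

# After self._merge_splits(splits_self, splits_other), splits_self becomes
# {"train": ["a", "b", "d"], "val": ["c"], "test": ["e"]}
# (the order inside each list is not guaranteed, since sets are used).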

@overload
def _load_df_offline(
self, lazy: Literal[False] = ...
@@ -284,25 +501,33 @@

@overload
def _get_file_index(
-self, lazy: Literal[False] = ...
+self, lazy: Literal[False] = ..., sync_from_cloud: bool = ...
) -> Optional[pl.DataFrame]: ...

@overload
def _get_file_index(
-self, lazy: Literal[True] = ...
+self, lazy: Literal[True] = ..., sync_from_cloud: bool = ...
) -> Optional[pl.LazyFrame]: ...

def _get_file_index(
-self, lazy: bool = False
+self, lazy: bool = False, sync_from_cloud: bool = False
) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]:
"""Loads the file index DataFrame from the local storage or the
-cloud.
-
-If loads from cloud it always downloads before loading.
+cloud if sync_from_cloud.

@type lazy: bool
@param lazy: Whether to return a LazyFrame instead of a
DataFrame
@type sync_from_cloud: bool
@param sync_from_cloud: Whether to sync from cloud before
loading the index
"""
-path = get_file(
-self.fs, "metadata/file_index.parquet", self.metadata_path
-)
+if sync_from_cloud:
+get_file(
+self.fs, "metadata/file_index.parquet", self.metadata_path
+)
+
+path = self.metadata_path / "file_index.parquet"
if path is not None and path.exists():
if not lazy:
df = pl.read_parquet(path)
@@ -599,7 +824,7 @@
def add(
self, generator: DatasetIterator, batch_size: int = 1_000_000
) -> Self:
-index = self._get_file_index()
+index = self._get_file_index(sync_from_cloud=True)
new_index = {"uuid": [], "file": [], "original_filepath": []}
processed_uuids = set()

@@ -828,7 +1053,7 @@
lower_bound = upper_bound

else:
-index = self._get_file_index()
+index = self._get_file_index(sync_from_cloud=True)
if index is None:
raise FileNotFoundError("File index not found")
for split, filepaths in definitions.items():
@@ -883,22 +1108,20 @@
bucket_storage: BucketStorage = BucketStorage.LOCAL,
bucket: Optional[str] = None,
) -> List[str]:
"""Returns a dictionary of all datasets.
"""Returns a list of all datasets.

@type team_id: Optional[str]
@param team_id: Optional team identifier
@type bucket_storage: BucketStorage
-@param bucket_storage: Underlying bucket storage from C{local},
-C{S3}, or C{GCS}. Default is C{local}.
+@param bucket_storage: Underlying bucket storage (local, S3, or
+GCS). Default is local.
@type bucket: Optional[str]
-@param bucket: Name of the bucket. Default is C{None}.
+@param bucket: Name of the bucket. Default is None.
@rtype: List[str]
@return: List of all dataset names.
"""
base_path = environ.LUXONISML_BASE_PATH

team_id = team_id or environ.LUXONISML_TEAM_ID
-names = []

if bucket_storage == BucketStorage.LOCAL:
fs = LuxonisFileSystem(
@@ -918,13 +1141,18 @@
if not fs.exists():
return []

-for path in fs.walk_dir("", recursive=False, typ="directory"):
+def process_directory(path: str) -> Optional[str]:
path = Path(path)
metadata_path = path / "metadata" / "metadata.json"
-if not fs.exists(metadata_path):
-continue
-metadata_text = fs.read_text(metadata_path)
-if isinstance(metadata_text, bytes):
-metadata_text = metadata_text.decode()
-names.append(path.name)
+if fs.exists(metadata_path):
+return path.name
+return None

+# Collect directory paths and process them in parallel
+paths = list(fs.walk_dir("", recursive=False, typ="directory"))
+with ThreadPoolExecutor() as executor:
+names = [
+name for name in executor.map(process_directory, paths) if name
+]

return names
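
Taken together, the new public surface in this file is clone() and merge_with(). A hedged usage sketch: the dataset names are hypothetical, and two already-populated local LuxonisDataset instances are assumed.

from luxonis_ml.data import LuxonisDataset

ds_a = LuxonisDataset("dataset_a")  # hypothetical, already populated
ds_b = LuxonisDataset("dataset_b")  # hypothetical, already populated

# In-place: annotations, file index, splits and metadata from ds_b are folded into ds_a.
ds_a.merge_with(ds_b, inplace=True)

# Non-destructive: clone ds_a first, then merge ds_b into the copy.
merged = ds_a.merge_with(ds_b, inplace=False, new_dataset_name="dataset_a_plus_b")

# clone() can also be used on its own to make a local copy.
backup = ds_a.clone("dataset_a_backup", push_to_cloud=False)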
11 changes: 10 additions & 1 deletion luxonis_ml/utils/filesystem.py
@@ -231,6 +231,7 @@ def put_dir(
remote_dir: PathType,
uuid_dict: Optional[Dict[str, str]] = None,
mlflow_instance: Optional[ModuleType] = None,
copy_contents: bool = False, # New argument
) -> Optional[Dict[str, str]]:
"""Uploads files to remote storage.

@@ -246,6 +247,9 @@
@type mlflow_instance: Optional[L{ModuleType}]
@param mlflow_instance: MLFlow instance if uploading to active
run. Defaults to None.
@type copy_contents: bool
@param copy_contents: If True, only copy the content of the
folder specified in local_paths. Defaults to False.
@rtype: Optional[Dict[str, str]]
@return: When local_paths is a list, this maps local_paths to
remote_paths
@@ -257,8 +261,13 @@
local_paths = Path(local_paths)
if not Path(local_paths).is_dir():
raise ValueError("Path must be a directory.")
+source_path = (
+str(local_paths) + "/"
+if copy_contents
+else str(local_paths)
+)
self.fs.put(
-str(local_paths),
+source_path,
str(self.path / remote_dir),
recursive=True,
)
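
The new copy_contents flag maps onto fsspec's trailing-slash convention: passing a path ending in "/" to put() uploads the directory's contents rather than the directory itself, which is what sync_to_cloud relies on above. A small sketch; the bucket URL and local paths are hypothetical:

from luxonis_ml.utils import LuxonisFileSystem

fs = LuxonisFileSystem("s3://my-bucket/datasets/coco")  # hypothetical remote

# Default behaviour: the "media" directory itself is created under the remote prefix.
fs.put_dir(local_paths="local/media", remote_dir="media")

# copy_contents=True appends a trailing slash to the local path, so only the
# files inside "local/media" land directly under the remote prefix.
fs.put_dir(local_paths="local/media", remote_dir="", copy_contents=True)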