Feat/merge datasets (#221)
Co-authored-by: KlemenSkrlj <[email protected]>
JSabadin and klemen1999 authored Jan 13, 2025
1 parent 11ee765 commit 539fde4
Showing 5 changed files with 401 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -108,4 +108,4 @@ jobs:
with:
name: coverage
path: coverage.xml
overwrite: true
overwrite: true
280 changes: 256 additions & 24 deletions luxonis_ml/data/datasets/luxonis_dataset.py
@@ -4,6 +4,7 @@
import shutil
import tempfile
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from functools import cached_property
from pathlib import Path
@@ -221,6 +222,226 @@ def _init_paths(self) -> None:
self.dataset_name,
)

def _save_df_offline(self, pl_df: pl.DataFrame) -> None:
"""Saves the given Polars DataFrame into multiple Parquet files
using ParquetFileManager. Ensures the same structure as the
original dataset.
@type pl_df: pl.DataFrame
@param pl_df: The Polars DataFrame to save.
"""
annotations_path = Path(self.annotations_path)

for old_file in annotations_path.glob("*.parquet"):
old_file.unlink()

rows = pl_df.to_dicts()

with ParquetFileManager(annotations_path) as pfm:
for row in rows:
uuid_val = row.get("uuid")
if uuid_val is None:
raise ValueError("Missing 'uuid' in row!")

data_dict = dict(row)
data_dict.pop("uuid", None)

pfm.write(uuid_val, data_dict)

logger.info(
f"Saved merged DataFrame to Parquet files in '{annotations_path}'."
)
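For intuition, the loop above hands each row (minus its uuid) to the Parquet manager, keyed by that uuid. A minimal standalone sketch of the same pattern, with a toy frame, made-up column names, and a print standing in for ParquetFileManager.write:

import polars as pl

# Toy stand-in for the merged annotations DataFrame.
pl_df = pl.DataFrame(
    {
        "uuid": ["0001", "0001", "0002"],
        "task_name": ["detection", "detection", "detection"],
        "class_name": ["car", "person", "car"],
    }
)

for row in pl_df.to_dicts():
    uuid_val = row.pop("uuid")  # grouping key for the Parquet shard
    # ParquetFileManager.write(uuid_val, row) would buffer this row;
    # here we only show the (key, payload) structure it receives.
    print(uuid_val, row)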

def _merge_metadata_with(self, other: "LuxonisDataset") -> None:
"""Merges relevant metadata from `other` into `self`."""
for key, value in other.metadata.items():
if key not in self.metadata:
self.metadata[key] = value
else:
existing_val = self.metadata[key]

if isinstance(existing_val, dict) and isinstance(value, dict):
if key == "classes":
for task_name, class_list in value.items():
if task_name not in existing_val:
existing_val[task_name] = class_list
else:
existing_val[task_name] = list(
set(existing_val[task_name]).union(
class_list
)
)
else:
existing_val.update(value)

elif (
key == "tasks"
and isinstance(existing_val, list)
and isinstance(value, list)
):
combined = set(existing_val).union(value)
self.metadata[key] = list(combined)
else:
self.metadata[key] = value
self._write_metadata()
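For illustration, the "classes" branch above reduces to a per-task set union. A self-contained sketch with made-up task names and class lists:

# Fragments standing in for self.metadata["classes"] and
# other.metadata["classes"]; all values are invented.
classes_self = {"detection": ["car", "person"]}
classes_other = {"detection": ["car", "dog"], "segmentation": ["road"]}

for task_name, class_list in classes_other.items():
    if task_name not in classes_self:
        classes_self[task_name] = class_list
    else:
        classes_self[task_name] = list(
            set(classes_self[task_name]).union(class_list)
        )

# classes_self now maps "detection" to the union of car, person and dog
# (list order not guaranteed) and "segmentation" to ["road"].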

def clone(
self, new_dataset_name: str, push_to_cloud: bool = True
) -> "LuxonisDataset":
"""Create a new LuxonisDataset that is a local copy of the
current dataset. Cloned dataset will overwrite the existing
dataset with the same name.
@type new_dataset_name: str
@param new_dataset_name: Name of the newly created dataset.
@type push_to_cloud: bool
@param push_to_cloud: Whether to push the new dataset to the
cloud. Only if the current dataset is remote.
"""

new_dataset = LuxonisDataset(
dataset_name=new_dataset_name,
team_id=self.team_id,
bucket_type=self.bucket_type,
bucket_storage=self.bucket_storage,
delete_existing=True,
delete_remote=True,
)

if self.is_remote:
self.sync_from_cloud()

new_dataset_path = Path(new_dataset.local_path)
new_dataset_path.mkdir(parents=True, exist_ok=True)
shutil.copytree(
self.local_path, new_dataset.local_path, dirs_exist_ok=True
)

new_dataset._init_paths()
new_dataset.metadata = defaultdict(dict, self._get_metadata())

new_dataset.metadata["original_dataset"] = self.dataset_name

if self.is_remote and push_to_cloud:
new_dataset.sync_to_cloud()

path = new_dataset.metadata_path / "metadata.json"
path.write_text(json.dumps(new_dataset.metadata, indent=4))

return new_dataset
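A possible usage sketch, assuming the usual from luxonis_ml.data import and hypothetical dataset names:

from luxonis_ml.data import LuxonisDataset

ds = LuxonisDataset("vehicles")  # hypothetical existing dataset
backup = ds.clone("vehicles_backup", push_to_cloud=False)
# `backup` is an independent local copy; its metadata records the source
# dataset under the "original_dataset" key.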

def sync_to_cloud(self) -> None:
"""Uploads data to a remote cloud bucket."""
if not self.is_remote:
logger.warning("This is a local dataset! Cannot sync")
return

logger.info("Syncing to cloud...")
self.fs.put_dir(
local_paths=self.local_path, remote_dir="", copy_contents=True
)

def merge_with(
self,
other: "LuxonisDataset",
inplace: bool = True,
new_dataset_name: Optional[str] = None,
) -> "LuxonisDataset":
"""Merge all data from `other` LuxonisDataset into the current
dataset (in-place or in a new dataset).
@type other: LuxonisDataset
@param other: The dataset to merge into the current dataset.
@type inplace: bool
@param inplace: Whether to merge into the current dataset (True)
or create a new dataset (False).
@type new_dataset_name: Optional[str]
@param new_dataset_name: The name of the new dataset to create
when inplace is False.
@rtype: LuxonisDataset
@return: The target dataset containing the merged data.
"""
if not inplace and not new_dataset_name:
raise ValueError(
"You must specify a name for the new dataset when inplace is False."
)

target_dataset = (
self
if inplace
else self.clone(new_dataset_name, push_to_cloud=False)
)

if self.is_remote:
other.sync_from_cloud(update_mode=UpdateMode.ALWAYS)
self.sync_from_cloud(
update_mode=UpdateMode.ALWAYS
if inplace
else UpdateMode.IF_EMPTY
)

df_self = self._load_df_offline()
df_other = other._load_df_offline()
duplicate_uuids = set(df_self["uuid"]).intersection(df_other["uuid"])
if duplicate_uuids:
df_other = df_other.filter(
~df_other["uuid"].is_in(duplicate_uuids)
)

df_merged = pl.concat([df_self, df_other])
target_dataset._save_df_offline(df_merged)

file_index_self = self._get_file_index()
file_index_other = other._get_file_index()
file_index_duplicates = set(file_index_self["uuid"]).intersection(
file_index_other["uuid"]
)
if file_index_duplicates:
file_index_other = file_index_other.filter(
~file_index_other["uuid"].is_in(file_index_duplicates)
)

merged_file_index = pl.concat([file_index_self, file_index_other])
if merged_file_index is not None:
file_index_path = (
target_dataset.metadata_path / "file_index.parquet"
)
merged_file_index.write_parquet(file_index_path)

splits_self = self._load_splits(self.metadata_path)
splits_other = self._load_splits(other.metadata_path)
self._merge_splits(splits_self, splits_other)
target_dataset._save_splits(splits_self)

if self.is_remote:
shutil.copytree(
other.media_path, target_dataset.media_path, dirs_exist_ok=True
)
target_dataset.sync_to_cloud()

target_dataset._merge_metadata_with(other)

return target_dataset
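A possible usage sketch of the new API (dataset names are hypothetical):

from luxonis_ml.data import LuxonisDataset

ds_a = LuxonisDataset("cars_front_camera")  # hypothetical
ds_b = LuxonisDataset("cars_rear_camera")   # hypothetical

# In-place merge: ds_a gains ds_b's samples; rows whose UUIDs already
# exist in ds_a are skipped.
ds_a.merge_with(ds_b, inplace=True)

# Out-of-place merge: both sources stay untouched and a new dataset is
# created from a clone of ds_a plus the data of ds_b.
merged = ds_a.merge_with(
    ds_b, inplace=False, new_dataset_name="cars_all_cameras"
)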

def _load_splits(self, path: Path) -> Dict[str, List[str]]:
splits_path = path / "splits.json"
with open(splits_path, "r") as f:
return json.load(f)

def _merge_splits(
self,
splits_self: Dict[str, List[str]],
splits_other: Dict[str, List[str]],
) -> None:
for split_name, uuids_other in splits_other.items():
if split_name not in splits_self:
splits_self[split_name] = []
combined_uuids = set(splits_self[split_name]).union(uuids_other)
splits_self[split_name] = list(combined_uuids)
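Concretely, the split merge is a per-split union of UUID lists; a tiny worked example with invented UUIDs:

splits_self = {"train": ["u1", "u2"]}
splits_other = {"train": ["u2", "u3"], "val": ["u4"]}

for split_name, uuids_other in splits_other.items():
    if split_name not in splits_self:
        splits_self[split_name] = []
    splits_self[split_name] = list(
        set(splits_self[split_name]).union(uuids_other)
    )

# splits_self -> {"train": union of u1, u2, u3 (order not guaranteed),
#                 "val": ["u4"]}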

def _save_splits(self, splits: Dict[str, List[str]]) -> None:
splits_path_self = self.metadata_path / "splits.json"
with open(splits_path_self, "w") as f:
json.dump(splits, f, indent=4)

@overload
def _load_df_offline(
self, lazy: Literal[False] = ...
@@ -284,25 +505,33 @@ def _load_df_offline(

@overload
def _get_file_index(
self, lazy: Literal[False] = ...
self, lazy: Literal[False] = ..., sync_from_cloud: bool = ...
) -> Optional[pl.DataFrame]: ...

@overload
def _get_file_index(
self, lazy: Literal[True] = ...
self, lazy: Literal[True] = ..., sync_from_cloud: bool = ...
) -> Optional[pl.LazyFrame]: ...

def _get_file_index(
self, lazy: bool = False
self, lazy: bool = False, sync_from_cloud: bool = False
) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]:
"""Loads the file index DataFrame from the local storage or the
cloud.
If loads from cloud it always downloads before loading.
cloud when sync_from_cloud is True.
@type lazy: bool
@param lazy: Whether to return a LazyFrame instead of a
DataFrame.
@type sync_from_cloud: bool
@param sync_from_cloud: Whether to sync from the cloud before
loading the index.
"""
path = get_file(
self.fs, "metadata/file_index.parquet", self.metadata_path
)
if sync_from_cloud:
get_file(
self.fs, "metadata/file_index.parquet", self.metadata_path
)

path = self.metadata_path / "file_index.parquet"
if path is not None and path.exists():
if not lazy:
df = pl.read_parquet(path)
@@ -599,7 +828,7 @@ def _add_process_batch(
def add(
self, generator: DatasetIterator, batch_size: int = 1_000_000
) -> Self:
index = self._get_file_index()
index = self._get_file_index(sync_from_cloud=True)
new_index = {"uuid": [], "file": [], "original_filepath": []}
processed_uuids = set()

@@ -828,7 +1057,7 @@ def make_splits(
lower_bound = upper_bound

else:
index = self._get_file_index()
index = self._get_file_index(sync_from_cloud=True)
if index is None:
raise FileNotFoundError("File index not found")
for split, filepaths in definitions.items():
@@ -883,22 +1112,20 @@ def list_datasets(
bucket_storage: BucketStorage = BucketStorage.LOCAL,
bucket: Optional[str] = None,
) -> List[str]:
"""Returns a dictionary of all datasets.
"""Returns a list of all datasets.
@type team_id: Optional[str]
@param team_id: Optional team identifier
@type bucket_storage: BucketStorage
@param bucket_storage: Underlying bucket storage from C{local},
C{S3}, or C{GCS}. Default is C{local}.
@param bucket_storage: Underlying bucket storage (local, S3, or
GCS). Default is local.
@type bucket: Optional[str]
@param bucket: Name of the bucket. Default is C{None}.
@param bucket: Name of the bucket. Default is None.
@rtype: List[str]
@return: List of all dataset names.
"""
base_path = environ.LUXONISML_BASE_PATH

team_id = team_id or environ.LUXONISML_TEAM_ID
names = []

if bucket_storage == BucketStorage.LOCAL:
fs = LuxonisFileSystem(
@@ -918,13 +1145,18 @@ def list_datasets(
if not fs.exists():
return []

for path in fs.walk_dir("", recursive=False, typ="directory"):
def process_directory(path: str) -> Optional[str]:
path = Path(path)
metadata_path = path / "metadata" / "metadata.json"
if not fs.exists(metadata_path):
continue
metadata_text = fs.read_text(metadata_path)
if isinstance(metadata_text, bytes):
metadata_text = metadata_text.decode()
names.append(path.name)
if fs.exists(metadata_path):
return path.name
return None

# Collect directory paths and process them in parallel
paths = list(fs.walk_dir("", recursive=False, typ="directory"))
with ThreadPoolExecutor() as executor:
names = [
name for name in executor.map(process_directory, paths) if name
]

return names
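The refactor above replaces the sequential directory walk with a thread-pool scan that keeps only directories containing metadata/metadata.json. The same pattern on a plain local filesystem, as a standalone sketch (the scan root here is arbitrary; LuxonisDataset scans LUXONISML_BASE_PATH instead):

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Optional

def dataset_name(path: Path) -> Optional[str]:
    # A directory counts as a dataset only if it holds metadata/metadata.json.
    return path.name if (path / "metadata" / "metadata.json").exists() else None

candidates = [p for p in Path(".").iterdir() if p.is_dir()]

with ThreadPoolExecutor() as executor:
    names: List[str] = [n for n in executor.map(dataset_name, candidates) if n]

print(names)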
1 change: 1 addition & 0 deletions luxonis_ml/data/requirements.txt
@@ -14,3 +14,4 @@ polars[timezone]>=0.20.31
ordered-set>=4.0.0
semver>=3.0.0
# roboflow>=0.1.1
filelock>=3.0.0
11 changes: 10 additions & 1 deletion luxonis_ml/utils/filesystem.py
@@ -231,6 +231,7 @@ def put_dir(
remote_dir: PathType,
uuid_dict: Optional[Dict[str, str]] = None,
mlflow_instance: Optional[ModuleType] = None,
copy_contents: bool = False,
) -> Optional[Dict[str, str]]:
"""Uploads files to remote storage.
@@ -246,6 +247,9 @@
@type mlflow_instance: Optional[L{ModuleType}]
@param mlflow_instance: MLFlow instance if uploading to active
run. Defaults to None.
@type copy_contents: bool
@param copy_contents: If True, copy only the contents of the
folder specified in local_paths, not the folder itself.
Defaults to False.
@rtype: Optional[Dict[str, str]]
@return: When local_paths is a list, this maps local_paths to
remote_paths
@@ -257,8 +261,13 @@
local_paths = Path(local_paths)
if not Path(local_paths).is_dir():
raise ValueError("Path must be a directory.")
source_path = (
str(local_paths) + "/"
if copy_contents
else str(local_paths)
)
self.fs.put(
str(local_paths),
source_path,
str(self.path / remote_dir),
recursive=True,
)
