From a414b7e4468409fdd7df2903aaa2cc1dcd2f5545 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Mon, 6 Jan 2025 11:34:34 +0100 Subject: [PATCH 01/10] feat: skip dataset re-download --- luxonis_ml/data/datasets/luxonis_dataset.py | 13 ++++++++++--- luxonis_ml/data/loaders/luxonis_loader.py | 8 +++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index fb43ae2a..7f32fbf4 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -418,15 +418,22 @@ def get_skeletons( def get_tasks(self) -> List[str]: return self.metadata.get("tasks", []) - def sync_from_cloud(self, force: bool = False) -> None: + def sync_from_cloud( + self, force: bool = False, skip_redownload_dataset: bool = False + ) -> None: """Downloads data from a remote cloud bucket.""" - if not self.is_remote: logger.warning("This is a local dataset! Cannot sync") else: + local_dir = self.base_path / "data" / self.team_id / "datasets" + if local_dir.exists() and skip_redownload_dataset and not force: + logger.info( + "Local dataset directory already exists. Skipping download." + ) + return + if not self._is_synced or force: logger.info("Syncing from cloud...") - local_dir = self.base_path / "data" / self.team_id / "datasets" local_dir.mkdir(exist_ok=True, parents=True) self.fs.get_dir(remote_paths="", local_dir=local_dir) diff --git a/luxonis_ml/data/loaders/luxonis_loader.py b/luxonis_ml/data/loaders/luxonis_loader.py index 91a679f2..385b446a 100644 --- a/luxonis_ml/data/loaders/luxonis_loader.py +++ b/luxonis_ml/data/loaders/luxonis_loader.py @@ -49,6 +49,7 @@ def __init__( out_image_format: Literal["RGB", "BGR"] = "RGB", *, force_resync: bool = False, + skip_redownload_dataset: bool = False, ) -> None: """A loader class used for loading data from L{LuxonisDataset}. @@ -87,6 +88,8 @@ def __init__( @type force_resync: bool @param force_resync: Flag to force resync from cloud. Defaults to C{False}. + @param skip_redownload_dataset: If True, skip downloading when local dataset + already exists. If False, force redownload (unless force_resync is True). """ self.logger = logging.getLogger(__name__) @@ -96,7 +99,10 @@ def __init__( self.sync_mode = self.dataset.is_remote if self.sync_mode: - self.dataset.sync_from_cloud(force=force_resync) + self.dataset.sync_from_cloud( + force=force_resync, + skip_redownload_dataset=skip_redownload_dataset, + ) if isinstance(view, str): view = [view] From 5ded5615f87583da581f886fbb827e8b7951c697 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Mon, 6 Jan 2025 12:42:10 +0100 Subject: [PATCH 02/10] feat: add FileLock --- luxonis_ml/data/datasets/luxonis_dataset.py | 24 ++++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 7f32fbf4..6f9ba523 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -26,6 +26,7 @@ import numpy as np import polars as pl import pyarrow.parquet as pq +from filelock import FileLock from ordered_set import OrderedSet from semver.version import Version from typing_extensions import Self, override @@ -423,10 +424,20 @@ def sync_from_cloud( ) -> None: """Downloads data from a remote cloud bucket.""" if not self.is_remote: - logger.warning("This is a local dataset! Cannot sync") - else: - local_dir = self.base_path / "data" / self.team_id / "datasets" - if local_dir.exists() and skip_redownload_dataset and not force: + logger.warning("This is a local dataset! Cannot sync.") + return + + local_dir = self.base_path / "data" / self.team_id / "datasets" + local_dir.mkdir(exist_ok=True, parents=True) + + lock_path = local_dir / ".sync.lock" + + with FileLock(str(lock_path)): + if ( + (local_dir / self.dataset_name).exists() + and skip_redownload_dataset + and not force + ): logger.info( "Local dataset directory already exists. Skipping download." ) @@ -434,13 +445,10 @@ def sync_from_cloud( if not self._is_synced or force: logger.info("Syncing from cloud...") - local_dir.mkdir(exist_ok=True, parents=True) - self.fs.get_dir(remote_paths="", local_dir=local_dir) - self._is_synced = True else: - logger.warning("Already synced. Use force=True to resync") + logger.warning("Already synced. Use force=True to resync.") @override def delete_dataset(self, *, delete_remote: bool = False) -> None: From 7135911f8fdb73bc252d2cf31748e3cfc0bc8258 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Mon, 6 Jan 2025 14:22:42 +0100 Subject: [PATCH 03/10] fix bug --- luxonis_ml/data/datasets/luxonis_dataset.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 6f9ba523..69458760 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -433,8 +433,12 @@ def sync_from_cloud( lock_path = local_dir / ".sync.lock" with FileLock(str(lock_path)): + any_subfolder_empty = any( + subfolder.is_dir() and not any(subfolder.iterdir()) + for subfolder in (local_dir / self.dataset_name).iterdir() + ) if ( - (local_dir / self.dataset_name).exists() + not any_subfolder_empty and skip_redownload_dataset and not force ): From aab2522d87ece148f475f973847fa9b459ebd2c9 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 07:19:22 +0100 Subject: [PATCH 04/10] fix: remove redundant annotation download (already handled by sync_from_cloud) --- luxonis_ml/data/datasets/luxonis_dataset.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 69458760..f991267b 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -227,9 +227,16 @@ def _load_df_offline( def _load_df_offline( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: - path = get_dir(self.fs, "annotations", self.local_path) + path = ( + self.base_path + / "data" + / self.team_id + / "datasets" + / self.dataset_name + / "annotations" + ) - if path is None or not path.exists(): + if not path.exists(): return None if lazy: From 4a745ada5ed25dc920f937365ac21e85f695d5b5 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 07:21:19 +0100 Subject: [PATCH 05/10] tracker hotfix --- luxonis_ml/tracker/tracker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/luxonis_ml/tracker/tracker.py b/luxonis_ml/tracker/tracker.py index fc5ff774..7e48aee8 100644 --- a/luxonis_ml/tracker/tracker.py +++ b/luxonis_ml/tracker/tracker.py @@ -2,6 +2,7 @@ import json import logging import os +import time from functools import wraps from importlib.util import find_spec from pathlib import Path @@ -139,6 +140,7 @@ def __init__( if rank == 0: self.run_name = self._get_run_name() else: + time.sleep(1) self.run_name = self._get_latest_run_name() Path(f"{self.save_directory}/{self.run_name}").mkdir( From f86d8d5fff4391b9efbcb5f24b56d9fac282edba Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 07:25:13 +0100 Subject: [PATCH 06/10] hotfix comment --- luxonis_ml/tracker/tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luxonis_ml/tracker/tracker.py b/luxonis_ml/tracker/tracker.py index 7e48aee8..3d5486cb 100644 --- a/luxonis_ml/tracker/tracker.py +++ b/luxonis_ml/tracker/tracker.py @@ -140,7 +140,7 @@ def __init__( if rank == 0: self.run_name = self._get_run_name() else: - time.sleep(1) + time.sleep(1) # DDP hotfix self.run_name = self._get_latest_run_name() Path(f"{self.save_directory}/{self.run_name}").mkdir( From f92881d56fae2821193b7880a69f426576ea35a6 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 13:42:51 +0100 Subject: [PATCH 07/10] refactor sync_from_cloud for improved clarity --- luxonis_ml/data/datasets/__init__.py | 3 ++- luxonis_ml/data/datasets/luxonis_dataset.py | 25 ++++++++++++--------- luxonis_ml/data/loaders/luxonis_loader.py | 17 +++++--------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/luxonis_ml/data/datasets/__init__.py b/luxonis_ml/data/datasets/__init__.py index 6955783b..ecefc974 100644 --- a/luxonis_ml/data/datasets/__init__.py +++ b/luxonis_ml/data/datasets/__init__.py @@ -8,7 +8,7 @@ load_annotation, ) from .base_dataset import DATASETS_REGISTRY, BaseDataset, DatasetIterator -from .luxonis_dataset import LuxonisDataset +from .luxonis_dataset import LuxonisDataset, UpdateMode from .source import LuxonisComponent, LuxonisSource __all__ = [ @@ -25,4 +25,5 @@ "load_annotation", "Detection", "ArrayAnnotation", + "UpdateMode", ] diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index f991267b..917ff06c 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -5,6 +5,7 @@ import tempfile from collections import defaultdict from contextlib import suppress +from enum import Enum from functools import cached_property from pathlib import Path from typing import ( @@ -68,6 +69,11 @@ class Metadata(TypedDict): skeletons: Dict[str, Skeletons] +class UpdateMode(Enum): + ALWAYS = "always" + IF_EMPTY = "if_empty" + + class LuxonisDataset(BaseDataset): def __init__( self, @@ -427,11 +433,12 @@ def get_tasks(self) -> List[str]: return self.metadata.get("tasks", []) def sync_from_cloud( - self, force: bool = False, skip_redownload_dataset: bool = False + self, update_mode: UpdateMode = UpdateMode.IF_EMPTY ) -> None: """Downloads data from a remote cloud bucket.""" + if not self.is_remote: - logger.warning("This is a local dataset! Cannot sync.") + logger.warning("This is a local dataset! Cannot sync from cloud.") return local_dir = self.base_path / "data" / self.team_id / "datasets" @@ -443,23 +450,21 @@ def sync_from_cloud( any_subfolder_empty = any( subfolder.is_dir() and not any(subfolder.iterdir()) for subfolder in (local_dir / self.dataset_name).iterdir() + if subfolder.is_dir() ) - if ( - not any_subfolder_empty - and skip_redownload_dataset - and not force - ): + if update_mode == UpdateMode.IF_EMPTY and not any_subfolder_empty: logger.info( "Local dataset directory already exists. Skipping download." ) return - - if not self._is_synced or force: + if update_mode == UpdateMode.ALWAYS or not self._is_synced: logger.info("Syncing from cloud...") self.fs.get_dir(remote_paths="", local_dir=local_dir) self._is_synced = True else: - logger.warning("Already synced. Use force=True to resync.") + logger.warning( + "Already synced. Use update_mode=ALWAYS to resync." + ) @override def delete_dataset(self, *, delete_remote: bool = False) -> None: diff --git a/luxonis_ml/data/loaders/luxonis_loader.py b/luxonis_ml/data/loaders/luxonis_loader.py index 385b446a..05321a32 100644 --- a/luxonis_ml/data/loaders/luxonis_loader.py +++ b/luxonis_ml/data/loaders/luxonis_loader.py @@ -18,6 +18,7 @@ from luxonis_ml.data.datasets import ( Annotation, LuxonisDataset, + UpdateMode, load_annotation, ) from luxonis_ml.data.loaders.base_loader import BaseLoader @@ -48,8 +49,7 @@ def __init__( keep_aspect_ratio: bool = True, out_image_format: Literal["RGB", "BGR"] = "RGB", *, - force_resync: bool = False, - skip_redownload_dataset: bool = False, + update_mode: UpdateMode = UpdateMode.ALWAYS, ) -> None: """A loader class used for loading data from L{LuxonisDataset}. @@ -85,11 +85,9 @@ def __init__( @type width: Optional[int] @param width: The width of the output images. Defaults to C{None}. - @type force_resync: bool - @param force_resync: Flag to force resync from cloud. Defaults - to C{False}. - @param skip_redownload_dataset: If True, skip downloading when local dataset - already exists. If False, force redownload (unless force_resync is True). + @param update_mode: Enum that determines the sync mode: + - UpdateMode.ALWAYS: Force a fresh download + - UpdateMode.IF_EMPTY: Skip downloading if local data exists """ self.logger = logging.getLogger(__name__) @@ -99,10 +97,7 @@ def __init__( self.sync_mode = self.dataset.is_remote if self.sync_mode: - self.dataset.sync_from_cloud( - force=force_resync, - skip_redownload_dataset=skip_redownload_dataset, - ) + self.dataset.sync_from_cloud(update_mode=update_mode) if isinstance(view, str): view = [view] From 05f64c801dd0ff8c9e5768d9f7316c06b3d811d3 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 13:55:40 +0100 Subject: [PATCH 08/10] fix failing tests --- luxonis_ml/data/__init__.py | 2 ++ tests/test_data/test_task_ingestion.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/luxonis_ml/data/__init__.py b/luxonis_ml/data/__init__.py index 244124dd..ebb85c11 100644 --- a/luxonis_ml/data/__init__.py +++ b/luxonis_ml/data/__init__.py @@ -11,6 +11,7 @@ LuxonisComponent, LuxonisDataset, LuxonisSource, + UpdateMode, ) from .loaders import LOADERS_REGISTRY, BaseLoader, LuxonisLoader from .parsers import LuxonisParser @@ -46,6 +47,7 @@ def load_loader_plugins() -> None: # pragma: no cover "ImageType", "LuxonisComponent", "LuxonisDataset", + "UpdateMode", "LuxonisLoader", "LuxonisParser", "LuxonisSource", diff --git a/tests/test_data/test_task_ingestion.py b/tests/test_data/test_task_ingestion.py index 612e1bf1..1944641e 100644 --- a/tests/test_data/test_task_ingestion.py +++ b/tests/test_data/test_task_ingestion.py @@ -7,7 +7,12 @@ import numpy as np import pytest -from luxonis_ml.data import BucketStorage, LuxonisDataset, LuxonisLoader +from luxonis_ml.data import ( + BucketStorage, + LuxonisDataset, + LuxonisLoader, + UpdateMode, +) from luxonis_ml.data.utils import get_task_name, get_task_type DATA_DIR = Path("tests/data/test_task_ingestion") @@ -36,7 +41,7 @@ def make_image(i) -> Path: def compute_histogram(dataset: LuxonisDataset) -> Dict[str, int]: classes = defaultdict(int) - loader = LuxonisLoader(dataset, force_resync=True) + loader = LuxonisLoader(dataset, update_mode=UpdateMode.ALWAYS) for _, record in loader: for task, _ in record.items(): if get_task_type(task) != "classification": From 2ec48a9974c8047dbc7f4fb51bb633ad66ac6b4d Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Tue, 7 Jan 2025 17:00:13 +0100 Subject: [PATCH 09/10] fix docs, address additional potential race conditions --- luxonis_ml/data/datasets/luxonis_dataset.py | 28 ++++++++++++++------- luxonis_ml/data/loaders/luxonis_loader.py | 1 + luxonis_ml/data/utils/__init__.py | 3 ++- luxonis_ml/data/utils/enums.py | 7 ++++++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 917ff06c..09bf8ad6 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -5,7 +5,6 @@ import tempfile from collections import defaultdict from contextlib import suppress -from enum import Enum from functools import cached_property from pathlib import Path from typing import ( @@ -36,6 +35,7 @@ BucketStorage, BucketType, ParquetFileManager, + UpdateMode, infer_task, warn_on_duplicates, ) @@ -69,11 +69,6 @@ class Metadata(TypedDict): skeletons: Dict[str, Skeletons] -class UpdateMode(Enum): - ALWAYS = "always" - IF_EMPTY = "if_empty" - - class LuxonisDataset(BaseDataset): def __init__( self, @@ -292,8 +287,14 @@ def _get_file_index( def _get_file_index( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: - path = get_file( - self.fs, "metadata/file_index.parquet", self.metadata_path + path = ( + self.base_path + / "data" + / self.team_id + / "datasets" + / self.dataset_name + / "metadata" + / "file_index.parquet" ) if path is not None and path.exists(): if not lazy: @@ -435,8 +436,17 @@ def get_tasks(self) -> List[str]: def sync_from_cloud( self, update_mode: UpdateMode = UpdateMode.IF_EMPTY ) -> None: - """Downloads data from a remote cloud bucket.""" + """Synchronizes the dataset from a remote cloud bucket to the + local directory. + This method performs the download only if local data is empty, or always downloads + depending on the provided update_mode. + + @type update_mode: UpdateMode + @param update_mode: Specifies the update behavior. + - UpdateMode.IF_EMPTY: Downloads data only if the local dataset is empty. + - UpdateMode.ALWAYS: Always downloads and overwrites the local dataset. + """ if not self.is_remote: logger.warning("This is a local dataset! Cannot sync from cloud.") return diff --git a/luxonis_ml/data/loaders/luxonis_loader.py b/luxonis_ml/data/loaders/luxonis_loader.py index 05321a32..628a38e5 100644 --- a/luxonis_ml/data/loaders/luxonis_loader.py +++ b/luxonis_ml/data/loaders/luxonis_loader.py @@ -85,6 +85,7 @@ def __init__( @type width: Optional[int] @param width: The width of the output images. Defaults to C{None}. + @type update_mode: UpdateMode @param update_mode: Enum that determines the sync mode: - UpdateMode.ALWAYS: Force a fresh download - UpdateMode.IF_EMPTY: Skip downloading if local data exists diff --git a/luxonis_ml/data/utils/__init__.py b/luxonis_ml/data/utils/__init__.py index f11079e1..39967645 100644 --- a/luxonis_ml/data/utils/__init__.py +++ b/luxonis_ml/data/utils/__init__.py @@ -1,5 +1,5 @@ from .data_utils import infer_task, rgb_to_bool_masks, warn_on_duplicates -from .enums import BucketStorage, BucketType, ImageType, MediaType +from .enums import BucketStorage, BucketType, ImageType, MediaType, UpdateMode from .parquet import ParquetDetection, ParquetFileManager, ParquetRecord from .task_utils import ( get_task_name, @@ -24,6 +24,7 @@ "ImageType", "BucketType", "BucketStorage", + "UpdateMode", "get_task_name", "task_type_iterator", "task_is_metadata", diff --git a/luxonis_ml/data/utils/enums.py b/luxonis_ml/data/utils/enums.py index 7ce4e669..17bbe273 100644 --- a/luxonis_ml/data/utils/enums.py +++ b/luxonis_ml/data/utils/enums.py @@ -31,3 +31,10 @@ class BucketStorage(Enum): S3 = "s3" GCS = "gcs" AZURE_BLOB = "azure" + + +class UpdateMode(Enum): + """Update mode for the dataset.""" + + ALWAYS = "always" + IF_EMPTY = "if_empty" From 2c49d1229190d50fa07f5ebe6b0ce1913626ed34 Mon Sep 17 00:00:00 2001 From: Jernej Sabadin Date: Wed, 8 Jan 2025 08:02:19 +0100 Subject: [PATCH 10/10] fix rare race conditions --- luxonis_ml/data/datasets/luxonis_dataset.py | 32 ++++++++++++++------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 09bf8ad6..47895b6a 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -135,7 +135,13 @@ def __init__( else: self.fs = LuxonisFileSystem(self.path) - self.metadata = cast(Metadata, defaultdict(dict, self._get_metadata())) + _lock_metadata = self.base_path / ".metadata.lock" + with FileLock( + str(_lock_metadata) + ): # DDP GCS training - multiple processes + self.metadata = cast( + Metadata, defaultdict(dict, self._get_metadata()) + ) if self.version != LDF_VERSION: logger.warning( @@ -228,6 +234,8 @@ def _load_df_offline( def _load_df_offline( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: + """Loads the dataset DataFrame **always** from the local + storage.""" path = ( self.base_path / "data" @@ -287,14 +295,13 @@ def _get_file_index( def _get_file_index( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: - path = ( - self.base_path - / "data" - / self.team_id - / "datasets" - / self.dataset_name - / "metadata" - / "file_index.parquet" + """Loads the file index DataFrame from the local storage or the + cloud. + + If loads from cloud it always downloads before loading. + """ + path = get_file( + self.fs, "metadata/file_index.parquet", self.metadata_path ) if path is not None and path.exists(): if not lazy: @@ -342,6 +349,11 @@ def _init_credentials(self) -> Dict[str, Any]: return {} def _get_metadata(self) -> Metadata: + """Loads metadata from local storage or cloud, depending on the + BucketStorage type. + + If loads from cloud it always downloads before loading. + """ if self.fs.exists("metadata/metadata.json"): path = get_file( self.fs, @@ -456,7 +468,7 @@ def sync_from_cloud( lock_path = local_dir / ".sync.lock" - with FileLock(str(lock_path)): + with FileLock(str(lock_path)): # DDP GCS training - multiple processes any_subfolder_empty = any( subfolder.is_dir() and not any(subfolder.iterdir()) for subfolder in (local_dir / self.dataset_name).iterdir()