diff --git a/luxonis_ml/data/datasets/luxonis_dataset.py b/luxonis_ml/data/datasets/luxonis_dataset.py index 09bf8ad6..47895b6a 100644 --- a/luxonis_ml/data/datasets/luxonis_dataset.py +++ b/luxonis_ml/data/datasets/luxonis_dataset.py @@ -135,7 +135,13 @@ def __init__( else: self.fs = LuxonisFileSystem(self.path) - self.metadata = cast(Metadata, defaultdict(dict, self._get_metadata())) + _lock_metadata = self.base_path / ".metadata.lock" + with FileLock( + str(_lock_metadata) + ): # DDP GCS training - multiple processes + self.metadata = cast( + Metadata, defaultdict(dict, self._get_metadata()) + ) if self.version != LDF_VERSION: logger.warning( @@ -228,6 +234,8 @@ def _load_df_offline( def _load_df_offline( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: + """Loads the dataset DataFrame **always** from the local + storage.""" path = ( self.base_path / "data" @@ -287,14 +295,13 @@ def _get_file_index( def _get_file_index( self, lazy: bool = False ) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]: - path = ( - self.base_path - / "data" - / self.team_id - / "datasets" - / self.dataset_name - / "metadata" - / "file_index.parquet" + """Loads the file index DataFrame from the local storage or the + cloud. + + If loads from cloud it always downloads before loading. + """ + path = get_file( + self.fs, "metadata/file_index.parquet", self.metadata_path ) if path is not None and path.exists(): if not lazy: @@ -342,6 +349,11 @@ def _init_credentials(self) -> Dict[str, Any]: return {} def _get_metadata(self) -> Metadata: + """Loads metadata from local storage or cloud, depending on the + BucketStorage type. + + If loads from cloud it always downloads before loading. + """ if self.fs.exists("metadata/metadata.json"): path = get_file( self.fs, @@ -456,7 +468,7 @@ def sync_from_cloud( lock_path = local_dir / ".sync.lock" - with FileLock(str(lock_path)): + with FileLock(str(lock_path)): # DDP GCS training - multiple processes any_subfolder_empty = any( subfolder.is_dir() and not any(subfolder.iterdir()) for subfolder in (local_dir / self.dataset_name).iterdir()