Skip to content

Commit

Permalink
fix rare race conditions
Browse files: browse the repository at this point in the history
  • Loading branch information
JSabadin committed Jan 8, 2025
1 parent 2ec48a9 commit 2c49d12
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions luxonis_ml/data/datasets/luxonis_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ def __init__(
else:
self.fs = LuxonisFileSystem(self.path)

self.metadata = cast(Metadata, defaultdict(dict, self._get_metadata()))
_lock_metadata = self.base_path / ".metadata.lock"
with FileLock(
str(_lock_metadata)
): # DDP GCS training - multiple processes
self.metadata = cast(
Metadata, defaultdict(dict, self._get_metadata())
)

if self.version != LDF_VERSION:
logger.warning(
Expand Down Expand Up @@ -228,6 +234,8 @@ def _load_df_offline(
def _load_df_offline(
self, lazy: bool = False
) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]:
"""Loads the dataset DataFrame **always** from the local
storage."""
path = (
self.base_path
/ "data"
Expand Down Expand Up @@ -287,14 +295,13 @@ def _get_file_index(
def _get_file_index(
self, lazy: bool = False
) -> Optional[Union[pl.DataFrame, pl.LazyFrame]]:
path = (
self.base_path
/ "data"
/ self.team_id
/ "datasets"
/ self.dataset_name
/ "metadata"
/ "file_index.parquet"
"""Loads the file index DataFrame from the local storage or the
cloud.
If it loads from the cloud, it always downloads the file before loading it.
"""
path = get_file(
self.fs, "metadata/file_index.parquet", self.metadata_path
)
if path is not None and path.exists():
if not lazy:
Expand Down Expand Up @@ -342,6 +349,11 @@ def _init_credentials(self) -> Dict[str, Any]:
return {}

def _get_metadata(self) -> Metadata:
"""Loads metadata from local storage or cloud, depending on the
BucketStorage type.
If it loads from the cloud, it always downloads the file before loading it.
"""
if self.fs.exists("metadata/metadata.json"):
path = get_file(
self.fs,
Expand Down Expand Up @@ -456,7 +468,7 @@ def sync_from_cloud(

lock_path = local_dir / ".sync.lock"

with FileLock(str(lock_path)):
with FileLock(str(lock_path)): # DDP GCS training - multiple processes
any_subfolder_empty = any(
subfolder.is_dir() and not any(subfolder.iterdir())
for subfolder in (local_dir / self.dataset_name).iterdir()
Expand Down

0 comments on commit 2c49d12

Please sign in to comment.