Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Large Dataset ParquetFileManager Error #74

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 8 additions & 34 deletions luxonis_ml/data/utils/parquet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import io
import os
from typing import Dict, Tuple

Expand Down Expand Up @@ -26,28 +25,11 @@ def __init__(
self.files = os.listdir(self.dir)
self.num_rows = num_rows

if len(self.files):
self.num = self._find_num()
self.current_file = self._get_current_parquet_file()
else:
self.num = 0
new_filename, self.current_file = self._generate_filename(self.num)
self.files = [new_filename]
self.num = self._find_num() if len(self.files) else 0
self.current_file = self._generate_filename(self.num)

self._read()

def _get_current_parquet_file(self) -> str:
"""Finds the best parquet file to edit based on the file size and most last
write time."""

path = self._generate_filename(self.num)[1]
current_size = os.path.getsize(path) / (1024 * 1024)
if current_size < self.file_size:
return path
else:
self.num += 1
return self._generate_filename(self.num)[1]

def _find_num(self) -> int:
nums = [
int(os.path.splitext(file)[0])
Expand All @@ -59,7 +41,7 @@ def _find_num(self) -> int:
def _generate_filename(self, num: int) -> Tuple[str, str]:
filename = f"{str(num).zfill(10)}.parquet"
path = os.path.join(self.dir, filename)
return filename, path
return path

def _read(self) -> Dict:
if os.path.exists(self.current_file):
Expand All @@ -75,11 +57,6 @@ def _initialize_data(self, data: Dict) -> None:
for key in data:
self.data[key] = []

def _estimate_file_size(self, df: pd.DataFrame) -> float:
with io.BytesIO() as buffer:
df.to_parquet(buffer)
return buffer.tell() / (1024 * 1024)

def write(self, add_data: Dict) -> None:
"""Writes a row to the current working parquet file.

Expand All @@ -94,16 +71,13 @@ def write(self, add_data: Dict) -> None:
if key not in self.data:
raise Exception(f"Key {key} Not Found")
self.data[key].append(add_data[key])
self.row_count += 1

self.row_count += 1
if self.row_count % self.num_rows == 0:
df = pd.DataFrame(self.data)
estimated_size = self._estimate_file_size(df)
if estimated_size > self.file_size:
self.close()
self.num += 1
self.current_file = self._generate_filename(self.num)[1]
self._read()
self.close()
self.num += 1
self.current_file = self._generate_filename(self.num)
self._read()

def close(self) -> None:
"""Ensures all data is written to parquet."""
Expand Down
Loading