Skip to content

Commit

Permalink
fix: reconnecting logic and log_matrix logic
Browse files Browse the repository at this point in the history
  • Loading branch information
JSabadin committed Nov 24, 2024
1 parent db62c8b commit 4ccf57e
Showing 1 changed file with 47 additions and 18 deletions.
65 changes: 47 additions & 18 deletions luxonis_ml/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ def __init__(
self.is_sweep = is_sweep
self.rank = rank
self.local_logs = {
"metrics": [],
"metric": [],
"params": {},
"images": [],
"artifacts": [],
"matrices": [],
"metrics": [],
}
self.mlflow_initialized = False

Expand Down Expand Up @@ -171,32 +173,44 @@ def log_to_mlflow(self, log_fn: Callable, *args, **kwargs) -> None:

def store_log_locally(self, log_fn: Callable, *args, **kwargs) -> None:
"""Stores log data locally if logging to MLflow fails."""
# Checking functions without reinitializing experiment
if log_fn == self.log_metric:
self.local_logs["metrics"].append(
# Checking functions without triggering reconnections.
if log_fn == self._experiment["mlflow"].log_metric:
self.local_logs["metric"].append(
{"name": args[0], "value": args[1], "step": args[2]}
)
elif log_fn == self.log_hyperparams:
if log_fn == self._experiment["mlflow"].log_metrics:
self.local_logs["metrics"].append(
{"metrics": args[0], "step": args[1]}
)
elif log_fn == self._experiment["mlflow"].log_params:
self.local_logs["params"].update(args[0])
elif log_fn == self.log_image:
elif log_fn == self._experiment["mlflow"].log_image:
self.local_logs["images"].append(
{"image_data": args[0], "name": args[1]}
)
elif log_fn == self.upload_artifact:
self.local_logs["artifacts"].append(
{"path": str(args[0]), "name": args[1], "type": args[2]}
)
elif log_fn == self._experiment["mlflow"].log_dict:
self.local_logs["matrices"].append(
{"matrix": args[0], "name": args[1]}
)

def log_stored_logs_to_mlflow(self):
"""Attempts to log any data stored in local_logs to MLflow."""
if not self.mlflow_initialized or not any(self.local_logs.values()):
return

try:
for metric in self.local_logs["metrics"]:
for metric in self.local_logs["metric"]:
self._experiment["mlflow"].log_metric(
metric["name"], metric["value"], metric["step"]
)
for metrics in self.local_logs["metrics"]:
self._experiment["mlflow"].log_metrics(
metrics["metrics"], metrics["step"]
)
if self.local_logs["params"]:
self._experiment["mlflow"].log_params(
self.local_logs["params"]
Expand All @@ -209,20 +223,26 @@ def log_stored_logs_to_mlflow(self):
self.upload_artifact(
Path(artifact["path"]), artifact["name"], artifact["type"]
)
for matrix in self.local_logs["matrices"]:
self._experiment["mlflow"].log_dict(
matrix["matrix"], matrix["name"]
)

self.local_logs = {
"metrics": [],
"params": {},
"images": [],
"artifacts": [],
"matrices": [],
"metric": [],
}
logger.info("Successfully re-logged stored logs to MLflow.")
except Exception as e:
logger.warning(f"Failed to re-log stored logs to MLflow: {e}")

def save_logs_locally(self):
"""Saves metrics, parameters, and artifacts to JSON and images
to separate files."""
"""Saves metrics, parameters, images, artifacts, and matrices
locally."""
run_dir = Path(self.save_directory) / self.run_name
image_dir = run_dir / "images"
artifact_dir = run_dir / "artifacts"
Expand All @@ -238,20 +258,25 @@ def save_logs_locally(self):
)
img["image_data"] = img_path # Replace data with path

# Save artifacts to local storage directory
for artifact in self.local_logs["artifacts"]:
artifact_path = Path(artifact["path"])
if artifact_path.exists():
local_path = artifact_dir / artifact_path.name
local_path.write_bytes(artifact_path.read_bytes())
artifact["path"] = str(local_path)

# Save logs to JSON file
with open(run_dir / "local_logs.json", "w") as f:
json.dump(
{
k: self.local_logs[k]
for k in ["metrics", "params", "images", "artifacts"]
for k in [
"metrics",
"metric",
"params",
"images",
"artifacts",
"matrices",
]
},
f,
)
Expand Down Expand Up @@ -381,7 +406,7 @@ def log_hyperparams(
if self.is_wandb:
self.experiment["wandb"].config.update(params)
if self.is_mlflow:
self.log_to_mlflow(self.experiment["mlflow"].log_params, params)
self.log_to_mlflow(self._experiment["mlflow"].log_params, params)

@rank_zero_only
def log_metric(self, name: str, value: float, step: int) -> None:
Expand All @@ -405,7 +430,7 @@ def log_metric(self, name: str, value: float, step: int) -> None:

if self.is_mlflow:
self.log_to_mlflow(
self.experiment["mlflow"].log_metric, name, value, step
self._experiment["mlflow"].log_metric, name, value, step
)

@rank_zero_only
Expand All @@ -424,7 +449,7 @@ def log_metrics(self, metrics: Dict[str, float], step: int) -> None:
self.experiment["wandb"].log(metrics)
if self.is_mlflow:
self.log_to_mlflow(
self.experiment["mlflow"].log_metrics, metrics, step
self._experiment["mlflow"].log_metrics, metrics, step
)

@rank_zero_only
Expand Down Expand Up @@ -455,7 +480,7 @@ def log_image(self, name: str, img: np.ndarray, step: int) -> None:
base_path, img_caption = name.rsplit("/", 1)
img_path = f"{base_path}/{step}/{img_caption}.png"
self.log_to_mlflow(
self.experiment["mlflow"].log_image, img, img_path
self._experiment["mlflow"].log_image, img, img_path
)

@rank_zero_only
Expand Down Expand Up @@ -491,7 +516,7 @@ def upload_artifact(
fs.put_file(
local_path=path,
remote_path=name or path.name,
mlflow_instance=self.experiment.get("mlflow"),
mlflow_instance=self._experiment.get("mlflow"),
)
except Exception as e:
logger.warning(f"Failed to upload artifact to MLflow: {e}")
Expand All @@ -516,7 +541,11 @@ def log_matrix(self, matrix: np.ndarray, name: str, step: int) -> None:
"flat_array": matrix.flatten().tolist(),
"shape": matrix.shape,
}
self.experiment["mlflow"].log_dict(matrix_data, f"{name}.json")
self.log_to_mlflow(
self._experiment["mlflow"].log_dict,
matrix_data,
f"{name}.json",
)

if self.is_tensorboard:
matrix_str = np.array2string(matrix, separator=", ")
Expand Down

0 comments on commit 4ccf57e

Please sign in to comment.