From 4ccf57ee2c8c6813c6ab596a508386acf66d0d23 Mon Sep 17 00:00:00 2001
From: Jernej Sabadin
Date: Sun, 24 Nov 2024 11:20:01 +0100
Subject: [PATCH] fix: reconnecting logic and log_matrix logic

---
 luxonis_ml/tracker/tracker.py | 65 +++++++++++++++++++++++++----------
 1 file changed, 47 insertions(+), 18 deletions(-)

diff --git a/luxonis_ml/tracker/tracker.py b/luxonis_ml/tracker/tracker.py
index d1ccf49e..6c620a74 100644
--- a/luxonis_ml/tracker/tracker.py
+++ b/luxonis_ml/tracker/tracker.py
@@ -95,10 +95,12 @@ def __init__(
         self.is_sweep = is_sweep
         self.rank = rank
         self.local_logs = {
-            "metrics": [],
+            "metric": [],
             "params": {},
             "images": [],
             "artifacts": [],
+            "matrices": [],
+            "metrics": [],
         }

         self.mlflow_initialized = False
@@ -171,14 +173,18 @@ def log_to_mlflow(self, log_fn: Callable, *args, **kwargs) -> None:

     def store_log_locally(self, log_fn: Callable, *args, **kwargs) -> None:
         """Stores log data locally if logging to MLflow fails."""
-        # Checking functions without reinitializing experiment
-        if log_fn == self.log_metric:
-            self.local_logs["metrics"].append(
+        # Checking functions without triggering reconnections.
+        if log_fn == self._experiment["mlflow"].log_metric:
+            self.local_logs["metric"].append(
                 {"name": args[0], "value": args[1], "step": args[2]}
             )
-        elif log_fn == self.log_hyperparams:
+        elif log_fn == self._experiment["mlflow"].log_metrics:
+            self.local_logs["metrics"].append(
+                {"metrics": args[0], "step": args[1]}
+            )
+        elif log_fn == self._experiment["mlflow"].log_params:
             self.local_logs["params"].update(args[0])
-        elif log_fn == self.log_image:
+        elif log_fn == self._experiment["mlflow"].log_image:
             self.local_logs["images"].append(
                 {"image_data": args[0], "name": args[1]}
             )
@@ -186,6 +192,10 @@ def store_log_locally(self, log_fn: Callable, *args, **kwargs) -> None:
             self.local_logs["artifacts"].append(
                 {"path": str(args[0]), "name": args[1], "type": args[2]}
             )
+        elif log_fn == self._experiment["mlflow"].log_dict:
+            self.local_logs["matrices"].append(
+                {"matrix": args[0], "name": args[1]}
+            )

     def log_stored_logs_to_mlflow(self):
         """Attempts to log any data stored in local_logs to MLflow."""
@@ -193,10 +203,14 @@ def log_stored_logs_to_mlflow(self):
             return

         try:
-            for metric in self.local_logs["metrics"]:
+            for metric in self.local_logs["metric"]:
                 self._experiment["mlflow"].log_metric(
                     metric["name"], metric["value"], metric["step"]
                 )
+            for metrics in self.local_logs["metrics"]:
+                self._experiment["mlflow"].log_metrics(
+                    metrics["metrics"], metrics["step"]
+                )
             if self.local_logs["params"]:
                 self._experiment["mlflow"].log_params(
                     self.local_logs["params"]
@@ -209,20 +223,26 @@ def log_stored_logs_to_mlflow(self):
                 self.upload_artifact(
                     Path(artifact["path"]), artifact["name"], artifact["type"]
                 )
+            for matrix in self.local_logs["matrices"]:
+                self._experiment["mlflow"].log_dict(
+                    matrix["matrix"], matrix["name"]
+                )

             self.local_logs = {
                 "metrics": [],
                 "params": {},
                 "images": [],
                 "artifacts": [],
+                "matrices": [],
+                "metric": [],
             }
             logger.info("Successfully re-logged stored logs to MLflow.")
         except Exception as e:
             logger.warning(f"Failed to re-log stored logs to MLflow: {e}")

     def save_logs_locally(self):
-        """Saves metrics, parameters, and artifacts to JSON and images
-        to separate files."""
+        """Saves metrics, parameters, images, artifacts, and matrices
+        locally."""
         run_dir = Path(self.save_directory) / self.run_name
         image_dir = run_dir / "images"
         artifact_dir = run_dir / "artifacts"
@@ -238,7 +258,6 @@ def save_logs_locally(self):
             )
             img["image_data"] = img_path  # Replace data with path

-        # Save artifacts to local storage directory
         for artifact in self.local_logs["artifacts"]:
             artifact_path = Path(artifact["path"])
             if artifact_path.exists():
@@ -246,12 +265,18 @@ def save_logs_locally(self):
                 local_path.write_bytes(artifact_path.read_bytes())
                 artifact["path"] = str(local_path)

-        # Save logs to JSON file
         with open(run_dir / "local_logs.json", "w") as f:
             json.dump(
                 {
                     k: self.local_logs[k]
-                    for k in ["metrics", "params", "images", "artifacts"]
+                    for k in [
+                        "metrics",
+                        "metric",
+                        "params",
+                        "images",
+                        "artifacts",
+                        "matrices",
+                    ]
                 },
                 f,
             )
@@ -381,7 +406,7 @@ def log_hyperparams(
         if self.is_wandb:
             self.experiment["wandb"].config.update(params)
         if self.is_mlflow:
-            self.log_to_mlflow(self.experiment["mlflow"].log_params, params)
+            self.log_to_mlflow(self._experiment["mlflow"].log_params, params)

     @rank_zero_only
     def log_metric(self, name: str, value: float, step: int) -> None:
@@ -405,7 +430,7 @@ def log_metric(self, name: str, value: float, step: int) -> None:

         if self.is_mlflow:
             self.log_to_mlflow(
-                self.experiment["mlflow"].log_metric, name, value, step
+                self._experiment["mlflow"].log_metric, name, value, step
             )

     @rank_zero_only
@@ -424,7 +449,7 @@ def log_metrics(self, metrics: Dict[str, float], step: int) -> None:
             self.experiment["wandb"].log(metrics)
         if self.is_mlflow:
             self.log_to_mlflow(
-                self.experiment["mlflow"].log_metrics, metrics, step
+                self._experiment["mlflow"].log_metrics, metrics, step
             )

     @rank_zero_only
@@ -455,7 +480,7 @@ def log_image(self, name: str, img: np.ndarray, step: int) -> None:
             base_path, img_caption = name.rsplit("/", 1)
             img_path = f"{base_path}/{step}/{img_caption}.png"
             self.log_to_mlflow(
-                self.experiment["mlflow"].log_image, img, img_path
+                self._experiment["mlflow"].log_image, img, img_path
             )

     @rank_zero_only
@@ -491,7 +516,7 @@ def upload_artifact(
                 fs.put_file(
                     local_path=path,
                     remote_path=name or path.name,
-                    mlflow_instance=self.experiment.get("mlflow"),
+                    mlflow_instance=self._experiment.get("mlflow"),
                 )
             except Exception as e:
                 logger.warning(f"Failed to upload artifact to MLflow: {e}")
@@ -516,7 +541,11 @@ def log_matrix(self, matrix: np.ndarray, name: str, step: int) -> None:
                 "flat_array": matrix.flatten().tolist(),
                 "shape": matrix.shape,
             }
-            self.experiment["mlflow"].log_dict(matrix_data, f"{name}.json")
+            self.log_to_mlflow(
+                self._experiment["mlflow"].log_dict,
+                matrix_data,
+                f"{name}.json",
+            )

         if self.is_tensorboard:
             matrix_str = np.array2string(matrix, separator=", ")
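
Reviewer note on the pattern this patch leans on: log_to_mlflow wraps each MLflow call, and when the call fails, store_log_locally buffers its arguments under a key derived from the bound MLflow function; log_stored_logs_to_mlflow later replays the buffer once MLflow is reachable again. The sketch below shows that buffer-and-replay idea in isolation. It is a simplified stand-in, not the tracker's code: BufferedLogger and FlakyClient are made-up names for illustration. The real tracker instead keeps plain dicts per log type ("metric", "metrics", "params", "images", "artifacts", "matrices"), which keeps the buffer JSON-serializable for save_logs_locally.

# Illustrative buffer-and-replay sketch (not luxonis_ml code).
from typing import Any, Callable, Dict, List


class BufferedLogger:
    """Try a logging call; keep its arguments if it fails, replay later."""

    def __init__(self) -> None:
        self.buffer: List[Dict[str, Any]] = []  # calls waiting to be re-sent

    def log(self, log_fn: Callable, *args: Any) -> None:
        try:
            log_fn(*args)
        except Exception:
            # Remember the bound function and its arguments instead of dropping them.
            self.buffer.append({"fn": log_fn, "args": args})

    def replay(self) -> None:
        still_failing: List[Dict[str, Any]] = []
        for entry in self.buffer:
            try:
                entry["fn"](*entry["args"])
            except Exception:
                still_failing.append(entry)  # keep it for the next attempt
        self.buffer = still_failing


class FlakyClient:
    """Fake MLflow-like client that fails until `up` is set to True."""

    def __init__(self) -> None:
        self.up = False
        self.received: List[tuple] = []

    def log_metric(self, name: str, value: float, step: int) -> None:
        if not self.up:
            raise ConnectionError("MLflow unreachable")
        self.received.append((name, value, step))


client = FlakyClient()
buffered = BufferedLogger()
buffered.log(client.log_metric, "loss", 0.42, 1)  # fails and is buffered
client.up = True                                  # connection comes back
buffered.replay()                                 # buffered call is re-sent
print(client.received)                            # [('loss', 0.42, 1)]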