From cfdfb88661f498b66a70971f6c4a7857cbac0f91 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 14:16:22 +0200 Subject: [PATCH 1/7] refactor: all except NN runs --- README.md | 4 + experiments/raytune_nn_rmse_mismatch.ipynb | 278 +++++++++++++++++++++ experiments/raytune_nn_rmse_mismatch.py | 180 +++++++++++++ experiments/raytune_nn_verify_rmse.ipynb | 107 ++++++++ q2_ritme/evaluate_all_experiments.py | 2 +- q2_ritme/evaluate_models.py | 4 +- q2_ritme/model_space/static_searchspace.py | 1 + q2_ritme/model_space/static_trainables.py | 24 +- q2_ritme/tests/test_static_trainables.py | 128 +++++++++- q2_ritme/tune_models.py | 12 +- 10 files changed, 718 insertions(+), 22 deletions(-) create mode 100644 experiments/raytune_nn_rmse_mismatch.ipynb create mode 100644 experiments/raytune_nn_rmse_mismatch.py create mode 100644 experiments/raytune_nn_verify_rmse.ipynb diff --git a/README.md b/README.md index b512f80..7943f4f 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,10 @@ If you are using SLURM and ... ## Model tracking In the config file you can choose to track your trials with MLflow (tracking_uri=="mlruns") or with WandB (tracking_uri=="wandb"). +### Choice between MLflow & WandB +WandB stores aggregate metrics on their servers. The way *ritme* is set up no sample-specific information is stored remotely. This information is stored on your local machine. +To choose which tracking set-up works best for you, it is best to review the respective services. + ### MLflow In case of using MLflow you can view your models with `mlflow ui --backend-store-uri experiments/mlruns`. For more information check out the [official MLflow documentation](https://mlflow.org/docs/latest/index.html). diff --git a/experiments/raytune_nn_rmse_mismatch.ipynb b/experiments/raytune_nn_rmse_mismatch.ipynb new file mode 100644 index 0000000..b722861 --- /dev/null +++ b/experiments/raytune_nn_rmse_mismatch.ipynb @@ -0,0 +1,278 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Inconsistent train metrics and logging frequency in Ray Tune with PyTorch Lightning\n", + "\n", + "**The Bug:**\n", + "When I train a Pytorch Lightning model with Ray Tune and I extract the best train metric from `results.get_best_result()` the metric differs from the recalculated metric using the best checkpoint on the same data (`rmse_train_recalc` in script). However, the respective validation metrics(`best_rmse_val` & `rmse_val_recalc`) remain consistent. \n", + "Also, the counts on how often the metrics are logged differ between train and validation set: train is only logged 9 times (`train_log_count`) whereas validation is logged 10 times (`val_log_count`).\n", + "Why is that? \n", + "\n", + "**Expected behavior:**\n", + "- Consistent train metrics between best result and recalculation using best checkpoint\n", + "- Equal logging frequency (10 times) for both train and validation metrics\n", + "\n", + "**Environment:**\n", + "- PyTorch: 2.4\n", + "- Lightning: 2.4.0\n", + "- Ray: 2.24.0\n", + "\n", + "can be created as:\n", + "```\n", + "conda create -n ray_bug_recom -y\n", + "conda activate ray_bug_recom\n", + "conda install -c conda-forge -c pytorch pytorch==2.4 lightning==2.4.0 -y\n", + "pip install \"ray[data,train,tune,serve]\"==2.24.0\n", + "```\n", + "\n", + "**Opened issue with `raytune_nn_rmse_mismatch.py` to [ray repos here](https://github.com/ray-project/ray/issues/47333) on Monday 26th Aug'24.**\n", + "\n", + "Note: Issue is reproducible in both ray_bug_recom and ritme conda environments." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import torch\n", + "from lightning import LightningModule, Trainer\n", + "from ray import init, tune\n", + "from ray.tune.integration.pytorch_lightning import TuneReportCheckpointCallback\n", + "from torch import nn\n", + "from torch.utils.data import DataLoader, TensorDataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class SimpleNN(LightningModule):\n", + " def __init__(self, input_size, hidden_size, learning_rate):\n", + " super().__init__()\n", + " self.save_hyperparameters()\n", + " self.model = nn.Sequential(\n", + " nn.Linear(input_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, 1)\n", + " )\n", + " self.learning_rate = learning_rate\n", + " self.train_loss = 0\n", + " self.val_loss = 0\n", + " self.train_predictions = []\n", + " self.train_targets = []\n", + " self.val_predictions = []\n", + " self.val_targets = []\n", + " self.train_log_count = 0\n", + " self.val_log_count = 0\n", + "\n", + " def forward(self, x):\n", + " return self.model(x)\n", + "\n", + " def training_step(self, batch, batch_idx):\n", + " x, y = batch\n", + " y_hat = self(x)\n", + " loss = nn.MSELoss()(y_hat, y)\n", + " self.train_loss = loss\n", + "\n", + " self.train_predictions.append(y_hat.detach())\n", + " self.train_targets.append(y.detach())\n", + "\n", + " return loss\n", + "\n", + " def validation_step(self, batch, batch_idx):\n", + " x, y = batch\n", + " y_hat = self(x)\n", + " loss = nn.MSELoss()(y_hat, y)\n", + " self.val_loss = loss\n", + "\n", + " self.val_predictions.append(y_hat.detach())\n", + " self.val_targets.append(y.detach())\n", + "\n", + " self.log(\"val_loss\", loss)\n", + "\n", + " return {\"val_loss\": loss}\n", + "\n", + " def on_train_epoch_end(self):\n", + " all_preds_train = torch.cat(self.train_predictions)\n", + " all_targets_train = torch.cat(self.train_targets)\n", + "\n", + " rmse_train = torch.sqrt(\n", + " nn.functional.mse_loss(all_preds_train, all_targets_train)\n", + " )\n", + " self.train_log_count += 1\n", + " self.log(\"train_log_count\", self.train_log_count)\n", + " self.log(\"rmse_train\", rmse_train)\n", + "\n", + " self.train_predictions.clear()\n", + " self.train_targets.clear()\n", + "\n", + " def on_validation_epoch_end(self):\n", + " all_preds_val = torch.cat(self.val_predictions)\n", + " all_targets_val = torch.cat(self.val_targets)\n", + "\n", + " rmse_val = torch.sqrt(nn.functional.mse_loss(all_preds_val, all_targets_val))\n", + "\n", + " self.val_log_count += 1\n", + " self.log(\"val_log_count\", self.val_log_count)\n", + " self.log(\"rmse_val\", rmse_val)\n", + "\n", + " self.val_predictions.clear()\n", + " self.val_targets.clear()\n", + "\n", + " def configure_optimizers(self):\n", + " return torch.optim.Adam(self.parameters(), lr=self.learning_rate)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def train_nn(config, train_data, val_data):\n", + " model = SimpleNN(\n", + " input_size=10, hidden_size=config[\"hidden_size\"], learning_rate=config[\"lr\"]\n", + " )\n", + "\n", + " train_loader = DataLoader(train_data, batch_size=800)\n", + " val_loader = DataLoader(val_data, batch_size=800)\n", + "\n", + " trainer = Trainer(\n", + " max_epochs=10,\n", + " num_sanity_val_steps=0,\n", + " check_val_every_n_epoch=1,\n", + " val_check_interval=1,\n", + " callbacks=[\n", + " TuneReportCheckpointCallback(\n", + " metrics={\n", + " \"loss\": \"val_loss\",\n", + " \"rmse_val\": \"rmse_val\",\n", + " \"rmse_train\": \"rmse_train\",\n", + " \"val_log_count\": \"val_log_count\",\n", + " \"train_log_count\": \"train_log_count\",\n", + " },\n", + " filename=\"checkpoint\",\n", + " on=\"validation_end\",\n", + " save_checkpoints=True,\n", + " ),\n", + " ],\n", + " deterministic=True,\n", + " )\n", + "\n", + " trainer.fit(model, train_loader, val_loader)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "config = {\n", + " \"lr\": tune.loguniform(1e-4, 1e-1),\n", + " \"hidden_size\": tune.choice([32, 64, 128]),\n", + "}\n", + "\n", + "init(\n", + " address=\"local\",\n", + " include_dashboard=False,\n", + " ignore_reinit_error=True,\n", + ")\n", + "\n", + "torch.manual_seed(42)\n", + "X = torch.randn(1000, 10)\n", + "y = torch.sum(X, dim=1, keepdim=True)\n", + "X_train, y_train = X[:800], y[:800]\n", + "X_val, y_val = X[800:], y[800:]\n", + "\n", + "train_data = TensorDataset(X_train, y_train)\n", + "val_data = TensorDataset(X_val, y_val)\n", + "\n", + "tuner = tune.Tuner(\n", + " tune.with_parameters(train_nn, train_data=train_data, val_data=val_data),\n", + " tune_config=tune.TuneConfig(metric=\"rmse_val\", mode=\"min\", num_samples=2),\n", + " param_space=config,\n", + ")\n", + "\n", + "results = tuner.fit()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# get best ray result\n", + "best_result = results.get_best_result(\"rmse_val\", \"min\", scope=\"all\")\n", + "best_rmse_train = best_result.metrics[\"rmse_train\"]\n", + "best_rmse_val = best_result.metrics[\"rmse_val\"]\n", + "print(f\"Best trial final train rmse: {best_rmse_train}\")\n", + "print(f\"Best trial final validation rmse: {best_rmse_val}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# get best model checkpoint\n", + "checkpoint_dir = best_result.checkpoint.path\n", + "checkpoint_path = os.path.join(checkpoint_dir, \"checkpoint\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# load model\n", + "model = SimpleNN.load_from_checkpoint(checkpoint_path)\n", + "\n", + "# recalculate rmse_train\n", + "rmse_train_recalc = torch.sqrt(nn.functional.mse_loss(model(X_train), y_train)).item()\n", + "print(f\"rmse_train_recalc: {rmse_train_recalc}\")\n", + "\n", + "# recalculate rmse_val\n", + "rmse_val_recalc = torch.sqrt(nn.functional.mse_loss(model(X_val), y_val)).item()\n", + "print(f\"rmse_val_recalc: {rmse_val_recalc}\")\n", + "\n", + "# assertions\n", + "if not best_rmse_val == rmse_val_recalc:\n", + " raise ValueError(\"best_rmse_val != rmse_val_recalc\")\n", + "if not best_rmse_train == rmse_train_recalc:\n", + " raise ValueError(\"best_rmse_train != rmse_train_recalc\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ritme", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/experiments/raytune_nn_rmse_mismatch.py b/experiments/raytune_nn_rmse_mismatch.py new file mode 100644 index 0000000..65a38c8 --- /dev/null +++ b/experiments/raytune_nn_rmse_mismatch.py @@ -0,0 +1,180 @@ +import os + +import torch +from lightning import LightningModule, Trainer +from ray import init, tune +from ray.tune.integration.pytorch_lightning import TuneReportCheckpointCallback +from torch import nn +from torch.utils.data import DataLoader, TensorDataset + + +class SimpleNN(LightningModule): + def __init__(self, input_size, hidden_size, learning_rate): + super().__init__() + self.save_hyperparameters() + self.model = nn.Sequential( + nn.Linear(input_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, 1) + ) + self.learning_rate = learning_rate + self.train_loss = 0 + self.val_loss = 0 + self.train_predictions = [] + self.train_targets = [] + self.val_predictions = [] + self.val_targets = [] + self.train_log_count = 0 + self.val_log_count = 0 + + def forward(self, x): + return self.model(x) + + def training_step(self, batch, batch_idx): + x, y = batch + y_hat = self(x) + loss = nn.MSELoss()(y_hat, y) + self.train_loss = loss + + self.train_predictions.append(y_hat.detach()) + self.train_targets.append(y.detach()) + + return loss + + def validation_step(self, batch, batch_idx): + x, y = batch + y_hat = self(x) + loss = nn.MSELoss()(y_hat, y) + self.val_loss = loss + + self.val_predictions.append(y_hat.detach()) + self.val_targets.append(y.detach()) + + self.log("val_loss", loss) + + return {"val_loss": loss} + + def on_train_epoch_end(self): + all_preds_train = torch.cat(self.train_predictions) + all_targets_train = torch.cat(self.train_targets) + + rmse_train = torch.sqrt( + nn.functional.mse_loss(all_preds_train, all_targets_train) + ) + self.train_log_count += 1 + self.log("train_log_count", self.train_log_count) + self.log("rmse_train", rmse_train) + + self.train_predictions.clear() + self.train_targets.clear() + + def on_validation_epoch_end(self): + all_preds_val = torch.cat(self.val_predictions) + all_targets_val = torch.cat(self.val_targets) + + rmse_val = torch.sqrt(nn.functional.mse_loss(all_preds_val, all_targets_val)) + + self.val_log_count += 1 + self.log("val_log_count", self.val_log_count) + self.log("rmse_val", rmse_val) + + self.val_predictions.clear() + self.val_targets.clear() + + def configure_optimizers(self): + return torch.optim.Adam(self.parameters(), lr=self.learning_rate) + + +def train_nn(config, train_data, val_data): + model = SimpleNN( + input_size=10, hidden_size=config["hidden_size"], learning_rate=config["lr"] + ) + + train_loader = DataLoader(train_data, batch_size=800) + val_loader = DataLoader(val_data, batch_size=800) + + trainer = Trainer( + max_epochs=10, + num_sanity_val_steps=0, + check_val_every_n_epoch=1, + val_check_interval=1, + callbacks=[ + TuneReportCheckpointCallback( + metrics={ + "loss": "val_loss", + "rmse_val": "rmse_val", + "rmse_train": "rmse_train", + "val_log_count": "val_log_count", + "train_log_count": "train_log_count", + }, + filename="checkpoint", + on="validation_end", + save_checkpoints=True, + ), + ], + deterministic=True, + ) + + trainer.fit(model, train_loader, val_loader) + + +def main(): + config = { + "lr": tune.loguniform(1e-4, 1e-1), + "hidden_size": tune.choice([32, 64, 128]), + } + + init( + address="local", + include_dashboard=False, + ignore_reinit_error=True, + ) + + torch.manual_seed(42) + X = torch.randn(1000, 10) + y = torch.sum(X, dim=1, keepdim=True) + X_train, y_train = X[:800], y[:800] + X_val, y_val = X[800:], y[800:] + + train_data = TensorDataset(X_train, y_train) + val_data = TensorDataset(X_val, y_val) + + tuner = tune.Tuner( + tune.with_parameters(train_nn, train_data=train_data, val_data=val_data), + tune_config=tune.TuneConfig(metric="rmse_val", mode="min", num_samples=2), + param_space=config, + ) + + results = tuner.fit() + + # get best ray result + best_result = results.get_best_result("rmse_val", "min", scope="all") + best_rmse_train = best_result.metrics["rmse_train"] + best_rmse_val = best_result.metrics["rmse_val"] + print(f"Best trial final train rmse: {best_rmse_train}") + print(f"Best trial final validation rmse: {best_rmse_val}") + + # get best model checkpoint + checkpoint_dir = best_result.checkpoint.path + checkpoint_path = os.path.join(checkpoint_dir, "checkpoint") + + # load model + model = SimpleNN.load_from_checkpoint(checkpoint_path) + + # recalculate rmse_train + rmse_train_recalc = torch.sqrt( + nn.functional.mse_loss(model(X_train), y_train) + ).item() + print(f"rmse_train_recalc: {rmse_train_recalc}") + + # recalculate rmse_val + rmse_val_recalc = torch.sqrt(nn.functional.mse_loss(model(X_val), y_val)).item() + print(f"rmse_val_recalc: {rmse_val_recalc}") + + # assertions + if not best_rmse_val == rmse_val_recalc: + raise ValueError("best_rmse_val != rmse_val_recalc") + if not best_rmse_train == rmse_train_recalc: + raise ValueError("best_rmse_train != rmse_train_recalc") + + +if __name__ == "__main__": + main() diff --git a/experiments/raytune_nn_verify_rmse.ipynb b/experiments/raytune_nn_verify_rmse.ipynb new file mode 100644 index 0000000..6806816 --- /dev/null +++ b/experiments/raytune_nn_verify_rmse.ipynb @@ -0,0 +1,107 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Compare rmse calculated from all predictions vs. during ray tune logging\n", + "\n", + "Linked with raytune_nn_rmse_mismatch" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import pandas as pd\n", + "import torch\n", + "from torch import nn\n", + "from sklearn.metrics import root_mean_squared_error" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "path_to_pred = \"/Users/adamova/Documents/projects/14_LM1/q2-ritme/experiments/models/r_nn_all_test_local_one_epoch_1/nn_class/train_nn_class_2385d79f_1_batch_size=64,data_aggregation=None,data_selection=variance_threshold,data_selection_t=0.0001,data_trans_2024-08-22_19-49-05/\"\n", + "filename = \"debug_last_log_vs_preds.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pred_df = pd.read_csv(os.path.join(path_to_pred, filename), index_col=0)\n", + "pred_df_train = pred_df[pred_df[\"split\"] == \"train\"]\n", + "pred_df_val = pred_df[pred_df[\"split\"] == \"val\"]\n", + "pred_df[\"pred\"] = pred_df[\"pred\"].astype(float)\n", + "pred_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"train:\")\n", + "print(root_mean_squared_error(pred_df_train[\"true\"], pred_df_train[\"pred\"]))\n", + "print(\"val:\")\n", + "print(root_mean_squared_error(pred_df_val[\"true\"], pred_df_val[\"pred\"]))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"train:\")\n", + "rmse_tensor_train = torch.sqrt(\n", + " nn.functional.mse_loss(\n", + " torch.from_numpy(pred_df_train[\"pred\"].values),\n", + " torch.from_numpy(pred_df_train[\"true\"].values),\n", + " )\n", + ")\n", + "print(rmse_tensor_train)\n", + "\n", + "print(\"val:\")\n", + "rmse_tensor_val = torch.sqrt(\n", + " nn.functional.mse_loss(\n", + " torch.from_numpy(pred_df_val[\"pred\"].values),\n", + " torch.from_numpy(pred_df_val[\"true\"].values),\n", + " )\n", + ")\n", + "rmse_tensor_val" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ritme", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/q2_ritme/evaluate_all_experiments.py b/q2_ritme/evaluate_all_experiments.py index d4be3a1..a1d19e4 100644 --- a/q2_ritme/evaluate_all_experiments.py +++ b/q2_ritme/evaluate_all_experiments.py @@ -29,7 +29,7 @@ def best_trial_name(analyses_ls, metric_to_evaluate, mode="min"): for analysis in analyses_ls: # Get the best trial for the current analysis based on the metric - best_trial = analysis.get_best_trial(metric_to_evaluate, mode, "last") + best_trial = analysis.get_best_trial(metric_to_evaluate, mode, "all") # Retrieve the best metric for this trial best_trial_metric = best_trial.metric_analysis[metric_to_evaluate][mode] diff --git a/q2_ritme/evaluate_models.py b/q2_ritme/evaluate_models.py index 90d802a..5e7ddf0 100644 --- a/q2_ritme/evaluate_models.py +++ b/q2_ritme/evaluate_models.py @@ -210,7 +210,7 @@ def predict(self, data, split): def retrieve_best_models(result_dic): best_model_dic = {} for model_type, result_grid in result_dic.items(): - best_result = result_grid.get_best_result() + best_result = result_grid.get_best_result(scope="all") best_model = get_model(model_type, best_result) best_data_proc = get_data_processing(best_result) @@ -324,7 +324,7 @@ def get_best_model_metrics_and_config( tuple: A tuple containing a DataFrame of metrics and a dictionary of the best model's configuration. """ - best_result = trial_result.get_best_result() + best_result = trial_result.get_best_result(scope="all") config = best_result.config metrics_ser = best_result.metrics_dataframe[metric_ls].iloc[-1] metrics_df = pd.DataFrame(metrics_ser).transpose() diff --git a/q2_ritme/model_space/static_searchspace.py b/q2_ritme/model_space/static_searchspace.py index db9b912..c9f07d6 100644 --- a/q2_ritme/model_space/static_searchspace.py +++ b/q2_ritme/model_space/static_searchspace.py @@ -97,6 +97,7 @@ def get_nn_space( trial, tax, model_name: str, test_mode: bool = False ) -> Dict[str, str]: get_data_eng_space(trial, tax, test_mode) + # todo: make max_layers configurable parameter! max_layers = 30 # Sample random uniformly between [1,max_layers] rounding to multiples of 5 trial.suggest_int("n_hidden_layers", 1, max_layers, step=5) diff --git a/q2_ritme/model_space/static_trainables.py b/q2_ritme/model_space/static_trainables.py index 644d853..d52f026 100644 --- a/q2_ritme/model_space/static_trainables.py +++ b/q2_ritme/model_space/static_trainables.py @@ -18,7 +18,7 @@ from coral_pytorch.losses import corn_loss from lightning import LightningModule, Trainer, seed_everything from lightning.pytorch.callbacks import ModelCheckpoint -from ray import train, tune +from ray import train from ray.air import session from ray.tune.integration.pytorch_lightning import TuneReportCheckpointCallback from ray.tune.integration.xgboost import TuneReportCheckpointCallback as xgb_cc @@ -164,15 +164,18 @@ def _predict_rmse_r2_trac(alpha, log_geom_X, y): return root_mean_squared_error(y, y_pred), r2_score(y, y_pred) -def _report_results_manually_trac( - alpha, A_df, log_geom_train, y_train, log_geom_val, y_val, tax -): - # save coefficients w labels & matrix A with labels -> model_path +def _bundle_trac_model(alpha, A_df): + # get coefficients w labels & matrix A with labels idx_alpha = ["intercept"] + A_df.columns.tolist() df_alpha_with_labels = pd.DataFrame(alpha, columns=["alpha"], index=idx_alpha) model = {"model": df_alpha_with_labels, "matrix_a": A_df} + return model + +def _report_results_manually_trac( + model, log_geom_train, y_train, log_geom_val, y_val, tax +): # save model path_to_save = ray.train.get_context().get_trial_dir() model_path = os.path.join(path_to_save, "model.pkl") @@ -180,6 +183,8 @@ def _report_results_manually_trac( pickle.dump(model, file) # calculate RMSE and R2 + df_alpha_with_labels = model["model"] + alpha = model["model"]["alpha"].values rmse_train, r2_train = _predict_rmse_r2_trac(alpha, log_geom_train, y_train) rmse_val, r2_val = _predict_rmse_r2_trac(alpha, log_geom_val, y_val) @@ -252,8 +257,10 @@ def train_trac( matrices_train, selected_param, intercept=intercept ) + model = _bundle_trac_model(alpha, a_df) + _report_results_manually_trac( - alpha, a_df, log_geom_train, y_train, log_geom_val, y_val, tax + model, log_geom_train, y_train, log_geom_val, y_val, tax ) @@ -497,9 +504,8 @@ def train_nn( ) _save_taxonomy(tax) # Callbacks - checkpoint_dir = ( - tune.get_trial_dir() if "TUNE_TRIAL_NAME" in os.environ else "checkpoints" - ) + checkpoint_dir = ray.train.get_context().get_trial_dir() + os.makedirs(checkpoint_dir, exist_ok=True) callbacks = [ diff --git a/q2_ritme/tests/test_static_trainables.py b/q2_ritme/tests/test_static_trainables.py index f06ebf5..65bb654 100644 --- a/q2_ritme/tests/test_static_trainables.py +++ b/q2_ritme/tests/test_static_trainables.py @@ -8,11 +8,22 @@ import pandas as pd import skbio import torch +from parameterized import parameterized from qiime2.plugin.testing import TestPluginBase +from ray import tune from sklearn.linear_model import LinearRegression -from sklearn.metrics import r2_score, root_mean_squared_error - +from sklearn.metrics import mean_squared_error, r2_score, root_mean_squared_error + +from q2_ritme.evaluate_models import ( + TunedModel, + get_data_processing, + get_model, + get_predictions, + get_taxonomy, +) from q2_ritme.model_space import static_trainables as st +from q2_ritme.process_data import split_data_by_host +from q2_ritme.tune_models import check_for_errors_in_trials, model_trainables class TestHelperFunctions(TestPluginBase): @@ -131,13 +142,15 @@ def test_train_trac( # Arrange config = {"lambda": 0.1} mock_process_train.return_value = (None, None, None, None, None) - mock_create_matrix.return_value = pd.DataFrame() + mock_create_matrix.return_value = pd.DataFrame( + {"F1": [1, 0], "F2": [0, 1]}, index=["F1", "F2"] + ) mock_preprocess_taxonomy.side_effect = [ (np.array([[1, 2], [3, 4]]), 2), (np.array([[5, 6], [7, 8]]), 2), ] - mock_classo.return_value = np.array([0.1, 0.2]) - mock_min_least_squares.return_value = np.array([0.1, 0.2]) + mock_classo.return_value = np.array([0.1, 0.1, 0.2]) + mock_min_least_squares.return_value = np.array([0.1, 0.1, 0.2]) # Act st.train_trac( @@ -277,8 +290,10 @@ def test_train_xgb( @patch("q2_ritme.model_space.static_trainables.load_data") @patch("q2_ritme.model_space.static_trainables.NeuralNet") @patch("q2_ritme.model_space.static_trainables.Trainer") + @patch("ray.train.get_context") def test_train_nn( self, + mock_get_context, mock_trainer, mock_neural_net, mock_load_data, @@ -296,7 +311,7 @@ def test_train_nn( ) mock_load_data.return_value = (MagicMock(), MagicMock()) mock_trainer_instance = mock_trainer.return_value - + mock_get_context.return_value.get_trial_dir.return_value = tempfile.mkdtemp() # Define dummy config and parameters config = { "n_hidden_layers": 2, @@ -325,3 +340,104 @@ def test_train_nn( n_units=[5, 10, 5, 1], learning_rate=0.01, nn_type="regression" ) mock_trainer_instance.fit.assert_called() + + +class TestTrainableLogging(TestPluginBase): + package = "q2_ritme.tests" + + def setUp(self): + super().setUp() + np.random.seed(42) + self.X = np.random.randn(1000, 10) + self.y = np.sum(self.X, axis=1) + np.random.randn(1000) * 0.1 + self.features = [f"F{i}" for i in range(10)] + self.train_val = pd.DataFrame(self.X, columns=[f"F{i}" for i in range(10)]) + self.train_val["target"] = self.y + self.train_val["host_id"] = np.random.randint(0, 5, 1000) + self.seed_data = 42 + self.seed_model = 42 + self.host_id = "host_id" + self.target = "target" + self.tax = pd.DataFrame([]) + self.tree_phylo = skbio.TreeNode() + + def _create_tuned_model(self, model_type, best_result): + model = get_model(model_type, best_result) + return TunedModel( + model, + get_data_processing(best_result), + get_taxonomy(best_result), + best_result.path, + ) + + @parameterized.expand( + [ + ("linreg",), + ("xgb",), + ("nn_reg",), + ] + ) + def test_logged_vs_bestresult_rmse(self, model_type): + """ + Verify that logged rmse values are identical to metrics obtained with + best result's checkpoint. This is intentionally tested for one model of + each type of checkpoint callbacks, namely manual reporting (linreg + representative for trac, rf), and each of the own checkpoint_callbacks + (xgb and nn_reg representative for all NNs). + + """ + # todo: make it run for nn_reg too (currently failing) + # fit model + search_space = { + "data_selection": None, + "data_aggregation": None, + "data_transform": None, + "data_alr_denom_idx": None, + "alpha": 0.1, + "l1_ratio": 0.5, + "batch_size": 64, + "n_hidden_layers": 1, + "epochs": 1, + "learning_rate": 0.01, + } + # todo: make max_layers configurable parameter! + for i in range(30): + search_space[f"n_units_hl{i}"] = 2 + metric = "rmse_val" + mode = "min" + tuner = tune.Tuner( + tune.with_parameters( + model_trainables[model_type], + train_val=self.train_val, + target=self.target, + host_id=self.host_id, + seed_data=self.seed_data, + seed_model=self.seed_model, + tax=self.tax, + tree_phylo=self.tree_phylo, + ), + param_space=search_space, + tune_config=tune.TuneConfig(metric=metric, mode=mode, num_samples=1), + ) + results = tuner.fit() + check_for_errors_in_trials(results) + + # get logs + best_result = results.get_best_result("rmse_val", "min", "all") + logged_rmse = { + "train": best_result.metrics["rmse_train"], + "val": best_result.metrics["rmse_val"], + } + # get recreated predictions & assert + tuned_model = self._create_tuned_model(model_type, best_result) + # split data with same split as during training - ensures with self.seed_data + train, val = split_data_by_host( + self.train_val, self.host_id, 0.8, self.seed_data + ) + + for split, data in [("train", train), ("val", val)]: + preds = get_predictions( + data, tuned_model, self.target, self.features, split=split + ) + calculated_rmse = np.sqrt(mean_squared_error(preds["pred"], preds["true"])) + self.assertAlmostEqual(logged_rmse[split], calculated_rmse, places=6) diff --git a/q2_ritme/tune_models.py b/q2_ritme/tune_models.py index 9e6357e..8482df4 100644 --- a/q2_ritme/tune_models.py +++ b/q2_ritme/tune_models.py @@ -38,6 +38,13 @@ def get_slurm_resource(resource_name, default_value=0): return default_value +def check_for_errors_in_trials(result): + if result.num_errors > 0: + raise RuntimeError( + "Some trials encountered errors see above for reported ray tune errors" + ) + + def run_trials( tracking_uri, exp_name, @@ -211,10 +218,7 @@ def run_trials( result = analysis.fit() # Check all trials & check for error status - if result.num_errors > 0: - raise RuntimeError( - "Some trials encountered errors see above for reported ray tune errors" - ) + check_for_errors_in_trials(result) return result From c207e3a0178e10a6ae50c097e93ed1b0fc505cd1 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 15:59:33 +0200 Subject: [PATCH 2/7] clean-up NN --- q2_ritme/evaluate_models.py | 15 +-- q2_ritme/model_space/static_trainables.py | 124 ++++++++++++---------- q2_ritme/tests/test_static_trainables.py | 6 +- 3 files changed, 75 insertions(+), 70 deletions(-) diff --git a/q2_ritme/evaluate_models.py b/q2_ritme/evaluate_models.py index 5e7ddf0..aa43800 100644 --- a/q2_ritme/evaluate_models.py +++ b/q2_ritme/evaluate_models.py @@ -7,7 +7,6 @@ import pandas as pd import torch import xgboost as xgb -from coral_pytorch.dataset import corn_label_from_logits from joblib import load from ray.air.result import Result from sklearn.metrics import root_mean_squared_error @@ -184,17 +183,9 @@ def predict(self, data, split): transformed = self.transform(selected) if isinstance(self.model, NeuralNet): with torch.no_grad(): - if self.model.nn_type == "regression": - predicted = self.model(transformed).numpy().flatten() - - # if classification predicted class needs to be transformed from - # logit - elif self.model.nn_type == "classification": - logits = self.model(transformed) - predicted = torch.argmax(logits, dim=1).numpy() - elif self.model.nn_type == "ordinal_regression": - logits = self.model(transformed) - predicted = corn_label_from_logits(logits).numpy() + X_t = torch.tensor(transformed, dtype=torch.float32) + predicted = self.model(X_t) + predicted = self.model._prepare_predictions(predicted) elif isinstance(self.model, dict): # trac model log_geom, _ = _preprocess_taxonomy_aggregation( diff --git a/q2_ritme/model_space/static_trainables.py b/q2_ritme/model_space/static_trainables.py index d52f026..2e70d66 100644 --- a/q2_ritme/model_space/static_trainables.py +++ b/q2_ritme/model_space/static_trainables.py @@ -322,12 +322,12 @@ def __init__(self, n_units, learning_rate, nn_type="regression"): self.learning_rate = learning_rate self.nn_type = nn_type self.num_classes = n_units[-1] - if self.nn_type == "regression": - self.loss_fn = nn.MSELoss() - elif self.nn_type == "classification": - self.loss_fn = nn.CrossEntropyLoss() - elif self.nn_type == "ordinal_regression": - self.loss_fn = None + self.train_loss = 0 + self.val_loss = 0 + self.train_predictions = [] + self.train_targets = [] + self.validation_predictions = [] + self.validation_targets = [] def forward(self, x): for layer in self.layers: @@ -352,64 +352,87 @@ def _calculate_metrics(self, predictions, targets): return rmse, r2 + def _calculate_loss(self, predictions, targets): + # loss: corn_loss, cross-entropy or mse + # calculated on rounded classes as targets for ordinal regression and + # classification + targets_rounded = torch.round(targets).long() + if self.nn_type == "ordinal_regression": + # predictions = logits + return corn_loss(predictions, targets_rounded, self.num_classes) + elif self.nn_type == "classification": + loss_fn = nn.CrossEntropyLoss() + # predictions = logits + return loss_fn(predictions, targets_rounded) + loss_fn = nn.MSELoss() + return loss_fn(predictions, targets) + def training_step(self, batch, batch_idx): inputs, targets = batch - predictions = self(inputs).squeeze() + predictions = self.forward(inputs).squeeze() - # loss: corn_loss, cross-entropy or mse - # todo: clean up and remove redundancy with val step - if self.nn_type == "ordinal_regression": - loss = corn_loss(predictions, targets, self.num_classes) - else: - loss = self.loss_fn(predictions, targets) - self.log( - "train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True - ) + # Store predictions and targets + self.train_predictions.append(predictions.detach()) + self.train_targets.append(targets.detach()) - # rmse and r2 - rmse, r2 = self._calculate_metrics(predictions, targets) - self.log( - "train_rmse", rmse, on_step=True, on_epoch=True, prog_bar=True, logger=True - ) - self.log( - "train_r2", r2, on_step=True, on_epoch=True, prog_bar=True, logger=True - ) + self.train_loss = self._calculate_loss(predictions, targets) - return loss + return self.train_loss def validation_step(self, batch, batch_idx): inputs, targets = batch - predictions = self(inputs).squeeze() + predictions = self.forward(inputs).squeeze() - # loss: corn_loss, cross-entropy or mse - if self.nn_type == "ordinal_regression": - loss = corn_loss(predictions, targets, self.num_classes) - else: - loss = self.loss_fn(predictions, targets) - self.log( - "val_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True - ) + self.validation_predictions.append(predictions.detach()) + self.validation_targets.append(targets.detach()) + + self.val_loss = self._calculate_loss(predictions, targets) + # nb_features log + self.log("nb_features", inputs.shape[1]) + return {"val_loss": self.val_loss} + + def on_train_epoch_end(self): + all_preds = torch.cat(self.train_predictions) + all_targets = torch.cat(self.train_targets) + loss = self._calculate_loss(all_preds, all_targets) + self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True) + + rmse, r2 = self._calculate_metrics(all_preds, all_targets) + self.log("train_rmse", rmse, on_epoch=True, prog_bar=True, logger=True) + self.log("train_r2", r2, on_epoch=True, prog_bar=True, logger=True) + self.train_predictions.clear() + self.train_targets.clear() + + def on_validation_epoch_end(self): + # make use of all outputs from each validation_step() + all_preds = torch.cat(self.validation_predictions) + all_targets = torch.cat(self.validation_targets) + + # do something with all preds and targets + loss = self._calculate_loss(all_preds, all_targets) + self.log("val_loss", loss, on_epoch=True, prog_bar=True, logger=True) # rmse and r2 - rmse, r2 = self._calculate_metrics(predictions, targets) - self.log( - "val_rmse", rmse, on_step=True, on_epoch=True, prog_bar=True, logger=True - ) - self.log("val_r2", r2, on_step=True, on_epoch=True, prog_bar=True, logger=True) + rmse, r2 = self._calculate_metrics(all_preds, all_targets) + self.log("val_rmse", rmse, on_epoch=True, prog_bar=True, logger=True) + self.log("val_r2", r2, on_epoch=True, prog_bar=True, logger=True) + + self.validation_predictions.clear() + self.validation_targets.clear() def configure_optimizers(self): optimizer = Adam(self.parameters(), lr=self.learning_rate) return optimizer -def load_data(X_train, y_train, X_val, y_val, y_type, config): +def load_data(X_train, y_train, X_val, y_val, config): train_dataset = TensorDataset( torch.tensor(X_train, dtype=torch.float32), - torch.tensor(y_train, dtype=y_type), + torch.tensor(y_train, dtype=torch.float32), ) val_dataset = TensorDataset( torch.tensor(X_val, dtype=torch.float32), - torch.tensor(y_val, dtype=y_type), + torch.tensor(y_val, dtype=torch.float32), ) train_loader = DataLoader( train_dataset, batch_size=config["batch_size"], shuffle=True, num_workers=2 @@ -464,30 +487,19 @@ def train_nn( config, train_val, target, host_id, tax, seed_data ) - # round target to monthly classes in case of ordinal regression and load - # data with data loaders - if nn_type in ["ordinal_regression", "classification"]: - y_train = np.round(y_train) - y_val = np.round(y_val) - train_loader, val_loader = load_data( - X_train, y_train, X_val, y_val, torch.long, config - ) - - else: - train_loader, val_loader = load_data( - X_train, y_train, X_val, y_val, torch.float32, config - ) + train_loader, val_loader = load_data(X_train, y_train, X_val, y_val, config) # Model n_layers = config["n_hidden_layers"] # output layer defined by target - n_target_classes = len(np.unique(y_train)) if nn_type == "regression": output_layer = [1] elif nn_type == "classification": + n_target_classes = len(np.unique(np.round(y_train))) output_layer = [n_target_classes] elif nn_type == "ordinal_regression": # CORN reduces number of classes by 1 + n_target_classes = len(np.unique(np.round(y_train))) output_layer = [n_target_classes - 1] n_units = ( diff --git a/q2_ritme/tests/test_static_trainables.py b/q2_ritme/tests/test_static_trainables.py index 65bb654..8198182 100644 --- a/q2_ritme/tests/test_static_trainables.py +++ b/q2_ritme/tests/test_static_trainables.py @@ -386,7 +386,9 @@ def test_logged_vs_bestresult_rmse(self, model_type): (xgb and nn_reg representative for all NNs). """ - # todo: make it run for nn_reg too (currently failing) + # todo: make it run for nn_reg too (currently failing: see issue here: + # https://github.com/ray-project/ray/issues/47333) + # fit model search_space = { "data_selection": None, @@ -397,7 +399,7 @@ def test_logged_vs_bestresult_rmse(self, model_type): "l1_ratio": 0.5, "batch_size": 64, "n_hidden_layers": 1, - "epochs": 1, + "epochs": 2, "learning_rate": 0.01, } # todo: make max_layers configurable parameter! From 6b069929015467cbf66a69d64095f713f07835e5 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 16:09:46 +0200 Subject: [PATCH 3/7] comment refac --- q2_ritme/tests/test_static_trainables.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/q2_ritme/tests/test_static_trainables.py b/q2_ritme/tests/test_static_trainables.py index 8198182..f54f2c1 100644 --- a/q2_ritme/tests/test_static_trainables.py +++ b/q2_ritme/tests/test_static_trainables.py @@ -374,7 +374,9 @@ def _create_tuned_model(self, model_type, best_result): [ ("linreg",), ("xgb",), - ("nn_reg",), + # todo: NN is currently failing: see issue here: + # https://github.com/ray-project/ray/issues/47333) + # ("nn_reg",), ] ) def test_logged_vs_bestresult_rmse(self, model_type): @@ -386,9 +388,6 @@ def test_logged_vs_bestresult_rmse(self, model_type): (xgb and nn_reg representative for all NNs). """ - # todo: make it run for nn_reg too (currently failing: see issue here: - # https://github.com/ray-project/ray/issues/47333) - # fit model search_space = { "data_selection": None, From df1eceeb83051e1488d6bf8509be116d3cf16275 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 16:41:50 +0200 Subject: [PATCH 4/7] fix ci --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ae6c52b..68753e4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,7 @@ jobs: - name: Install dependencies run: | + python -m pip install --upgrade pip pip install -q https://github.com/qiime2/q2lint/archive/master.zip pip install -q flake8 From 8d2dd3dafc0971fc53e0e850c362120e3029e317 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 16:45:40 +0200 Subject: [PATCH 5/7] remove q2lint --- .github/workflows/ci.yml | 2 -- Makefile | 1 - 2 files changed, 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 68753e4..04aaacb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,8 +21,6 @@ jobs: - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install -q https://github.com/qiime2/q2lint/archive/master.zip pip install -q flake8 - name: Lint diff --git a/Makefile b/Makefile index e9aa43e..f1b3081 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,6 @@ PYTHON ?= python all: ; lint: - q2lint flake8 test: all From b13319e68d636020e242b5bc689f71e9b406271e Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 16:53:53 +0200 Subject: [PATCH 6/7] add ruff as linter --- .github/workflows/ci.yml | 2 +- Makefile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04aaacb..6c8ca3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - name: Install dependencies run: | - pip install -q flake8 + pip install -q ruff - name: Lint run: make lint diff --git a/Makefile b/Makefile index f1b3081..aa09c39 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ PYTHON ?= python all: ; lint: - flake8 + ruff check test: all py.test From 7b357395b1b34d0a098f61072efb91f50b764984 Mon Sep 17 00:00:00 2001 From: Anja Adamov <57316423+adamovanja@users.noreply.github.com> Date: Fri, 30 Aug 2024 16:56:37 +0200 Subject: [PATCH 7/7] ruff with own GHA --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c8ca3d..68c7929 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,8 +23,8 @@ jobs: run: | pip install -q ruff - - name: Lint - run: make lint + - name: Ruff + uses: chartboost/ruff-action@v1 build-and-test: needs: lint