From ec53c439f69ac82b0650163c979dfed548526f4a Mon Sep 17 00:00:00 2001 From: kagrawa2 Date: Tue, 19 Nov 2024 22:28:51 -0800 Subject: [PATCH] Add workflow based linear regression tutorial Signed-off-by: kagrawa2 --- ...105_Numpy_Linear_Regression_Workflow.ipynb | 575 ++++++++++++++++++ 1 file changed, 575 insertions(+) create mode 100644 openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb diff --git a/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb b/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb new file mode 100644 index 00000000000..fe06ac49873 --- /dev/null +++ b/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb @@ -0,0 +1,575 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Linear Regression Tutorial using Workflow API\n", + "\n", + "This notebook demonstrates a linear regression tutorial using the Workflow Interface. The key steps include:\n", + "\n", + "1. **Model Definition**:\n", + " - Define a linear regression model with Mean Squared Error (MSE) as the loss function.\n", + "\n", + "2. **Synthetic Data Generation**:\n", + " - Generate synthetic datasets for training and validation.\n", + " - Shard the dataset among multiple collaborators for federated learning.\n", + "\n", + "3. **Federated Learning Workflow**:\n", + " - Define a federated learning workflow using the `FederatedFlow` class.\n", + " - Implement tasks for the aggregator and collaborators, including model validation, training, and aggregation.\n", + "\n", + "4. 
**Training and Evaluation**:\n", + " - Train the model locally to establish a baseline.\n", + " - Execute the federated learning workflow to train the model across multiple collaborators.\n", + " - Plot and analyze the aggregated model's performance over multiple rounds.\n", + "\n", + "By the end of this tutorial, you will understand how to set up and execute a federated learning experiment for a linear regression model using the Workflow Interface." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Getting Started\n", + "\n", + "First, we install the necessary dependencies for the Workflow Interface and import the relevant libraries." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Below code will display the print statement output on screen as well\n", + "import sys\n", + "sys.stdout = open('/dev/stdout', 'w')\n", + "\n", + "!pip install git+https://github.com/securefederatedai/openfl.git\n", + "!pip install -r workflow_interface_requirements.txt\n", + "!pip install matplotlib\n", + "\n", + "# Uncomment this if running in Google Colab and set USERNAME if running in docker container.\n", + "#!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt\n", + "#import os\n", + "#os.environ[\"USERNAME\"] = \"colab\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch as pt\n", + "import torch.utils.data as data\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "import torch.optim as optim\n", + "import numpy as np\n", + "from typing import List, Union\n", + "import random\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "from matplotlib.pylab import rcParams\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")" + ] + }, + { + "cell_type": "markdown", +
class LinRegLasso:
    """Linear regression with an L1 (lasso) penalty, trained by full-batch gradient descent.

    The model is ``y_hat = X @ w + b``. Coefficients and bias are stored together
    in ``self.weights`` so that a single vector can be averaged across collaborators.
    """

    def __init__(self, n_feat: int) -> None:
        # (n_feat + 1,) vector: one coefficient per feature, followed by the bias.
        self.weights = np.ones((n_feat + 1))

    def predict(self, feature_vector: Union[np.ndarray, List[int]]) -> np.ndarray:
        '''
        Predict targets for one or more samples.

        feature_vector may be:
          * a 2-D array/list of shape (n_vec, n_feat);
          * a 1-D array/list. For a single-feature model it is read as n_vec
            scalar samples (this is what the notebook relies on); for a
            multi-feature model it is read as one sample of shape (n_feat,).

        Returns an array of shape (n_vec,).
        '''
        n_feat = self.weights.shape[0] - 1
        feature_vector = np.array(feature_vector)
        if feature_vector.ndim == 1:
            if n_feat == 1:
                feature_vector = feature_vector[:, np.newaxis]  # (n_vec, 1)
            else:
                feature_vector = feature_vector[np.newaxis, :]  # (1, n_feat)
        assert feature_vector.shape[-1] == n_feat, \
            f"sample shape is {feature_vector.shape} and weights shape is {self.weights.shape}"

        # Same result as appending a column of ones and multiplying by the
        # full weight vector, without building the augmented matrix.
        return feature_vector @ self.weights[:-1] + self.weights[-1]

    def mse(self, X: np.ndarray, Y: np.ndarray) -> float:
        """Mean squared error of the model's predictions for X against targets Y."""
        # Flatten Y so a column vector (n, 1) cannot broadcast against the
        # (n,) predictions into an (n, n) matrix.
        Y = np.asarray(Y).reshape(-1)
        Y_hat = self.predict(X)
        return np.sum((Y - Y_hat)**2) / Y.shape[0]

    def _update_weights(self, X: np.ndarray, Y: np.ndarray, lr: float, wd: float) -> None:
        '''
        Perform one gradient-descent step on MSE + L1 penalty.

        X: (n_samples, n_features)
        Y: (n_samples,)
        self.weights: (n_features + 1)

        Cost function is MSE: (y - W*X - b)**2;
        its derivative with respect to W is -2*X*(y - W*X - b),
        and with respect to b is -2*(y - W*X - b).

        Regularisation function is L1 |W|;
        its (sub)derivative is SIGN(w). Like the data term, it is SUBTRACTED
        so that the penalty shrinks the weights towards zero.
        The bias entry is penalised together with the coefficients.
        '''
        Y = np.asarray(Y).reshape(-1)
        error = Y - self.predict(X)  # (n_samples,)
        X_with_bias = np.concatenate((X.T, [[1]*X.shape[0]])).T
        updates = -2 * X_with_bias.T @ error / Y.shape[0]
        regression_term = np.sign(self.weights)

        self.weights = self.weights - lr * updates - wd * regression_term

    def fit(self, X: np.ndarray, Y: np.ndarray,
            n_epochs: int, lr: float, wd: float,
            silent: bool=False) -> None:
        """Run ``n_epochs`` full-batch updates; print the MSE per epoch unless ``silent``."""
        for i in range(n_epochs):
            self._update_weights(X, Y, lr, wd)
            if not silent:
                # Only pay for the extra forward pass when it is displayed.
                print(f'epoch: {i}, \t MSE: {self.mse(X, Y)}')

    def train(self, train_data, lr, wd, epochs):
        """Fit on every batch yielded by ``train_data`` and return the final training MSE.

        ``train_data`` is iterated once; each item must be an ``(X_batch, Y_batch)``
        pair whose elements expose ``.numpy()`` (e.g. torch tensors from a
        DataLoader). All batches are concatenated and used for full-batch
        gradient descent.
        """
        all_X = []
        all_Y = []
        for X_batch, Y_batch in train_data:
            all_X.append(X_batch.numpy())
            all_Y.append(Y_batch.numpy())

        X = np.concatenate(all_X, axis=0)
        Y = np.concatenate(all_Y, axis=0)
        self.fit(X, Y, epochs, lr, wd, silent=True)
        return self.mse(X, Y)


def validate(my_model, val_loader):
    """Return the sample-weighted mean squared error of ``my_model`` over ``val_loader``.

    Each item of ``val_loader`` must be an ``(X, Y)`` pair whose elements expose
    ``.numpy()``. Batch MSEs are weighted by batch size, so unequal batches
    still give the true per-sample mean.

    Raises:
        ValueError: if ``val_loader`` yields no samples.
    """
    total_mse = 0.0
    num_samples = 0

    for X, Y in val_loader:
        X = X.numpy()
        Y = Y.numpy()

        total_mse += my_model.mse(X, Y) * len(X)  # undo the per-batch averaging
        num_samples += len(X)

    if num_samples == 0:
        raise ValueError("val_loader yielded no samples to validate on")
    return total_mse / num_samples


# Define input array with angles from 60deg to 300deg converted to radians
x = np.array([i*np.pi/180 for i in range(60,300,4)])
np.random.seed(10)  # Setting seed for reproducibility
y = np.sin(x) + np.random.normal(0,0.15,len(x))
#plt.plot(x,y,'.')
# Baseline: train a single model locally on the synthetic sine data.
lr_model = LinRegLasso(1)
wd = 0.0001
lr = 0.08
epochs = 100

print(f"Initial MSE: {lr_model.mse(x,y)}")
lr_model.fit(x[:,np.newaxis],y, epochs, lr, wd, silent=True)
print(f"Final MSE: {lr_model.mse(x,y)}")
print(f"Final parameters: {lr_model.weights}")


def FedAvg(models, weights=None):
    """
    Simulates federated averaging for LinRegLasso models.

    Args:
        models (list): Non-empty list of LinRegLasso model instances from collaborators.
        weights (list, optional): List of weights for each model, used for weighted averaging.
            Defaults to None, indicating equal weights.

    Returns:
        LinRegLasso: A new model whose ``weights`` are the (weighted) average of
        the inputs' weights. None of the input models is modified.

    Raises:
        ValueError: if ``models`` is empty.
    """
    import copy  # local import: keeps this cell independent of the notebook's import cell

    if not models:
        raise ValueError("FedAvg requires at least one model")

    # Copy the first model as a template rather than overwriting its weights,
    # so the caller's collaborator model stays intact.
    new_model = copy.deepcopy(models[0])
    all_weights = [model.weights for model in models]
    new_model.weights = np.average(all_weights, axis=0, weights=weights)
    return new_model
from openfl.experimental.interface import FLSpec, Aggregator, Collaborator
from openfl.experimental.runtime import LocalRuntime
from openfl.experimental.placement import aggregator, collaborator


class FederatedFlow(FLSpec):
    """Federated training flow for a LinRegLasso-style model.

    Each round: every collaborator validates the aggregated model, trains it
    on its private ``train_loader``, validates the locally trained model, and
    the aggregator then averages the collaborators' models with ``FedAvg``.
    """

    def __init__(self, model=None, optimizer=None, rounds=3,
                 lr=0.08, wd=0.0001, epochs=100, **kwargs):
        """
        Args:
            model: Initial global model; must provide ``train(loader, lr, wd, epochs)``
                and be accepted by ``validate`` / ``FedAvg``.
            optimizer: Unused by this flow; accepted so existing callers that
                pass it positionally keep working.
            rounds (int): Number of federated rounds to run.
            lr (float): Learning rate for local training.
            wd (float): L1 penalty strength for local training.
            epochs (int): Local epochs per round.
            **kwargs: Forwarded to ``FLSpec`` (e.g. ``checkpoint``).
        """
        super().__init__(**kwargs)
        self.model = model
        self.rounds = rounds
        self.lr = lr
        self.wd = wd
        self.epochs = epochs
        # One entry per completed round, appended in `join`.
        self.aggregated_mse_history = []
        self.train_loss_history = []

    @aggregator
    def start(self):
        """Record the participating collaborators and fan out the first round."""
        print('Performing initialization for model')
        self.collaborators = self.runtime.collaborators
        self.current_round = 0
        self.next(self.aggregated_model_validation, foreach='collaborators')

    @collaborator
    def aggregated_model_validation(self):
        """Score the current global model on this collaborator's test data."""
        print(f'Performing aggregated model validation for collaborator {self.input}')
        self.agg_validation_score = validate(self.model, self.test_loader)
        print(f'{self.input} value of {self.agg_validation_score}')
        self.next(self.train)

    @collaborator
    def train(self):
        """Train the model on this collaborator's data and store the training MSE in ``self.loss``."""
        self.loss = self.model.train(self.train_loader, self.lr, self.wd, self.epochs)
        self.next(self.local_model_validation)

    @collaborator
    def local_model_validation(self):
        """Score the locally trained model on this collaborator's test data."""
        self.local_validation_score = validate(self.model, self.test_loader)
        print(
            f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
        self.next(self.join)

    @aggregator
    def join(self, inputs):
        """Average collaborator metrics and models, then start the next round or finish."""
        n_inputs = len(inputs)
        # `clone` rather than `input`, to avoid shadowing the builtin.
        self.average_train_loss = sum(clone.loss for clone in inputs) / n_inputs
        self.aggregated_model_mse = sum(
            clone.agg_validation_score for clone in inputs) / n_inputs
        self.local_model_mse = sum(
            clone.local_validation_score for clone in inputs) / n_inputs
        self.train_loss_history.append(self.average_train_loss)
        self.aggregated_mse_history.append(self.aggregated_model_mse)
        print(f'Average aggregated model validation values = {self.aggregated_model_mse}')
        print(f'Average training loss = {self.average_train_loss}')
        print(f'Average local model validation values = {self.local_model_mse}')
        self.model = FedAvg([clone.model for clone in inputs])

        self.current_round += 1
        if self.current_round < self.rounds:
            # No `exclude=` here: this flow never defines an attribute called
            # `private`, so excluding it would be a no-op at best.
            # NOTE(review): OpenFL may reject unknown attribute names in
            # `exclude` - confirm against the installed version.
            self.next(self.aggregated_model_validation, foreach='collaborators')
        else:
            self.next(self.end)

    @aggregator
    def end(self):
        """Terminal step of the flow."""
        print('This is the end of the flow')
def generate_synthetic(rank: int, n_samples: int, noise: float):
    """
    Generate synthetic data for linear regression: a noisy sine curve.

    Args:
        rank (int): Seed for random number generation.
        n_samples (int): Number of samples to generate; values below 10 are raised to 10.
        noise (float): Standard deviation of the Gaussian noise added to the y values.

    Returns:
        np.ndarray: x values in radians, shape (n_samples, 1), drawn uniformly
            from the 60-300 degree range.
        np.ndarray: y values ``sin(x) + noise``, shape (n_samples,).
    """
    # A private generator yields the same numbers as reseeding the global
    # numpy RNG with `rank`, without disturbing random state used elsewhere.
    rng = np.random.RandomState(rank)
    n_samples = max(n_samples, 10)
    interval = 240
    x_start = 60

    x = rng.rand(n_samples, 1) * interval + x_start
    x *= np.pi / 180

    y = np.sin(x) + rng.normal(0, noise, size=(n_samples, 1))
    y = y.reshape(-1)

    return x, y


class SyntheticFederatedDataset:
    """Synthetic sine-curve dataset, split 80/20 into train/test and sharded per collaborator."""

    def __init__(self, num_collaborators=2, batch_size=1, num_samples=10, **kwargs):
        """
        Args:
            num_collaborators (int): Number of shards to create (>= 1).
            batch_size (int): Batch size of the loaders returned by ``split``.
            num_samples (int): Requested total number of samples. It is raised
                when needed so every collaborator receives at least one
                training and one test sample.
            **kwargs: Accepted and ignored.

        Raises:
            ValueError: if ``num_collaborators`` is less than 1.
        """
        if num_collaborators < 1:
            raise ValueError("num_collaborators must be at least 1")
        self.batch_size = batch_size

        train_fraction = 0.8
        # With an 80/20 split, 5 samples per collaborator guarantees a test
        # set of at least `num_collaborators` samples (and a larger train set).
        n_total = max(num_samples, 5 * num_collaborators)

        X, y = generate_synthetic(rank=42, n_samples=n_total, noise=0.1)
        X = np.array(X, dtype=np.float32)
        y = np.array(y, dtype=np.float32)

        n_train = int(train_fraction * len(X))

        # Each attribute is a list with one array per collaborator.
        self.X_train_all = np.array_split(X[:n_train], num_collaborators)
        self.X_test_all = np.array_split(X[n_train:], num_collaborators)
        self.y_train_all = np.array_split(y[:n_train], num_collaborators)
        self.y_test_all = np.array_split(y[n_train:], num_collaborators)

    def _make_loader(self, features, targets):
        """Wrap one shard's arrays in a shuffling DataLoader using ``self.batch_size``."""
        return data.DataLoader(
            data.TensorDataset(pt.from_numpy(features), pt.from_numpy(targets)),
            batch_size=self.batch_size, shuffle=True
        )

    def split(self, index):
        """Return ``{"train_loader": ..., "test_loader": ...}`` for collaborator ``index``."""
        return {
            "train_loader": self._make_loader(self.X_train_all[index], self.y_train_all[index]),
            "test_loader": self._make_loader(self.X_test_all[index], self.y_test_all[index]),
        }
# Named `aggregator_` so the `aggregator` placement decorator imported for
# FederatedFlow is not shadowed in the notebook namespace.
aggregator_ = Aggregator()
aggregator_.private_attributes = {}

collaborator_names = ['Bangalore', 'Paris', 'Texas', 'Seoul']
synthetic_federated_dataset = SyntheticFederatedDataset(
    num_collaborators=len(collaborator_names), num_samples=2000, batch_size=20)


def callable_to_initialize_collaborator_private_attributes(index):
    """Return the private attributes (train/test loaders) for collaborator ``index``."""
    return synthetic_federated_dataset.split(index)


# One Collaborator per name; `index` is passed to the callable above so each
# collaborator receives its own shard of the dataset.
collaborators = [
    Collaborator(
        name=collaborator_name, num_cpus=1.0, num_gpus=0.0,
        private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
        index=idx
    )
    for idx, collaborator_name in enumerate(collaborator_names)
]

local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators, backend='single_process')
print(f'Local runtime collaborators = {local_runtime.collaborators}')
# NOTE(review): the flow starts from `lr_model`, i.e. the baseline that was
# already trained locally, so round 1 is a warm start rather than a fresh
# model - confirm this is intended.
model = lr_model
optimizer = None
flflow = FederatedFlow(model, optimizer, rounds=10, checkpoint=True)
flflow.runtime = local_runtime
flflow.run()

print(f'\nFinal aggregated MSE for {flflow.rounds} rounds of training: {flflow.aggregated_model_mse}')
# This is the full per-round history, so label it as such.
print(f'\nAverage training loss per round over {flflow.rounds} rounds of training: {flflow.train_loss_history}')
print(f'\nFinal parameters: {flflow.model.weights}')
print(f'\n Aggregated model MSE History : {flflow.aggregated_mse_history}')

# Training loss per round. The x-range comes from the history itself so the
# two sequences always have the same length.
plt.plot(range(1, len(flflow.train_loss_history) + 1), flflow.train_loss_history)
plt.xlabel('Round')  # one point per federated round, not per local epoch
plt.ylabel('Loss (MSE)')
plt.title('Loss Function during Training')
plt.show()

# Aggregated-model validation MSE per round.
plt.plot(range(1, len(flflow.aggregated_mse_history) + 1), flflow.aggregated_mse_history)
plt.xlabel('Round')
plt.ylabel('Aggregated Model MSE')
plt.title('Aggregated Model MSE over Rounds')
plt.show()
n_cols = 20      # number of independently seeded sample groups
n_samples = 4    # samples per group
interval = 240   # width of the sampled range, in degrees
x_start = 60     # start of the sampled range, in degrees
noise = 0.3      # std-dev of the Gaussian noise added to sin(x)

final_model = flflow.model  # Get the final model after training

# Distinct names (x_eval / y_eval) keep the baseline cell's `x` and `y` intact.
x_groups = []
for rank in range(n_cols):
    np.random.seed(rank)  # Setting seed for reproducibility
    x_eval = np.random.rand(n_samples, 1) * interval + x_start
    x_eval *= np.pi / 180
    y_eval = np.sin(x_eval) + np.random.normal(0, noise, size=(n_samples, 1))
    x_groups.append(x_eval)
    plt.plot(x_eval, y_eval, '+')

# X has shape (n_cols * n_samples, 1). Sorting must run along axis 0: the
# default (last axis) sorts each 1-element row and leaves the order unchanged.
X = np.sort(np.vstack(x_groups), axis=0)
Y_hat = final_model.predict(X)
plt.plot(X, Y_hat, '--')
plt.xlabel('x (radians)')
plt.ylabel('y')
plt.title('Federated model prediction vs. noisy sin(x) samples')
plt.show()