Skip to content

Commit

Permalink
Merge branch 'scikit-workflow' of https://github.com/rajithkrishnegow…
Browse files Browse the repository at this point in the history
…da/openfl into scikit-workflow
  • Loading branch information
rajithkrishnegowda committed Nov 21, 2024
2 parents 85d1d88 + 1cfaa77 commit c734c73
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 131 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/task_runner_e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: |
python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
-m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
--num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }}
echo "Task runner end to end test run completed"
- name: Print test summary
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
run: |
python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
-m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
--num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --disable_tls
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls
echo "Task runner end to end test run completed"
- name: Print test summary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"id": "9a61e30e-83d1-422d-aa0a-ebd4376197ec",
"metadata": {},
"source": [
"# Scikit-learn Linear Regression Tutorial using Workflow Interface with Ridge Regularization\n",
"## Scikit-learn Linear Regression Tutorial using Workflow Interface with Ridge Regularization\n",
"\n",
"\n",
"This tutorial demonstrates how to train a linear regression model using scikit-learn with Ridge regularization on a dataset, leveraging the new FedAI Workflow Interface. The Workflow Interface provides a novel way to compose federated learning experiments with OpenFL, enabling researchers to handle non-IID data and perform federated averaging. Through this tutorial, you will learn how to set up the federated learning environment, define the flow, and execute the training process across multiple collaborators."
"This tutorial demonstrates how to train a linear regression model using scikit-learn with Ridge regularization on a dataset, leveraging the new FedAI Workflow Interface. The Workflow Interface provides a novel way to compose federated learning experiments with OpenFL, enabling researchers to handle non-IID data and perform federated averaging with optimizations like FedProx. Through this tutorial, you will learn how to set up the federated learning environment, define the flow, and execute the training process across multiple collaborators\n",
"\n"
]
},
{
Expand All @@ -18,7 +19,7 @@
"tags": []
},
"source": [
"## We will use MSE as loss function and Ridge weights regularization\n",
"# We will use MSE as loss function and Ridge weights regularization\n",
"![image.png](https://www.analyticsvidhya.com/wp-content/uploads/2016/01/eq5-1.png)"
]
},
Expand Down Expand Up @@ -155,8 +156,8 @@
},
"outputs": [],
"source": [
"# Define input array with angles from 60deg to 400deg converted to radians\n",
"x = np.array([i*np.pi/180 for i in range(60,400,4)])\n",
"# Define input array with angles from 60deg to 300deg converted to radians\n",
"x = np.array([i*np.pi/180 for i in range(60,300,4)])\n",
"np.random.seed(10) # Setting seed for reproducibility\n",
"y = np.sin(x) + np.random.normal(0,0.15,len(x))\n",
"# plt.plot(x,y,'.')"
Expand Down Expand Up @@ -221,12 +222,20 @@
"## Now we run the same training on federated learning workflow api"
]
},
{
"cell_type": "markdown",
"id": "66cb4ecf-cb27-4367-891a-6cea2f1d347f",
"metadata": {},
"source": [
"## Test on a Federation"
]
},
{
"cell_type": "markdown",
"id": "08527aab-4b0f-472b-af0c-27ed4ade85c1",
"metadata": {},
"source": [
"## Import required libraries for federated learning"
"## Import necessary libraries for federated learning"
]
},
{
Expand All @@ -238,10 +247,11 @@
},
"outputs": [],
"source": [
"# Import ncessary libraries\n",
"# Import necessary libraries for federated learning\n",
"import numpy as np\n",
"from sklearn.linear_model import Lasso\n",
"from sklearn.metrics import mean_squared_error\n",
"from sklearn.datasets import make_regression\n",
"from openfl.experimental.interface import FLSpec\n",
"from openfl.experimental.placement import aggregator, collaborator\n",
"from openfl.experimental.runtime import FederatedRuntime\n",
Expand All @@ -259,12 +269,35 @@
]
},
{
"cell_type": "markdown",
"id": "052a4195-a410-4983-84b9-942159d9d345",
"cell_type": "code",
"execution_count": null,
"id": "42187367-6a96-4d78-a576-f7154e8f987f",
"metadata": {},
"outputs": [],
"source": [
"## Federated Learning Helper Functions\n",
"Define helper functions for training and validating the federated models."
"# Define a class to split the dataset into shards\n",
"class ShardSplitter:\n",
" def __init__(self, num_shards):\n",
" self.num_shards = num_shards\n",
"\n",
" def split(self, X, y):\n",
" \"\"\"Split the given 2D numpy arrays X and y into equal shards and return list of indexes for each shard.\"\"\"\n",
" num_samples = X.shape[0]\n",
" shard_size = num_samples // self.num_shards\n",
" indexes = np.arange(num_samples)\n",
" #np.random.shuffle(indexes)\n",
" \n",
" shards = []\n",
" for i in range(self.num_shards):\n",
" start_idx = i * shard_size\n",
" if i == self.num_shards - 1:\n",
" # Include any remaining samples in the last shard\n",
" end_idx = num_samples\n",
" else:\n",
" end_idx = start_idx + shard_size\n",
" shards.append(indexes[start_idx:end_idx])\n",
" \n",
" return shards"
]
},
{
Expand Down Expand Up @@ -318,89 +351,42 @@
},
{
"cell_type": "markdown",
"id": "a5cf193b-62a2-403a-96b1-5fa716d9087f",
"id": "8a3aefde-45a3-4bba-8b5b-6040808de966",
"metadata": {},
"source": [
"## Shard Splitter Class\n",
"Define a helper class to split the data into shards for federated learning."
"## Define the federated learning workflow"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "42187367-6a96-4d78-a576-f7154e8f987f",
"id": "c04b4ab2-1d40-44c7-907b-a6a7d176c959",
"metadata": {},
"outputs": [],
"source": [
"# Define a class to split the dataset into shards\n",
"class ShardSplitter:\n",
" def __init__(self, num_shards):\n",
" self.num_shards = num_shards\n",
"# Define the federated learning workflow\n",
"from openfl.experimental.placement import aggregator, collaborator\n",
"\n",
" def split(self, X, y):\n",
" \"\"\"Split the given 2D numpy arrays X and y into equal shards and return list of indexes for each shard.\"\"\"\n",
" num_samples = X.shape[0]\n",
" shard_size = num_samples // self.num_shards\n",
" indexes = np.arange(num_samples)\n",
" #np.random.shuffle(indexes)\n",
" \n",
" shards = []\n",
" for i in range(self.num_shards):\n",
" start_idx = i * shard_size\n",
" if i == self.num_shards - 1:\n",
" # Include any remaining samples in the last shard\n",
" end_idx = num_samples\n",
" else:\n",
" end_idx = start_idx + shard_size\n",
" shards.append(indexes[start_idx:end_idx])\n",
" \n",
" return shards"
]
},
{
"cell_type": "markdown",
"id": "a915fa13-fc10-4b02-808c-ad2c05cee297",
"metadata": {},
"source": [
"## Define Federated Averaging Method\n",
"The FedAvg method is used to average the models from all the collaborators after training."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "65e2b0a2-5a79-4f92-b255-4c8d3e28b635",
"metadata": {},
"outputs": [],
"source": [
"def inference(model, test_loader, batch_size):\n",
" x_test, y_test = test_loader\n",
" loss, accuracy = model.evaluate(\n",
" x_test,\n",
" y_test,\n",
" batch_size=batch_size,\n",
" verbose=1\n",
" )\n",
" accuracy_percentage = accuracy * 100\n",
" print(f\"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")\n",
" return accuracy\n",
" \n",
"# Federated Averaging for Lasso models\n",
"def FedAvg(models):\n",
" new_model = models[0]\n",
" coef_list = [model.model.coef_ for model in models]\n",
" intercept_list = [model.model.intercept_ for model in models]\n",
" new_model.coef_ = np.mean(coef_list, axis=0)\n",
" new_model.intercept_ = np.mean(intercept_list, axis=0)\n",
" return new_model"
]
},
{
"cell_type": "markdown",
"id": "8a3aefde-45a3-4bba-8b5b-6040808de966",
"metadata": {},
"source": [
"## Define Federated Learning Workflow\n",
"Define the workflow for federated learning using OpenFL's FLSpec."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c04b4ab2-1d40-44c7-907b-a6a7d176c959",
"metadata": {},
"outputs": [],
"source": [
"# Define the federated learning workflow\n",
"from openfl.experimental.placement import aggregator, collaborator\n",
" return new_model\n",
"\n",
"# Federated Learning Workflow using OpenFL's Workflow API\n",
"class FederatedLassoFlow(FLSpec):\n",
Expand Down Expand Up @@ -477,15 +463,6 @@
" print(f\"Final aggregated model MSE on test data: {final_mse:.4f}\")"
]
},
{
"cell_type": "markdown",
"id": "3255cac0-caf3-4713-beef-9ff32fe73372",
"metadata": {},
"source": [
"## Start the Federated Learning Process\n",
"Create an instance of FederatedLassoFlow and run it with the new larger dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -502,24 +479,6 @@
"# Start the federated learning process\n",
"federated_flow.run()"
]
},
{
"cell_type": "markdown",
"id": "ddfb3ac6-d14a-4e00-83a9-04195b1efdf8",
"metadata": {},
"source": [
"## 🎉 Congratulations! 🎉"
]
},
{
"cell_type": "markdown",
"id": "cbb985a7-1a10-410e-99d3-cebdf2b809a2",
"metadata": {},
"source": [
"Now that you've completed workflow interface notebook for **scikit-learn Linear Regression** using federated learning.\n",
"\n",
"### Happy learning and happy coding with OpenFL! 🎉"
]
}
],
"metadata": {
Expand Down
16 changes: 14 additions & 2 deletions openfl/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import base_pb2, utils
from openfl.utilities import TaskResultKey, TensorKey, change_tags
from openfl.utilities.logs import write_metric
from openfl.utilities.logs import get_memory_usage, write_metric


class Aggregator:
Expand Down Expand Up @@ -76,6 +76,7 @@ def __init__(
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
log_memory_usage=False,
log_metric_callback=None,
**kwargs,
):
Expand Down Expand Up @@ -123,7 +124,9 @@ def __init__(
)
self._end_of_round_check_done = [False] * rounds_to_train
self.stragglers = []

# Flag can be enabled to get memory usage details for ubuntu system
self.log_memory_usage = log_memory_usage
self.memory_details = []
self.rounds_to_train = rounds_to_train

# if the collaborator requests a delta, this value is set to true
Expand Down Expand Up @@ -969,6 +972,13 @@ def _end_of_round_check(self):
for task_name in all_tasks:
self._compute_validation_related_task_metrics(task_name)

if self.log_memory_usage:
# This is the place to check the memory usage of the aggregator
memory_detail = get_memory_usage()
memory_detail["round_number"] = self.round_number
memory_detail["metric_origin"] = "aggregator"
self.memory_details.append(memory_detail)

# Once all of the task results have been processed
self._end_of_round_check_done[self.round_number] = True

Expand All @@ -984,6 +994,8 @@ def _end_of_round_check(self):

# TODO This needs to be fixed!
if self._time_to_quit():
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {self.memory_details}")
self.logger.info("Experiment Completed. Cleaning up...")
else:
self.logger.info("Starting round %s...", self.round_number)
Expand Down
15 changes: 13 additions & 2 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


"""Collaborator module."""

from enum import Enum
from logging import getLogger
from time import sleep
Expand All @@ -13,6 +12,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import utils
from openfl.utilities import TensorKey
from openfl.utilities.logs import get_memory_usage


class DevicePolicy(Enum):
Expand Down Expand Up @@ -80,6 +80,7 @@ def __init__(
delta_updates=False,
compression_pipeline=None,
db_store_rounds=1,
log_memory_usage=False,
**kwargs,
):
"""Initialize the Collaborator object.
Expand Down Expand Up @@ -123,7 +124,8 @@ def __init__(
self.delta_updates = delta_updates

self.client = client

# Flag can be enabled to get memory usage details for ubuntu system
self.log_memory_usage = log_memory_usage
self.task_config = task_config

self.logger = getLogger(__name__)
Expand Down Expand Up @@ -158,6 +160,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()):

def run(self):
"""Run the collaborator."""
memory_details = []
while True:
tasks, round_number, sleep_time, time_to_quit = self.get_tasks()
if time_to_quit:
Expand All @@ -171,6 +174,14 @@ def run(self):

# Cleaning tensor db
self.tensor_db.clean_up(self.db_store_rounds)
if self.log_memory_usage:
# This is the place to check the memory usage of the collaborator
memory_detail = get_memory_usage()
memory_detail["round_number"] = round_number
memory_details["metric_origin"] = self.collaborator_name
memory_details.append(memory_detail)
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {memory_details}")

self.logger.info("End of Federation reached. Exiting...")

Expand Down
Loading

0 comments on commit c734c73

Please sign in to comment.