Merge branch 'develop' into karansh1/client_auth_bug
MasterSkepticista authored Nov 22, 2024
2 parents 94fd85b + 728f665 commit 0b05d27
Showing 14 changed files with 169 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/task_runner_e2e.yml
@@ -73,7 +73,7 @@ jobs:
run: |
python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
-m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
--num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }}
echo "Task runner end to end test run completed"
- name: Print test summary
@@ -140,7 +140,7 @@ jobs:
run: |
python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
-m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
--num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --disable_tls
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls
echo "Task runner end to end test run completed"
- name: Print test summary
16 changes: 14 additions & 2 deletions openfl/component/aggregator/aggregator.py
@@ -14,7 +14,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import base_pb2, utils
from openfl.utilities import TaskResultKey, TensorKey, change_tags
from openfl.utilities.logs import write_metric
from openfl.utilities.logs import get_memory_usage, write_metric


class Aggregator:
@@ -76,6 +76,7 @@ def __init__(
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
log_memory_usage=False,
log_metric_callback=None,
**kwargs,
):
@@ -123,7 +124,9 @@ def __init__(
)
self._end_of_round_check_done = [False] * rounds_to_train
self.stragglers = []

# Flag can be enabled to collect memory usage details (supported on Ubuntu/Linux)
self.log_memory_usage = log_memory_usage
self.memory_details = []
self.rounds_to_train = rounds_to_train

# if the collaborator requests a delta, this value is set to true
@@ -969,6 +972,13 @@ def _end_of_round_check(self):
for task_name in all_tasks:
self._compute_validation_related_task_metrics(task_name)

if self.log_memory_usage:
# Capture the aggregator's memory usage at the end of the round
memory_detail = get_memory_usage()
memory_detail["round_number"] = self.round_number
memory_detail["metric_origin"] = "aggregator"
self.memory_details.append(memory_detail)

# Once all of the task results have been processed
self._end_of_round_check_done[self.round_number] = True

@@ -984,6 +994,8 @@ def _end_of_round_check(self):

# TODO This needs to be fixed!
if self._time_to_quit():
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {self.memory_details}")
self.logger.info("Experiment Completed. Cleaning up...")
else:
self.logger.info("Starting round %s...", self.round_number)
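For reference, each per-round entry appended to self.memory_details combines the dictionary returned by get_memory_usage() (added in openfl/utilities/logs.py below) with the two tags set in _end_of_round_check. A minimal sketch, with made-up values in MiB:

# Illustrative only: shape of one entry in Aggregator.memory_details.
example_entry = {
    "process_memory": 512.34,  # RSS of the aggregator process
    "virtual_memory": {
        "total": 16012.0, "available": 9800.5, "percent": 38.8,
        "used": 5400.2, "free": 4200.1, "active": 3100.0, "inactive": 2500.0,
        "buffers": 300.0, "cached": 2200.0, "shared": 150.0,
    },
    "swap_memory": {"total": 2048.0, "used": 0.0, "free": 2048.0, "percent": 0.0},
    "round_number": 0,              # added in _end_of_round_check
    "metric_origin": "aggregator",  # added in _end_of_round_check
}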
15 changes: 13 additions & 2 deletions openfl/component/collaborator/collaborator.py
@@ -3,7 +3,6 @@


"""Collaborator module."""

from enum import Enum
from logging import getLogger
from time import sleep
@@ -13,6 +12,7 @@
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import utils
from openfl.utilities import TensorKey
from openfl.utilities.logs import get_memory_usage


class DevicePolicy(Enum):
@@ -80,6 +80,7 @@ def __init__(
delta_updates=False,
compression_pipeline=None,
db_store_rounds=1,
log_memory_usage=False,
**kwargs,
):
"""Initialize the Collaborator object.
@@ -123,7 +124,8 @@ def __init__(
self.delta_updates = delta_updates

self.client = client

# Flag can be enabled to collect memory usage details (supported on Ubuntu/Linux)
self.log_memory_usage = log_memory_usage
self.task_config = task_config

self.logger = getLogger(__name__)
@@ -158,6 +160,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()):

def run(self):
"""Run the collaborator."""
memory_details = []
while True:
tasks, round_number, sleep_time, time_to_quit = self.get_tasks()
if time_to_quit:
@@ -171,6 +174,14 @@

# Cleaning tensor db
self.tensor_db.clean_up(self.db_store_rounds)
if self.log_memory_usage:
# Capture the collaborator's memory usage at the end of the round
memory_detail = get_memory_usage()
memory_detail["round_number"] = round_number
memory_detail["metric_origin"] = self.collaborator_name
memory_details.append(memory_detail)
if self.log_memory_usage:
self.logger.info(f"Publish memory usage: {memory_details}")

self.logger.info("End of Federation reached. Exiting...")

35 changes: 35 additions & 0 deletions openfl/utilities/logs.py
@@ -5,7 +5,9 @@
"""Logs utilities."""

import logging
import os

import psutil
from rich.console import Console
from rich.logging import RichHandler
from tensorboardX import SummaryWriter
@@ -57,3 +59,36 @@ def setup_loggers(log_level=logging.INFO):
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s")
handler.setFormatter(formatter)
root.addHandler(handler)


def get_memory_usage() -> dict:
"""Return memory usage details of the current process.
Returns:
dict: A dictionary containing memory usage details.
"""
process = psutil.Process(os.getpid())
virtual_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
memory_usage = {
"process_memory": round(process.memory_info().rss / (1024**2), 2),
"virtual_memory": {
"total": round(virtual_memory.total / (1024**2), 2),
"available": round(virtual_memory.available / (1024**2), 2),
"percent": virtual_memory.percent,
"used": round(virtual_memory.used / (1024**2), 2),
"free": round(virtual_memory.free / (1024**2), 2),
"active": round(virtual_memory.active / (1024**2), 2),
"inactive": round(virtual_memory.inactive / (1024**2), 2),
"buffers": round(virtual_memory.buffers / (1024**2), 2),
"cached": round(virtual_memory.cached / (1024**2), 2),
"shared": round(virtual_memory.shared / (1024**2), 2),
},
"swap_memory": {
"total": round(swap_memory.total / (1024**2), 2),
"used": round(swap_memory.used / (1024**2), 2),
"free": round(swap_memory.free / (1024**2), 2),
"percent": swap_memory.percent,
},
}
return memory_usage
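A minimal usage sketch of the new helper (assumes psutil is installed, as the diff above requires; note that the buffers, cached and shared fields of psutil.virtual_memory() are Linux-specific, which is why the flag is described as Ubuntu-specific in the comments above):

from openfl.utilities.logs import get_memory_usage

usage = get_memory_usage()
print(usage["process_memory"])             # RSS of the current process, in MiB
print(usage["virtual_memory"]["percent"])  # system-wide memory utilization, in %
print(usage["swap_memory"]["used"])        # swap currently in use, in MiB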
2 changes: 1 addition & 1 deletion setup.py
@@ -152,7 +152,7 @@ def run(self):
'docker',
'dynaconf==3.2.6',
'flatten_json',
'grpcio>=1.56.2',
'grpcio>=1.56.2,<1.66.0',
'ipykernel',
'jupyterlab',
'numpy',
14 changes: 11 additions & 3 deletions tests/end_to_end/conftest.py
@@ -17,7 +17,7 @@
# Define a named tuple to store the objects for model owner, aggregator, and collaborators
federation_fixture = collections.namedtuple(
"federation_fixture",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir, num_rounds",
)


@@ -62,6 +62,11 @@ def pytest_addoption(parser):
action="store_true",
help="Disable TLS for communication",
)
parser.addoption(
"--log_memory_usage",
action="store_true",
help="Enable memory log in collaborators and aggregator",
)


@pytest.fixture(scope="session", autouse=True)
@@ -234,14 +239,16 @@ def fx_federation(request, pytestconfig):
num_rounds = args.num_rounds
disable_client_auth = args.disable_client_auth
disable_tls = args.disable_tls
log_memory_usage = args.log_memory_usage

log.info(
f"Running federation setup using Task Runner API on single machine with below configurations:\n"
f"\tNumber of collaborators: {num_collaborators}\n"
f"\tNumber of rounds: {num_rounds}\n"
f"\tModel name: {model_name}\n"
f"\tClient authentication: {not disable_client_auth}\n"
f"\tTLS: {not disable_tls}"
f"\tTLS: {not disable_tls}\n"
f"\tMemory Logs: {log_memory_usage}"
)

# Validate the model name and create the workspace name
@@ -251,7 +258,7 @@
workspace_name = f"workspace_{model_name}"

# Create model owner object and the workspace for the model
model_owner = participants.ModelOwner(workspace_name, model_name)
model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage)
try:
workspace_path = model_owner.create_workspace(results_dir=results_dir)
except Exception as e:
@@ -318,4 +325,5 @@ def fx_federation(request, pytestconfig):
disable_tls=disable_tls,
workspace_path=workspace_path,
results_dir=results_dir,
num_rounds=num_rounds,
)
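For illustration, the new option can be passed alongside the existing ones when launching the suite. A hedged sketch using pytest's programmatic entry point; the marker, model name and counts are example values mirroring the CLI in task_runner_e2e.yml:

import pytest

rc = pytest.main([
    "-s", "tests/end_to_end/test_suites/task_runner_tests.py",
    "-m", "torch_cnn_mnist",            # example marker / model
    "--model_name", "torch_cnn_mnist",  # example model name
    "--num_rounds", "2",                # example value
    "--num_collaborators", "2",         # example value
    "--log_memory_usage",               # new flag: enable memory logging
])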
8 changes: 7 additions & 1 deletion tests/end_to_end/models/participants.py
@@ -23,12 +23,13 @@ class ModelOwner:
4. Importing and exporting the workspace etc.
"""

def __init__(self, workspace_name, model_name):
def __init__(self, workspace_name, model_name, log_memory_usage):
"""
Initialize the ModelOwner class
Args:
workspace_name (str): Workspace name
model_name (str): Model name
log_memory_usage (bool): Flag to enable memory usage logging
"""
self.workspace_name = workspace_name
self.model_name = model_name
@@ -38,6 +39,7 @@ def __init__(self, workspace_name, model_name):
self.plan_path = None
self.num_collaborators = constants.NUM_COLLABORATORS
self.rounds_to_train = constants.NUM_ROUNDS
self.log_memory_usage = log_memory_usage

def create_workspace(self, results_dir=None):
"""
@@ -132,6 +134,10 @@ def modify_plan(self, new_rounds=None, num_collaborators=None, disable_client_au
data = yaml.load(fp, Loader=yaml.FullLoader)

data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train)
# Enable memory usage logging (used to investigate memory leaks)
data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage
data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage

data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators)
data["network"]["settings"]["disable_client_auth"] = disable_client_auth
data["network"]["settings"]["tls"] = not disable_tls
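A partial, illustrative sketch of the plan dictionary after modify_plan(), limited to the keys touched above (a real plan.yaml carries many more settings, and the values here are examples):

plan = {
    "aggregator": {"settings": {"rounds_to_train": 2, "log_memory_usage": True}},
    "collaborator": {"settings": {"log_memory_usage": True}},
    "data_loader": {"settings": {"collaborator_count": 2}},
    "network": {"settings": {"disable_client_auth": False, "tls": True}},
}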
2 changes: 2 additions & 0 deletions tests/end_to_end/utils/conftest_helper.py
@@ -20,6 +20,7 @@ def parse_arguments():
- model_name (str, default="torch_cnn_mnist"): Model name
- disable_client_auth (bool): Disable client authentication
- disable_tls (bool): Disable TLS for communication
- log_memory_usage (bool): Enable memory usage logs
Raises:
SystemExit: If the required arguments are not provided or if any argument parsing error occurs.
@@ -32,6 +33,7 @@
parser.add_argument("--model_name", type=str, help="Model name")
parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication")
parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication")
parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs")
args = parser.parse_known_args()[0]
return args

5 changes: 3 additions & 2 deletions tests/end_to_end/utils/federation_helper.py
@@ -85,6 +85,7 @@ def verify_federation_run_completion(fed_obj, results):
executor.submit(
_verify_completion_for_participant,
participant,
fed_obj.num_rounds,
results[i]
)
for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator])
@@ -99,7 +100,7 @@
return all(results)


def _verify_completion_for_participant(participant, result_file):
def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100):
"""
Verify the completion of the process for the participant
Args:
@@ -109,7 +110,7 @@ def _verify_completion_for_participant(participant, result_file):
bool: True if successful, else False
"""
# Wait for the successful output message to appear in the log till timeout
timeout = 900 # in seconds
timeout = 300 + (time_for_each_round * num_rounds)  # in seconds
log.info(f"Printing the last line of the log file for {participant.name} to track the progress")
with open(result_file, 'r') as file:
content = file.read()
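The timeout now scales with the round count instead of using a flat 900 seconds. For example, with the default time_for_each_round of 100 seconds:

num_rounds = 5                      # example value
timeout = 300 + (100 * num_rounds)  # base budget plus per-round allowance
assert timeout == 800               # seconds, versus the previous flat 900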
34 changes: 24 additions & 10 deletions tests/end_to_end/utils/summary_helper.py
@@ -5,6 +5,8 @@
from lxml import etree
import os

import tests.end_to_end.utils.constants as constants

# Initialize the XML parser
parser = etree.XMLParser(recover=True, encoding='utf-8')
tree = ET.parse("results/results.xml", parser=parser)
@@ -101,29 +103,41 @@ def get_testcase_result():
return database_list


if __name__ == "__main__":
def main():
"""
Main function to get the test case results and aggregator logs,
and write the results to the GitHub step summary.
Important: do not fail the test in any scenario.
"""
result = get_testcase_result()

if not all([os.getenv(var) for var in ["NUM_COLLABORATORS", "NUM_ROUNDS", "MODEL_NAME", "GITHUB_STEP_SUMMARY"]]):
print("One or more environment variables not set. Skipping writing to GitHub step summary")
return

num_cols = os.getenv("NUM_COLLABORATORS")
num_rounds = os.getenv("NUM_ROUNDS")
model_name = os.getenv("MODEL_NAME")
summary_file = os.getenv("GITHUB_STEP_SUMMARY")

if not model_name:
print("MODEL_NAME is not set, cannot find out aggregator logs")
agg_accuracy = "Not Found"
else:
workspace_name = "workspace_" + model_name
agg_log_file = os.path.join("results", workspace_name, "aggregator.log")
agg_accuracy = get_aggregated_accuracy(agg_log_file)
# Validate the model name and create the workspace name
if model_name.upper() not in constants.ModelName._member_names_:
print(f"Invalid model name: {model_name}. Skipping writing to GitHub step summary")
return

# Write the results to GitHub step summary
with open(os.getenv('GITHUB_STEP_SUMMARY'), 'a') as fh:
workspace_name = "workspace_" + model_name
agg_log_file = os.path.join("results", workspace_name, "aggregator.log")
agg_accuracy = get_aggregated_accuracy(agg_log_file)

# Write the results to GitHub step summary file
# This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand
with open(summary_file, 'a') as fh:
# DO NOT change the print statements
print("| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |", file=fh)
print("| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", file=fh)
for item in result:
print(f"| {item['name']} | {item['time']} | {item['result']} | {item['err_msg']} | {num_cols} | {num_rounds} | {agg_accuracy} |", file=fh)


if __name__ == "__main__":
main()
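With the environment variables set, the rows printed above render in the GitHub step summary roughly as follows (all values illustrative):

| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |
| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |
| test_federation_run | 412.5 | passed | None | 2 | 2 | 0.95 |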