From beccad7e2c8eeab2172117f9a8823a22305488eb Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 08:15:08 -0800 Subject: [PATCH 01/11] Memory Leak Logs Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 59 +++++++++++++++- openfl/component/collaborator/collaborator.py | 69 ++++++++++++++++++- tests/end_to_end/conftest.py | 14 +++- tests/end_to_end/models/participants.py | 8 ++- tests/end_to_end/utils/conftest_helper.py | 2 + tests/end_to_end/utils/federation_helper.py | 5 +- 6 files changed, 147 insertions(+), 10 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 0ec816276b..27bf7677ee 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -5,6 +5,8 @@ """Aggregator module.""" import queue import time +import psutil +import json from logging import getLogger from threading import Lock @@ -16,6 +18,7 @@ from openfl.utilities import TaskResultKey, TensorKey, change_tags from openfl.utilities.logs import write_metric +AGG_MEM_FILE_NAME = "agg_mem_details.json" class Aggregator: """An Aggregator is the central node in federated learning. @@ -75,6 +78,7 @@ def __init__( compression_pipeline=None, db_store_rounds=1, write_logs=False, + log_memory_usage=False, log_metric_callback=None, **kwargs, ): @@ -122,7 +126,8 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - + self.log_memory_usage = log_memory_usage + self.memory_details = [] self.rounds_to_train = rounds_to_train # if the collaborator requests a delta, this value is set to true @@ -667,6 +672,49 @@ def send_local_task_results( self._end_of_round_with_stragglers_check() + def get_memory_usage(self, round_number, metric_origin): + """Logs the memory usage statistics for the given round number. 
+ + This method retrieves the current virtual and swap memory usage statistics + using the psutil library, formats them into a dictionary, and logs the + information using the logger. + + Args: + round_number (int): The current round number for which memory usage is being logged. + """ + process = psutil.Process() + self.logger.info(f"{metric_origin} process id is {process}") + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + memory_usage = { + "round_number": round_number, + "metric_origin": metric_origin, + "process_memory": round(process.memory_info().rss / (1024 ** 2),2), + "virtual_memory": { + "total": round(virtual_memory.total / (1024 ** 2), 2), + "available": round(virtual_memory.available / (1024 ** 2), 2), + "percent": virtual_memory.percent, + "used": round(virtual_memory.used / (1024 ** 2), 2), + "free": round(virtual_memory.free / (1024 ** 2), 2), + "active": round(virtual_memory.active / (1024 ** 2), 2), + "inactive": round(virtual_memory.inactive / (1024 ** 2), 2), + "buffers": round(virtual_memory.buffers / (1024 ** 2), 2), + "cached": round(virtual_memory.cached / (1024 ** 2), 2), + "shared": round(virtual_memory.shared / (1024 ** 2), 2), + }, + "swap_memory": { + "total": round(swap_memory.total / (1024 ** 2), 2), + "used": round(swap_memory.used / (1024 ** 2), 2), + "free": round(swap_memory.free / (1024 ** 2), 2), + "percent": swap_memory.percent, + }, + } + self.logger.info(f"**************** End of round check: {metric_origin} Memory Logs ******************") + self.logger.info("Memory Usage: %s", memory_usage) + self.logger.info("*************************************************************************************") + + return memory_usage + def _end_of_round_with_stragglers_check(self): """ Checks if the minimum required collaborators have reported their results, @@ -852,7 +900,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result new_model_round_number, new_model_report, 
new_model_tags, - ) = new_model_tk + ) = new_model_tk final_model_tk = TensorKey( new_model_tensor_name, new_model_origin, @@ -965,6 +1013,8 @@ def _end_of_round_check(self): all_tasks = self.assigner.get_all_tasks_for_round(self.round_number) for task_name in all_tasks: self._compute_validation_related_task_metrics(task_name) + memory_detail = self.get_memory_usage(self.round_number, "aggregator") + self.memory_details.append(memory_detail) # Once all of the task results have been processed self._end_of_round_check_done[self.round_number] = True @@ -981,6 +1031,11 @@ def _end_of_round_check(self): # TODO This needs to be fixed! if self._time_to_quit(): + # Write self.memory_details to a file + if self.log_memory_usage: + self.logger.info("Writing memory details to file...") + with open(AGG_MEM_FILE_NAME, "w") as f: + json.dump(self.memory_details, f, indent=4) self.logger.info("Experiment Completed. Cleaning up...") else: self.logger.info("Starting round %s...", self.round_number) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index ce44966a71..dc9c5d5a90 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -3,7 +3,8 @@ """Collaborator module.""" - +import psutil +import json from enum import Enum from logging import getLogger from time import sleep @@ -80,6 +81,7 @@ def __init__( delta_updates=False, compression_pipeline=None, db_store_rounds=1, + log_memory_usage=False, **kwargs, ): """Initialize the Collaborator object. 
@@ -123,7 +125,7 @@ def __init__( self.delta_updates = delta_updates self.client = client - + self.log_memory_usage = log_memory_usage self.task_config = task_config self.logger = getLogger(__name__) @@ -158,6 +160,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()): def run(self): """Run the collaborator.""" + memory_details = [] while True: tasks, round_number, sleep_time, time_to_quit = self.get_tasks() if time_to_quit: @@ -171,6 +174,24 @@ def run(self): # Cleaning tensor db self.tensor_db.clean_up(self.db_store_rounds) + if self.log_memory_usage: + # This is the place to check the memory usage of the collaborator + self.logger.info("*****************COLLABORATOR LOGS*******************************") + process = psutil.Process() + self.logger.info(process) + process_mem = round(process.memory_info().rss / (1024 ** 2),2) + self.logger.info("Collaborator Round: %s", round_number) + self.logger.info("Collaborator Process Mem: %s", process_mem) + self.logger.info("******************************************************************") + + # NAD:This prints the data correctly : Get the Mem usage info here + memory_detail = self.get_memory_usage(round_number, + metric_origin=self.collaborator_name) + memory_details.append(memory_detail) + if self.log_memory_usage: + # Write json file with memory usage details and collabrator name + with open(f"{self.collaborator_name}_mem_details.json", "w") as f: + json.dump(memory_details, f, indent=4) self.logger.info("End of Federation reached. Exiting...") @@ -588,3 +609,47 @@ def named_tensor_to_nparray(self, named_tensor): self.tensor_db.cache_tensor({decompressed_tensor_key: decompressed_nparray}) return decompressed_nparray + + def get_memory_usage(self, round_number, metric_origin): + """ + Logs the memory usage statistics for the given round number. 
+ + This method retrieves the current virtual and swap memory usage statistics + using the psutil library, formats them into a dictionary, and logs the + information using the logger. + + Args: + round_number (int): The current round number for which memory usage is being logged. + """ + process = psutil.Process() + self.logger.info(f"{metric_origin} process id is {process}") + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + memory_usage = { + "round_number": round_number, + "metric_origin": metric_origin, + "process_memory": round(process.memory_info().rss / (1024 ** 2),2), + "virtual_memory": { + "total": round(virtual_memory.total / (1024 ** 2), 2), + "available": round(virtual_memory.available / (1024 ** 2), 2), + "percent": virtual_memory.percent, + "used": round(virtual_memory.used / (1024 ** 2), 2), + "free": round(virtual_memory.free / (1024 ** 2), 2), + "active": round(virtual_memory.active / (1024 ** 2), 2), + "inactive": round(virtual_memory.inactive / (1024 ** 2), 2), + "buffers": round(virtual_memory.buffers / (1024 ** 2), 2), + "cached": round(virtual_memory.cached / (1024 ** 2), 2), + "shared": round(virtual_memory.shared / (1024 ** 2), 2), + }, + "swap_memory": { + "total": round(swap_memory.total / (1024 ** 2), 2), + "used": round(swap_memory.used / (1024 ** 2), 2), + "free": round(swap_memory.free / (1024 ** 2), 2), + "percent": swap_memory.percent, + }, + } + self.logger.info(f"**************** End of round check: {metric_origin} Memory Logs ******************") + self.logger.info("Memory Usage: %s", memory_usage) + self.logger.info("*************************************************************************************") + + return memory_usage diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py index d2c9c20f89..85d7db4d58 100644 --- a/tests/end_to_end/conftest.py +++ b/tests/end_to_end/conftest.py @@ -17,7 +17,7 @@ # Define a named tuple to store the objects for model owner, aggregator, and 
collaborators federation_fixture = collections.namedtuple( "federation_fixture", - "model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir", + "model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir, num_rounds", ) @@ -62,6 +62,11 @@ def pytest_addoption(parser): action="store_true", help="Disable TLS for communication", ) + parser.addoption( + "--log_memory_usage", + action="store_true", + help="Enable memory log in collaborators and aggregator", + ) @pytest.fixture(scope="session", autouse=True) @@ -234,6 +239,7 @@ def fx_federation(request, pytestconfig): num_rounds = args.num_rounds disable_client_auth = args.disable_client_auth disable_tls = args.disable_tls + log_memory_usage = args.log_memory_usage log.info( f"Running federation setup using Task Runner API on single machine with below configurations:\n" @@ -241,7 +247,8 @@ def fx_federation(request, pytestconfig): f"\tNumber of rounds: {num_rounds}\n" f"\tModel name: {model_name}\n" f"\tClient authentication: {not disable_client_auth}\n" - f"\tTLS: {not disable_tls}" + f"\tTLS: {not disable_tls}\n" + f"\tMemory Logs: {log_memory_usage}" ) # Validate the model name and create the workspace name @@ -251,7 +258,7 @@ def fx_federation(request, pytestconfig): workspace_name = f"workspace_{model_name}" # Create model owner object and the workspace for the model - model_owner = participants.ModelOwner(workspace_name, model_name) + model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage) try: workspace_path = model_owner.create_workspace(results_dir=results_dir) except Exception as e: @@ -318,4 +325,5 @@ def fx_federation(request, pytestconfig): disable_tls=disable_tls, workspace_path=workspace_path, results_dir=results_dir, + num_rounds=num_rounds, ) diff --git a/tests/end_to_end/models/participants.py b/tests/end_to_end/models/participants.py index 5bde7f39ec..6eb455aaa8 
100644 --- a/tests/end_to_end/models/participants.py +++ b/tests/end_to_end/models/participants.py @@ -23,12 +23,13 @@ class ModelOwner: 4. Importing and exporting the workspace etc. """ - def __init__(self, workspace_name, model_name): + def __init__(self, workspace_name, model_name, log_memory_usage): """ Initialize the ModelOwner class Args: workspace_name (str): Workspace name model_name (str): Model name + log_memory_usage (bool): Memory Log flag """ self.workspace_name = workspace_name self.model_name = model_name @@ -38,6 +39,7 @@ def __init__(self, workspace_name, model_name): self.plan_path = None self.num_collaborators = constants.NUM_COLLABORATORS self.rounds_to_train = constants.NUM_ROUNDS + self.log_memory_usage = log_memory_usage def create_workspace(self, results_dir=None): """ @@ -132,6 +134,10 @@ def modify_plan(self, new_rounds=None, num_collaborators=None, disable_client_au data = yaml.load(fp, Loader=yaml.FullLoader) data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train) + # Memory Leak related + data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage + data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage + data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators) data["network"]["settings"]["disable_client_auth"] = disable_client_auth data["network"]["settings"]["tls"] = not disable_tls diff --git a/tests/end_to_end/utils/conftest_helper.py b/tests/end_to_end/utils/conftest_helper.py index b8d70fa7ba..35c6986ea8 100644 --- a/tests/end_to_end/utils/conftest_helper.py +++ b/tests/end_to_end/utils/conftest_helper.py @@ -20,6 +20,7 @@ def parse_arguments(): - model_name (str, default="torch_cnn_mnist"): Model name - disable_client_auth (bool): Disable client authentication - disable_tls (bool): Disable TLS for communication + - log_memory_usage (bool): Enable Memory leak logs Raises: SystemExit: If the required arguments are not provided or if any argument 
parsing error occurs. @@ -32,6 +33,7 @@ def parse_arguments(): parser.add_argument("--model_name", type=str, help="Model name") parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication") parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication") + parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs") args = parser.parse_known_args()[0] return args diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 1da1c68012..df53787940 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -85,6 +85,7 @@ def verify_federation_run_completion(fed_obj, results): executor.submit( _verify_completion_for_participant, participant, + fed_obj.num_rounds, results[i] ) for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator]) @@ -99,7 +100,7 @@ def verify_federation_run_completion(fed_obj, results): return all(results) -def _verify_completion_for_participant(participant, result_file): +def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100): """ Verify the completion of the process for the participant Args: @@ -109,7 +110,7 @@ def _verify_completion_for_participant(participant, result_file): bool: True if successful, else False """ # Wait for the successful output message to appear in the log till timeout - timeout = 900 # in seconds + timeout = 300 + time_for_each_round * num_rounds # in seconds log.info(f"Printing the last line of the log file for {participant.name} to track the progress") with open(result_file, 'r') as file: content = file.read() From 98b8448dd505188d0a2965e6b4b5605c735ed472 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 08:40:34 -0800 Subject: [PATCH 02/11] Fix formatting issue Signed-off-by: Chaurasiya, Payal --- 
openfl/component/aggregator/aggregator.py | 42 ++++++++------- openfl/component/collaborator/collaborator.py | 52 +++++++++++-------- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 27bf7677ee..ff221fcfe0 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -3,13 +3,14 @@ """Aggregator module.""" +import json import queue import time -import psutil -import json from logging import getLogger from threading import Lock +import psutil + from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling from openfl.databases import TensorDB from openfl.interface.aggregation_functions import WeightedAverage @@ -20,6 +21,7 @@ AGG_MEM_FILE_NAME = "agg_mem_details.json" + class Aggregator: """An Aggregator is the central node in federated learning. @@ -689,29 +691,33 @@ def get_memory_usage(self, round_number, metric_origin): memory_usage = { "round_number": round_number, "metric_origin": metric_origin, - "process_memory": round(process.memory_info().rss / (1024 ** 2),2), + "process_memory": round(process.memory_info().rss / (1024**2), 2), "virtual_memory": { - "total": round(virtual_memory.total / (1024 ** 2), 2), - "available": round(virtual_memory.available / (1024 ** 2), 2), + "total": round(virtual_memory.total / (1024**2), 2), + "available": round(virtual_memory.available / (1024**2), 2), "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024 ** 2), 2), - "free": round(virtual_memory.free / (1024 ** 2), 2), - "active": round(virtual_memory.active / (1024 ** 2), 2), - "inactive": round(virtual_memory.inactive / (1024 ** 2), 2), - "buffers": round(virtual_memory.buffers / (1024 ** 2), 2), - "cached": round(virtual_memory.cached / (1024 ** 2), 2), - "shared": round(virtual_memory.shared / (1024 ** 2), 2), + "used": round(virtual_memory.used / (1024**2), 2), + 
"free": round(virtual_memory.free / (1024**2), 2), + "active": round(virtual_memory.active / (1024**2), 2), + "inactive": round(virtual_memory.inactive / (1024**2), 2), + "buffers": round(virtual_memory.buffers / (1024**2), 2), + "cached": round(virtual_memory.cached / (1024**2), 2), + "shared": round(virtual_memory.shared / (1024**2), 2), }, "swap_memory": { - "total": round(swap_memory.total / (1024 ** 2), 2), - "used": round(swap_memory.used / (1024 ** 2), 2), - "free": round(swap_memory.free / (1024 ** 2), 2), + "total": round(swap_memory.total / (1024**2), 2), + "used": round(swap_memory.used / (1024**2), 2), + "free": round(swap_memory.free / (1024**2), 2), "percent": swap_memory.percent, }, } - self.logger.info(f"**************** End of round check: {metric_origin} Memory Logs ******************") + self.logger.info( + f"**************** End of round check: {metric_origin} Memory Logs ******************" + ) self.logger.info("Memory Usage: %s", memory_usage) - self.logger.info("*************************************************************************************") + self.logger.info( + "*************************************************************************************" + ) return memory_usage @@ -900,7 +906,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result new_model_round_number, new_model_report, new_model_tags, - ) = new_model_tk + ) = new_model_tk final_model_tk = TensorKey( new_model_tensor_name, new_model_origin, diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index dc9c5d5a90..6840e8054e 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -3,13 +3,14 @@ """Collaborator module.""" -import psutil import json from enum import Enum from logging import getLogger from time import sleep from typing import Tuple +import psutil + from openfl.databases import TensorDB from openfl.pipelines import 
NoCompressionPipeline, TensorCodec from openfl.protocols import utils @@ -176,17 +177,22 @@ def run(self): self.tensor_db.clean_up(self.db_store_rounds) if self.log_memory_usage: # This is the place to check the memory usage of the collaborator - self.logger.info("*****************COLLABORATOR LOGS*******************************") + self.logger.info( + "*****************COLLABORATOR LOGS*******************************" + ) process = psutil.Process() self.logger.info(process) - process_mem = round(process.memory_info().rss / (1024 ** 2),2) + process_mem = round(process.memory_info().rss / (1024**2), 2) self.logger.info("Collaborator Round: %s", round_number) self.logger.info("Collaborator Process Mem: %s", process_mem) - self.logger.info("******************************************************************") + self.logger.info( + "******************************************************************" + ) # NAD:This prints the data correctly : Get the Mem usage info here - memory_detail = self.get_memory_usage(round_number, - metric_origin=self.collaborator_name) + memory_detail = self.get_memory_usage( + round_number, metric_origin=self.collaborator_name + ) memory_details.append(memory_detail) if self.log_memory_usage: # Write json file with memory usage details and collabrator name @@ -628,28 +634,32 @@ def get_memory_usage(self, round_number, metric_origin): memory_usage = { "round_number": round_number, "metric_origin": metric_origin, - "process_memory": round(process.memory_info().rss / (1024 ** 2),2), + "process_memory": round(process.memory_info().rss / (1024**2), 2), "virtual_memory": { - "total": round(virtual_memory.total / (1024 ** 2), 2), - "available": round(virtual_memory.available / (1024 ** 2), 2), + "total": round(virtual_memory.total / (1024**2), 2), + "available": round(virtual_memory.available / (1024**2), 2), "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024 ** 2), 2), - "free": round(virtual_memory.free / (1024 ** 2), 
2), - "active": round(virtual_memory.active / (1024 ** 2), 2), - "inactive": round(virtual_memory.inactive / (1024 ** 2), 2), - "buffers": round(virtual_memory.buffers / (1024 ** 2), 2), - "cached": round(virtual_memory.cached / (1024 ** 2), 2), - "shared": round(virtual_memory.shared / (1024 ** 2), 2), + "used": round(virtual_memory.used / (1024**2), 2), + "free": round(virtual_memory.free / (1024**2), 2), + "active": round(virtual_memory.active / (1024**2), 2), + "inactive": round(virtual_memory.inactive / (1024**2), 2), + "buffers": round(virtual_memory.buffers / (1024**2), 2), + "cached": round(virtual_memory.cached / (1024**2), 2), + "shared": round(virtual_memory.shared / (1024**2), 2), }, "swap_memory": { - "total": round(swap_memory.total / (1024 ** 2), 2), - "used": round(swap_memory.used / (1024 ** 2), 2), - "free": round(swap_memory.free / (1024 ** 2), 2), + "total": round(swap_memory.total / (1024**2), 2), + "used": round(swap_memory.used / (1024**2), 2), + "free": round(swap_memory.free / (1024**2), 2), "percent": swap_memory.percent, }, } - self.logger.info(f"**************** End of round check: {metric_origin} Memory Logs ******************") + self.logger.info( + f"**************** End of round check: {metric_origin} Memory Logs ******************" + ) self.logger.info("Memory Usage: %s", memory_usage) - self.logger.info("*************************************************************************************") + self.logger.info( + "*************************************************************************************" + ) return memory_usage From 411f5d0f31a194aa20b58864e0f7434d6424df6c Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 08:44:49 -0800 Subject: [PATCH 03/11] Fix formatting issue Signed-off-by: Chaurasiya, Payal --- openfl/component/collaborator/collaborator.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/openfl/component/collaborator/collaborator.py 
b/openfl/component/collaborator/collaborator.py index 6840e8054e..2b851ba7c5 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -177,19 +177,6 @@ def run(self): self.tensor_db.clean_up(self.db_store_rounds) if self.log_memory_usage: # This is the place to check the memory usage of the collaborator - self.logger.info( - "*****************COLLABORATOR LOGS*******************************" - ) - process = psutil.Process() - self.logger.info(process) - process_mem = round(process.memory_info().rss / (1024**2), 2) - self.logger.info("Collaborator Round: %s", round_number) - self.logger.info("Collaborator Process Mem: %s", process_mem) - self.logger.info( - "******************************************************************" - ) - - # NAD:This prints the data correctly : Get the Mem usage info here memory_detail = self.get_memory_usage( round_number, metric_origin=self.collaborator_name ) From 624253a599ef4e76d2e663feb72dc62b6927cbd7 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 20:58:06 -0800 Subject: [PATCH 04/11] Put memory under flag Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 9 ++++++--- openfl/component/collaborator/collaborator.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index ff221fcfe0..c36852c3a9 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -128,7 +128,7 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - self.log_memory_usage = log_memory_usage + self.log_memory_usage = log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system self.memory_details = [] self.rounds_to_train = rounds_to_train @@ -1019,8 +1019,11 @@ def _end_of_round_check(self): all_tasks = 
self.assigner.get_all_tasks_for_round(self.round_number) for task_name in all_tasks: self._compute_validation_related_task_metrics(task_name) - memory_detail = self.get_memory_usage(self.round_number, "aggregator") - self.memory_details.append(memory_detail) + + if self.log_memory_usage: + # This is the place to check the memory usage of the aggregator + memory_detail = self.get_memory_usage(self.round_number, "aggregator") + self.memory_details.append(memory_detail) # Once all of the task results have been processed self._end_of_round_check_done[self.round_number] = True diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 2b851ba7c5..e0925bc063 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -126,7 +126,7 @@ def __init__( self.delta_updates = delta_updates self.client = client - self.log_memory_usage = log_memory_usage + self.log_memory_usage = log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system self.task_config = task_config self.logger = getLogger(__name__) From b6011411f22d3b4cbf027c24f2ce8feeee36b583 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 21:06:32 -0800 Subject: [PATCH 05/11] Put memory under flag Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 4 +++- openfl/component/collaborator/collaborator.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index c36852c3a9..b202b4bcc5 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -128,7 +128,9 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - self.log_memory_usage = log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system + self.log_memory_usage = ( + 
log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system + ) self.memory_details = [] self.rounds_to_train = rounds_to_train diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index e0925bc063..d0bd232cae 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -126,7 +126,9 @@ def __init__( self.delta_updates = delta_updates self.client = client - self.log_memory_usage = log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system + self.log_memory_usage = ( + log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system + ) self.task_config = task_config self.logger = getLogger(__name__) From 32996a80638619034b46e1263deebc5e7742e351 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 21:12:15 -0800 Subject: [PATCH 06/11] Formt timeout formulae Signed-off-by: Chaurasiya, Payal --- tests/end_to_end/utils/federation_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index df53787940..f25c56edb6 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -110,7 +110,7 @@ def _verify_completion_for_participant(participant, num_rounds, result_file, tim bool: True if successful, else False """ # Wait for the successful output message to appear in the log till timeout - timeout = 300 + time_for_each_round * num_rounds # in seconds + timeout = 300 + ( time_for_each_round * num_rounds ) # in seconds log.info(f"Printing the last line of the log file for {participant.name} to track the progress") with open(result_file, 'r') as file: content = file.read() From eab5ce9ec56ec633e794ea590b49a9ca2039bbd2 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Tue, 19 Nov 2024 23:51:07 -0800 Subject: [PATCH 
07/11] Move memory function to utilities Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 58 ++---------------- openfl/component/collaborator/collaborator.py | 60 ++----------------- openfl/utilities/logs.py | 49 +++++++++++++++ 3 files changed, 58 insertions(+), 109 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index b202b4bcc5..fd46751f34 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -9,15 +9,13 @@ from logging import getLogger from threading import Lock -import psutil - from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling from openfl.databases import TensorDB from openfl.interface.aggregation_functions import WeightedAverage from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import base_pb2, utils from openfl.utilities import TaskResultKey, TensorKey, change_tags -from openfl.utilities.logs import write_metric +from openfl.utilities.logs import get_memory_usage, write_metric AGG_MEM_FILE_NAME = "agg_mem_details.json" @@ -128,9 +126,8 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - self.log_memory_usage = ( - log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system - ) + # Flag can be enabled to get memory usage details for ubuntu system + self.log_memory_usage = log_memory_usage self.memory_details = [] self.rounds_to_train = rounds_to_train @@ -676,53 +673,6 @@ def send_local_task_results( self._end_of_round_with_stragglers_check() - def get_memory_usage(self, round_number, metric_origin): - """Logs the memory usage statistics for the given round number. - - This method retrieves the current virtual and swap memory usage statistics - using the psutil library, formats them into a dictionary, and logs the - information using the logger. 
- - Args: - round_number (int): The current round number for which memory usage is being logged. - """ - process = psutil.Process() - self.logger.info(f"{metric_origin} process id is {process}") - virtual_memory = psutil.virtual_memory() - swap_memory = psutil.swap_memory() - memory_usage = { - "round_number": round_number, - "metric_origin": metric_origin, - "process_memory": round(process.memory_info().rss / (1024**2), 2), - "virtual_memory": { - "total": round(virtual_memory.total / (1024**2), 2), - "available": round(virtual_memory.available / (1024**2), 2), - "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024**2), 2), - "free": round(virtual_memory.free / (1024**2), 2), - "active": round(virtual_memory.active / (1024**2), 2), - "inactive": round(virtual_memory.inactive / (1024**2), 2), - "buffers": round(virtual_memory.buffers / (1024**2), 2), - "cached": round(virtual_memory.cached / (1024**2), 2), - "shared": round(virtual_memory.shared / (1024**2), 2), - }, - "swap_memory": { - "total": round(swap_memory.total / (1024**2), 2), - "used": round(swap_memory.used / (1024**2), 2), - "free": round(swap_memory.free / (1024**2), 2), - "percent": swap_memory.percent, - }, - } - self.logger.info( - f"**************** End of round check: {metric_origin} Memory Logs ******************" - ) - self.logger.info("Memory Usage: %s", memory_usage) - self.logger.info( - "*************************************************************************************" - ) - - return memory_usage - def _end_of_round_with_stragglers_check(self): """ Checks if the minimum required collaborators have reported their results, @@ -1024,7 +974,7 @@ def _end_of_round_check(self): if self.log_memory_usage: # This is the place to check the memory usage of the aggregator - memory_detail = self.get_memory_usage(self.round_number, "aggregator") + memory_detail = get_memory_usage(self.logger, self.round_number, "aggregator") self.memory_details.append(memory_detail) # Once 
all of the task results have been processed diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index d0bd232cae..9c5bdc72bb 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -9,12 +9,11 @@ from time import sleep from typing import Tuple -import psutil - from openfl.databases import TensorDB from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import utils from openfl.utilities import TensorKey +from openfl.utilities.logs import get_memory_usage class DevicePolicy(Enum): @@ -126,9 +125,8 @@ def __init__( self.delta_updates = delta_updates self.client = client - self.log_memory_usage = ( - log_memory_usage # Flag can be enabled to get memory usage details for ubuntu system - ) + # Flag can be enabled to get memory usage details for ubuntu system + self.log_memory_usage = log_memory_usage self.task_config = task_config self.logger = getLogger(__name__) @@ -179,8 +177,8 @@ def run(self): self.tensor_db.clean_up(self.db_store_rounds) if self.log_memory_usage: # This is the place to check the memory usage of the collaborator - memory_detail = self.get_memory_usage( - round_number, metric_origin=self.collaborator_name + memory_detail = get_memory_usage( + self.logger, round_number, metric_origin=self.collaborator_name ) memory_details.append(memory_detail) if self.log_memory_usage: @@ -604,51 +602,3 @@ def named_tensor_to_nparray(self, named_tensor): self.tensor_db.cache_tensor({decompressed_tensor_key: decompressed_nparray}) return decompressed_nparray - - def get_memory_usage(self, round_number, metric_origin): - """ - Logs the memory usage statistics for the given round number. - - This method retrieves the current virtual and swap memory usage statistics - using the psutil library, formats them into a dictionary, and logs the - information using the logger. 
- - Args: - round_number (int): The current round number for which memory usage is being logged. - """ - process = psutil.Process() - self.logger.info(f"{metric_origin} process id is {process}") - virtual_memory = psutil.virtual_memory() - swap_memory = psutil.swap_memory() - memory_usage = { - "round_number": round_number, - "metric_origin": metric_origin, - "process_memory": round(process.memory_info().rss / (1024**2), 2), - "virtual_memory": { - "total": round(virtual_memory.total / (1024**2), 2), - "available": round(virtual_memory.available / (1024**2), 2), - "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024**2), 2), - "free": round(virtual_memory.free / (1024**2), 2), - "active": round(virtual_memory.active / (1024**2), 2), - "inactive": round(virtual_memory.inactive / (1024**2), 2), - "buffers": round(virtual_memory.buffers / (1024**2), 2), - "cached": round(virtual_memory.cached / (1024**2), 2), - "shared": round(virtual_memory.shared / (1024**2), 2), - }, - "swap_memory": { - "total": round(swap_memory.total / (1024**2), 2), - "used": round(swap_memory.used / (1024**2), 2), - "free": round(swap_memory.free / (1024**2), 2), - "percent": swap_memory.percent, - }, - } - self.logger.info( - f"**************** End of round check: {metric_origin} Memory Logs ******************" - ) - self.logger.info("Memory Usage: %s", memory_usage) - self.logger.info( - "*************************************************************************************" - ) - - return memory_usage diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index 1ba35f2867..6235560a74 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -6,6 +6,7 @@ import logging +import psutil from rich.console import Console from rich.logging import RichHandler from tensorboardX import SummaryWriter @@ -57,3 +58,51 @@ def setup_loggers(log_level=logging.INFO): formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s") 
handler.setFormatter(formatter) root.addHandler(handler) + + +def get_memory_usage(logger, round_number, metric_origin): + """Logs the memory usage statistics for the given round number. + + This method retrieves the current virtual and swap memory usage statistics + using the psutil library, formats them into a dictionary, and logs the + information using the logger. + + Args: + round_number (int): The current round number for which memory usage is being logged. + """ + process = psutil.Process() + logger.info(f"{metric_origin} process id is {process}") + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + memory_usage = { + "round_number": round_number, + "metric_origin": metric_origin, + "process_memory": round(process.memory_info().rss / (1024**2), 2), + "virtual_memory": { + "total": round(virtual_memory.total / (1024**2), 2), + "available": round(virtual_memory.available / (1024**2), 2), + "percent": virtual_memory.percent, + "used": round(virtual_memory.used / (1024**2), 2), + "free": round(virtual_memory.free / (1024**2), 2), + "active": round(virtual_memory.active / (1024**2), 2), + "inactive": round(virtual_memory.inactive / (1024**2), 2), + "buffers": round(virtual_memory.buffers / (1024**2), 2), + "cached": round(virtual_memory.cached / (1024**2), 2), + "shared": round(virtual_memory.shared / (1024**2), 2), + }, + "swap_memory": { + "total": round(swap_memory.total / (1024**2), 2), + "used": round(swap_memory.used / (1024**2), 2), + "free": round(swap_memory.free / (1024**2), 2), + "percent": swap_memory.percent, + }, + } + logger.info( + f"**************** End of round check: {metric_origin} Memory Logs ******************" + ) + logger.info("Memory Usage: %s", memory_usage) + logger.info( + "*************************************************************************************" + ) + + return memory_usage From 7438449a1501bfa47d8865d140ae0fbbcd1bfd32 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Wed, 20 Nov 2024 
21:08:29 -0800 Subject: [PATCH 08/11] Remove json and keep details in logs Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 8 +------- openfl/component/collaborator/collaborator.py | 5 +---- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 6202419ab4..4d45c110cb 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -3,7 +3,6 @@ """Aggregator module.""" -import json import queue import time from logging import getLogger @@ -17,8 +16,6 @@ from openfl.utilities import TaskResultKey, TensorKey, change_tags from openfl.utilities.logs import get_memory_usage, write_metric -AGG_MEM_FILE_NAME = "agg_mem_details.json" - class Aggregator: """An Aggregator is the central node in federated learning. @@ -995,11 +992,8 @@ def _end_of_round_check(self): # TODO This needs to be fixed! if self._time_to_quit(): - # Write self.memory_details to a file if self.log_memory_usage: - self.logger.info("Writing memory details to file...") - with open(AGG_MEM_FILE_NAME, "w") as f: - json.dump(self.memory_details, f, indent=4) + self.logger.info(f"Publish memory usage: {self.memory_details}") self.logger.info("Experiment Completed. 
Cleaning up...") else: self.logger.info("Starting round %s...", self.round_number) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 9c5bdc72bb..0caba31092 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -3,7 +3,6 @@ """Collaborator module.""" -import json from enum import Enum from logging import getLogger from time import sleep @@ -182,9 +181,7 @@ def run(self): ) memory_details.append(memory_detail) if self.log_memory_usage: - # Write json file with memory usage details and collabrator name - with open(f"{self.collaborator_name}_mem_details.json", "w") as f: - json.dump(memory_details, f, indent=4) + self.logger.info(f"Publish memory usage: {self.memory_details}") self.logger.info("End of Federation reached. Exiting...") From 4c1eeee7716c40da6992ba7dc6c1ac6926121830 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Thu, 21 Nov 2024 01:02:40 -0800 Subject: [PATCH 09/11] Small fix Signed-off-by: Chaurasiya, Payal --- openfl/component/collaborator/collaborator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 0caba31092..a22c8813ba 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -181,7 +181,7 @@ def run(self): ) memory_details.append(memory_detail) if self.log_memory_usage: - self.logger.info(f"Publish memory usage: {self.memory_details}") + self.logger.info(f"Publish memory usage: {memory_details}") self.logger.info("End of Federation reached. 
Exiting...") From 87946fc7453982023df5862c19fc70e88b6347c9 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Thu, 21 Nov 2024 02:28:31 -0800 Subject: [PATCH 10/11] Remove prints post each round Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 3 ++- openfl/component/collaborator/collaborator.py | 5 ++-- openfl/utilities/logs.py | 26 +++++-------------- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 4d45c110cb..a00c7d69d7 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -974,7 +974,8 @@ def _end_of_round_check(self): if self.log_memory_usage: # This is the place to check the memory usage of the aggregator - memory_detail = get_memory_usage(self.logger, self.round_number, "aggregator") + memory_detail = get_memory_usage() + memory_detail["round_number"] = self.round_number self.memory_details.append(memory_detail) # Once all of the task results have been processed diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index a22c8813ba..35daf99fe1 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -176,9 +176,8 @@ def run(self): self.tensor_db.clean_up(self.db_store_rounds) if self.log_memory_usage: # This is the place to check the memory usage of the collaborator - memory_detail = get_memory_usage( - self.logger, round_number, metric_origin=self.collaborator_name - ) + memory_detail = get_memory_usage() + memory_detail["round_number"] = round_number memory_details.append(memory_detail) if self.log_memory_usage: self.logger.info(f"Publish memory usage: {memory_details}") diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index 6235560a74..a17f0742fc 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -5,6 +5,7 @@ """Logs 
utilities.""" import logging +import os import psutil from rich.console import Console @@ -60,23 +61,16 @@ def setup_loggers(log_level=logging.INFO): root.addHandler(handler) -def get_memory_usage(logger, round_number, metric_origin): - """Logs the memory usage statistics for the given round number. +def get_memory_usage() -> dict: + """Return memory usage details of the current process. - This method retrieves the current virtual and swap memory usage statistics - using the psutil library, formats them into a dictionary, and logs the - information using the logger. - - Args: - round_number (int): The current round number for which memory usage is being logged. + Returns: + dict: A dictionary containing memory usage details. """ - process = psutil.Process() - logger.info(f"{metric_origin} process id is {process}") + process = psutil.Process(os.getpid()) virtual_memory = psutil.virtual_memory() swap_memory = psutil.swap_memory() memory_usage = { - "round_number": round_number, - "metric_origin": metric_origin, "process_memory": round(process.memory_info().rss / (1024**2), 2), "virtual_memory": { "total": round(virtual_memory.total / (1024**2), 2), @@ -97,12 +91,4 @@ def get_memory_usage(logger, round_number, metric_origin): "percent": swap_memory.percent, }, } - logger.info( - f"**************** End of round check: {metric_origin} Memory Logs ******************" - ) - logger.info("Memory Usage: %s", memory_usage) - logger.info( - "*************************************************************************************" - ) - return memory_usage From 4d42e481e627a2cef894af30d1bc1f8387351a08 Mon Sep 17 00:00:00 2001 From: "Chaurasiya, Payal" Date: Thu, 21 Nov 2024 03:02:06 -0800 Subject: [PATCH 11/11] Added metric origin to output Signed-off-by: Chaurasiya, Payal --- openfl/component/aggregator/aggregator.py | 1 + openfl/component/collaborator/collaborator.py | 1 + 2 files changed, 2 insertions(+) diff --git a/openfl/component/aggregator/aggregator.py 
b/openfl/component/aggregator/aggregator.py index a00c7d69d7..805193cf77 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -976,6 +976,7 @@ def _end_of_round_check(self): # This is the place to check the memory usage of the aggregator memory_detail = get_memory_usage() memory_detail["round_number"] = self.round_number + memory_detail["metric_origin"] = "aggregator" self.memory_details.append(memory_detail) # Once all of the task results have been processed diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 35daf99fe1..3418afab73 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -178,6 +178,7 @@ def run(self): # This is the place to check the memory usage of the collaborator memory_detail = get_memory_usage() memory_detail["round_number"] = round_number + memory_detail["metric_origin"] = self.collaborator_name memory_details.append(memory_detail) if self.log_memory_usage: self.logger.info(f"Publish memory usage: {memory_details}")