diff --git a/.github/workflows/task_runner_e2e.yml b/.github/workflows/task_runner_e2e.yml
index 98ffde8433..a7e01c579f 100644
--- a/.github/workflows/task_runner_e2e.yml
+++ b/.github/workflows/task_runner_e2e.yml
@@ -73,7 +73,7 @@ jobs:
         run: |
           python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
           -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
-          --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS
+          --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }}
           echo "Task runner end to end test run completed"

       - name: Print test summary
@@ -140,7 +140,7 @@ jobs:
         run: |
           python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \
           -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \
-          --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --disable_tls
+          --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls
           echo "Task runner end to end test run completed"

       - name: Print test summary
diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index e1c61f8ce3..805193cf77 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -14,7 +14,7 @@
 from openfl.pipelines import NoCompressionPipeline, TensorCodec
 from openfl.protocols import base_pb2, utils
 from openfl.utilities import TaskResultKey, TensorKey, change_tags
-from openfl.utilities.logs import write_metric
+from openfl.utilities.logs import get_memory_usage, write_metric


 class Aggregator:
@@ -76,6 +76,7 @@ def __init__(
         compression_pipeline=None,
         db_store_rounds=1,
         write_logs=False,
+        log_memory_usage=False,
         log_metric_callback=None,
         **kwargs,
     ):
@@ -123,7 +124,9 @@ def __init__(
         )
         self._end_of_round_check_done = [False] * rounds_to_train
         self.stragglers = []
-
+        # Flag that can be enabled to collect memory usage details (Ubuntu/Linux systems)
+        self.log_memory_usage = log_memory_usage
+        self.memory_details = []
         self.rounds_to_train = rounds_to_train

         # if the collaborator requests a delta, this value is set to true
@@ -969,6 +972,13 @@ def _end_of_round_check(self):
         for task_name in all_tasks:
             self._compute_validation_related_task_metrics(task_name)

+        if self.log_memory_usage:
+            # This is the place to check the memory usage of the aggregator
+            memory_detail = get_memory_usage()
+            memory_detail["round_number"] = self.round_number
+            memory_detail["metric_origin"] = "aggregator"
+            self.memory_details.append(memory_detail)
+
         # Once all of the task results have been processed
         self._end_of_round_check_done[self.round_number] = True

@@ -984,6 +994,8 @@ def _end_of_round_check(self):

         # TODO This needs to be fixed!
         if self._time_to_quit():
+            if self.log_memory_usage:
+                self.logger.info(f"Publish memory usage: {self.memory_details}")
             self.logger.info("Experiment Completed. Cleaning up...")
         else:
             self.logger.info("Starting round %s...", self.round_number)
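For reference, each entry appended to self.memory_details is the dictionary returned by get_memory_usage() (added to openfl/utilities/logs.py further down in this patch) plus the two keys set above. A minimal sketch of how one such per-round record is built, assuming OpenFL with this patch and psutil are installed and the process runs on Linux (several of the psutil fields used by get_memory_usage are Linux-only, hence the Ubuntu note in the comment):

    # Sketch only: reproduces the record shape the aggregator collects each round.
    from openfl.utilities.logs import get_memory_usage

    memory_detail = get_memory_usage()
    memory_detail["round_number"] = 3             # illustrative round index
    memory_detail["metric_origin"] = "aggregator"
    print(memory_detail)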
Cleaning up...") else: self.logger.info("Starting round %s...", self.round_number) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index ce44966a71..3418afab73 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -3,7 +3,6 @@ """Collaborator module.""" - from enum import Enum from logging import getLogger from time import sleep @@ -13,6 +12,7 @@ from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import utils from openfl.utilities import TensorKey +from openfl.utilities.logs import get_memory_usage class DevicePolicy(Enum): @@ -80,6 +80,7 @@ def __init__( delta_updates=False, compression_pipeline=None, db_store_rounds=1, + log_memory_usage=False, **kwargs, ): """Initialize the Collaborator object. @@ -123,7 +124,8 @@ def __init__( self.delta_updates = delta_updates self.client = client - + # Flag can be enabled to get memory usage details for ubuntu system + self.log_memory_usage = log_memory_usage self.task_config = task_config self.logger = getLogger(__name__) @@ -158,6 +160,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()): def run(self): """Run the collaborator.""" + memory_details = [] while True: tasks, round_number, sleep_time, time_to_quit = self.get_tasks() if time_to_quit: @@ -171,6 +174,14 @@ def run(self): # Cleaning tensor db self.tensor_db.clean_up(self.db_store_rounds) + if self.log_memory_usage: + # This is the place to check the memory usage of the collaborator + memory_detail = get_memory_usage() + memory_detail["round_number"] = round_number + memory_details["metric_origin"] = self.collaborator_name + memory_details.append(memory_detail) + if self.log_memory_usage: + self.logger.info(f"Publish memory usage: {memory_details}") self.logger.info("End of Federation reached. Exiting...") diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index 1ba35f2867..a17f0742fc 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -5,7 +5,9 @@ """Logs utilities.""" import logging +import os +import psutil from rich.console import Console from rich.logging import RichHandler from tensorboardX import SummaryWriter @@ -57,3 +59,36 @@ def setup_loggers(log_level=logging.INFO): formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s") handler.setFormatter(formatter) root.addHandler(handler) + + +def get_memory_usage() -> dict: + """Return memory usage details of the current process. + + Returns: + dict: A dictionary containing memory usage details. 
+ """ + process = psutil.Process(os.getpid()) + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + memory_usage = { + "process_memory": round(process.memory_info().rss / (1024**2), 2), + "virtual_memory": { + "total": round(virtual_memory.total / (1024**2), 2), + "available": round(virtual_memory.available / (1024**2), 2), + "percent": virtual_memory.percent, + "used": round(virtual_memory.used / (1024**2), 2), + "free": round(virtual_memory.free / (1024**2), 2), + "active": round(virtual_memory.active / (1024**2), 2), + "inactive": round(virtual_memory.inactive / (1024**2), 2), + "buffers": round(virtual_memory.buffers / (1024**2), 2), + "cached": round(virtual_memory.cached / (1024**2), 2), + "shared": round(virtual_memory.shared / (1024**2), 2), + }, + "swap_memory": { + "total": round(swap_memory.total / (1024**2), 2), + "used": round(swap_memory.used / (1024**2), 2), + "free": round(swap_memory.free / (1024**2), 2), + "percent": swap_memory.percent, + }, + } + return memory_usage diff --git a/setup.py b/setup.py index 947285c108..9f87bf694c 100644 --- a/setup.py +++ b/setup.py @@ -152,7 +152,7 @@ def run(self): 'docker', 'dynaconf==3.2.6', 'flatten_json', - 'grpcio>=1.56.2', + 'grpcio>=1.56.2,<1.66.0', 'ipykernel', 'jupyterlab', 'numpy', diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py index d2c9c20f89..85d7db4d58 100644 --- a/tests/end_to_end/conftest.py +++ b/tests/end_to_end/conftest.py @@ -17,7 +17,7 @@ # Define a named tuple to store the objects for model owner, aggregator, and collaborators federation_fixture = collections.namedtuple( "federation_fixture", - "model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir", + "model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir, num_rounds", ) @@ -62,6 +62,11 @@ def pytest_addoption(parser): action="store_true", help="Disable TLS for communication", ) + parser.addoption( + "--log_memory_usage", + action="store_true", + help="Enable memory log in collaborators and aggregator", + ) @pytest.fixture(scope="session", autouse=True) @@ -234,6 +239,7 @@ def fx_federation(request, pytestconfig): num_rounds = args.num_rounds disable_client_auth = args.disable_client_auth disable_tls = args.disable_tls + log_memory_usage = args.log_memory_usage log.info( f"Running federation setup using Task Runner API on single machine with below configurations:\n" @@ -241,7 +247,8 @@ def fx_federation(request, pytestconfig): f"\tNumber of rounds: {num_rounds}\n" f"\tModel name: {model_name}\n" f"\tClient authentication: {not disable_client_auth}\n" - f"\tTLS: {not disable_tls}" + f"\tTLS: {not disable_tls}\n" + f"\tMemory Logs: {log_memory_usage}" ) # Validate the model name and create the workspace name @@ -251,7 +258,7 @@ def fx_federation(request, pytestconfig): workspace_name = f"workspace_{model_name}" # Create model owner object and the workspace for the model - model_owner = participants.ModelOwner(workspace_name, model_name) + model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage) try: workspace_path = model_owner.create_workspace(results_dir=results_dir) except Exception as e: @@ -318,4 +325,5 @@ def fx_federation(request, pytestconfig): disable_tls=disable_tls, workspace_path=workspace_path, results_dir=results_dir, + num_rounds=num_rounds, ) diff --git a/tests/end_to_end/models/participants.py b/tests/end_to_end/models/participants.py index 
diff --git a/tests/end_to_end/models/participants.py b/tests/end_to_end/models/participants.py
index 5bde7f39ec..6eb455aaa8 100644
--- a/tests/end_to_end/models/participants.py
+++ b/tests/end_to_end/models/participants.py
@@ -23,12 +23,13 @@ class ModelOwner:
     4. Importing and exporting the workspace etc.
     """

-    def __init__(self, workspace_name, model_name):
+    def __init__(self, workspace_name, model_name, log_memory_usage):
         """
         Initialize the ModelOwner class
         Args:
             workspace_name (str): Workspace name
             model_name (str): Model name
+            log_memory_usage (bool): Flag to enable memory usage logging
         """
         self.workspace_name = workspace_name
         self.model_name = model_name
@@ -38,6 +39,7 @@ def __init__(self, workspace_name, model_name):
         self.plan_path = None
         self.num_collaborators = constants.NUM_COLLABORATORS
         self.rounds_to_train = constants.NUM_ROUNDS
+        self.log_memory_usage = log_memory_usage

     def create_workspace(self, results_dir=None):
         """
@@ -132,6 +134,10 @@ def modify_plan(self, new_rounds=None, num_collaborators=None, disable_client_au
             data = yaml.load(fp, Loader=yaml.FullLoader)

         data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train)
+        # Propagate the memory usage logging flag to aggregator and collaborator settings
+        data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage
+        data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage
+
         data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators)
         data["network"]["settings"]["disable_client_auth"] = disable_client_auth
         data["network"]["settings"]["tls"] = not disable_tls
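After modify_plan runs with the flag enabled, the touched portion of the workspace plan would look roughly like this (sketch; unrelated keys omitted, round count illustrative):

    aggregator:
      settings:
        rounds_to_train: 5
        log_memory_usage: true
    collaborator:
      settings:
        log_memory_usage: true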
diff --git a/tests/end_to_end/utils/conftest_helper.py b/tests/end_to_end/utils/conftest_helper.py
index b8d70fa7ba..35c6986ea8 100644
--- a/tests/end_to_end/utils/conftest_helper.py
+++ b/tests/end_to_end/utils/conftest_helper.py
@@ -20,6 +20,7 @@ def parse_arguments():
     - model_name (str, default="torch_cnn_mnist"): Model name
     - disable_client_auth (bool): Disable client authentication
     - disable_tls (bool): Disable TLS for communication
+    - log_memory_usage (bool): Enable memory usage logging

     Raises:
         SystemExit: If the required arguments are not provided or if any argument parsing error occurs.
@@ -32,6 +33,7 @@ def parse_arguments():
         parser.add_argument("--model_name", type=str, help="Model name")
         parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication")
         parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication")
+        parser.add_argument("--log_memory_usage", action="store_true", help="Enable memory usage logging")
         args = parser.parse_known_args()[0]
         return args
diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py
index 361e220131..d87667077e 100644
--- a/tests/end_to_end/utils/federation_helper.py
+++ b/tests/end_to_end/utils/federation_helper.py
@@ -85,6 +85,7 @@ def verify_federation_run_completion(fed_obj, results):
             executor.submit(
                 _verify_completion_for_participant,
                 participant,
+                fed_obj.num_rounds,
                 results[i]
             )
             for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator])
@@ -99,7 +100,7 @@ def verify_federation_run_completion(fed_obj, results):
     return all(results)


-def _verify_completion_for_participant(participant, result_file):
+def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100):
     """
     Verify the completion of the process for the participant
     Args:
+        participant (object): Participant object
+        num_rounds (int): Number of rounds the federation trains for
+        result_file (str): Result file to check for completion
+        time_for_each_round (int): Time budget per round, in seconds
     Returns:
         bool: True if successful, else False
     """
     # Wait for the successful output message to appear in the log till timeout
-    timeout = 900  # in seconds
+    timeout = 300 + (time_for_each_round * num_rounds)  # in seconds
     log.info(f"Printing the last line of the log file for {participant.name} to track the progress")
     with open(result_file, 'r') as file:
         content = file.read()
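The timeout now scales with the round count instead of the previous flat 900 seconds: a 300-second grace period plus, by default, 100 seconds per round. So a 5-round run waits up to 300 + 100 * 5 = 800 seconds, while longer experiments are no longer capped at 900. A one-line restatement of the formula:

    # Restates the timeout calculation from _verify_completion_for_participant.
    def completion_timeout(num_rounds, time_for_each_round=100):
        return 300 + time_for_each_round * num_rounds

    assert completion_timeout(5) == 800  # was a flat 900 s before this change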
diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py
index 685b94bb6a..599c4b6ec5 100644
--- a/tests/end_to_end/utils/summary_helper.py
+++ b/tests/end_to_end/utils/summary_helper.py
@@ -5,6 +5,8 @@
 from lxml import etree
 import os

+import tests.end_to_end.utils.constants as constants
+
 # Initialize the XML parser
 parser = etree.XMLParser(recover=True, encoding='utf-8')
 tree = ET.parse("results/results.xml", parser=parser)
@@ -101,29 +103,41 @@ def get_testcase_result():
     return database_list


-if __name__ == "__main__":
+def main():
     """
     Main function to get the test case results and aggregator logs,
     and write the results to the GitHub step summary.
+    IMP: Do not fail the test in any scenario
     """
     result = get_testcase_result()

+    if not all([os.getenv(var) for var in ["NUM_COLLABORATORS", "NUM_ROUNDS", "MODEL_NAME", "GITHUB_STEP_SUMMARY"]]):
+        print("One or more environment variables not set. Skipping writing to GitHub step summary")
+        return
+
     num_cols = os.getenv("NUM_COLLABORATORS")
     num_rounds = os.getenv("NUM_ROUNDS")
     model_name = os.getenv("MODEL_NAME")
+    summary_file = os.getenv("GITHUB_STEP_SUMMARY")

-    if not model_name:
-        print("MODEL_NAME is not set, cannot find out aggregator logs")
-        agg_accuracy = "Not Found"
-    else:
-        workspace_name = "workspace_" + model_name
-        agg_log_file = os.path.join("results", workspace_name, "aggregator.log")
-        agg_accuracy = get_aggregated_accuracy(agg_log_file)
+    # Validate the model name and create the workspace name
+    if model_name.upper() not in constants.ModelName._member_names_:
+        print(f"Invalid model name: {model_name}. Skipping writing to GitHub step summary")
+        return

-    # Write the results to GitHub step summary
-    with open(os.getenv('GITHUB_STEP_SUMMARY'), 'a') as fh:
+    workspace_name = "workspace_" + model_name
+    agg_log_file = os.path.join("results", workspace_name, "aggregator.log")
+    agg_accuracy = get_aggregated_accuracy(agg_log_file)
+
+    # Write the results to the GitHub step summary file.
+    # This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand
+    with open(summary_file, 'a') as fh:
         # DO NOT change the print statements
         print("| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |", file=fh)
         print("| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", file=fh)
         for item in result:
             print(f"| {item['name']} | {item['time']} | {item['result']} | {item['err_msg']} | {num_cols} | {num_rounds} | {agg_accuracy} |", file=fh)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/github/test_double_ws_export.py b/tests/github/test_double_ws_export.py
index da15a97d9f..95c9440b31 100644
--- a/tests/github/test_double_ws_export.py
+++ b/tests/github/test_double_ws_export.py
@@ -10,10 +10,11 @@
 from concurrent.futures import ProcessPoolExecutor

 import psutil
-from tests.github.utils import create_certified_workspace, certify_aggregator, create_collaborator
+from tests.github.utils import create_certified_workspace, certify_aggregator, create_collaborator, is_path_name_allowed
 from openfl.utilities.utils import getfqdn_env

-if __name__ == '__main__':
+
+def main():
     # Test the pipeline
     parser = argparse.ArgumentParser()
     workspace_choice = []
@@ -31,6 +32,12 @@
     args = parser.parse_args()
     fed_workspace = args.fed_workspace
+
+    # Check if the path name is allowed before creating the workspace
+    if not is_path_name_allowed(fed_workspace):
+        print(f"The path name {fed_workspace} is not allowed")
+        return
+
     archive_name = f'{fed_workspace}.zip'
     fqdn = getfqdn_env()
     template = args.template
@@ -81,3 +88,7 @@
         dir1 = workspace_root / col1 / fed_workspace
         executor.submit(check_call, ['fx', 'collaborator', 'start', '-n', col1], cwd=dir1)
     shutil.rmtree(workspace_root)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/github/test_gandlf.py b/tests/github/test_gandlf.py
index 68b00a2382..a57f9f53a0 100644
--- a/tests/github/test_gandlf.py
+++ b/tests/github/test_gandlf.py
@@ -10,7 +10,7 @@
 from subprocess import check_call
 from concurrent.futures import ProcessPoolExecutor

-from tests.github.utils import create_collaborator, certify_aggregator
+from tests.github.utils import create_collaborator, certify_aggregator, is_path_name_allowed
 from openfl.utilities.utils import getfqdn_env


@@ -19,7 +19,7 @@ def exec(command, directory):
     check_call(command)


-if __name__ == '__main__':
+def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--template', default='keras_cnn_mnist')
     parser.add_argument('--fed_workspace', default='fed_work12345alpha81671')
@@ -34,6 +34,12 @@ def exec(command, directory):
     origin_dir = Path().resolve()
     args = parser.parse_args()
     fed_workspace = args.fed_workspace
+
+    # Check if the path name is allowed before creating the workspace
+    if not is_path_name_allowed(fed_workspace):
+        print(f"The path name {fed_workspace} is not allowed")
+        return
+
     archive_name = f'{fed_workspace}.zip'
     fqdn = getfqdn_env()
     template = args.template
@@ -116,3 +122,7 @@ def exec(command, directory):
         dir2 = workspace_root / col2 / fed_workspace
         executor.submit(exec, ['fx', 'collaborator', 'start', '-n', col2], dir2)
     shutil.rmtree(workspace_root)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/github/test_hello_federation.py b/tests/github/test_hello_federation.py
index 7e9e676fbe..e9071eed1c 100644
--- a/tests/github/test_hello_federation.py
+++ b/tests/github/test_hello_federation.py
@@ -9,10 +9,11 @@
 from concurrent.futures import ProcessPoolExecutor

 from openfl.utilities.utils import rmtree
-from tests.github.utils import create_collaborator, create_certified_workspace, certify_aggregator
+from tests.github.utils import create_collaborator, create_certified_workspace, certify_aggregator, is_path_name_allowed
 from openfl.utilities.utils import getfqdn_env

-if __name__ == '__main__':
+
+def main():
     # Test the pipeline
     parser = argparse.ArgumentParser()
     workspace_choice = []
@@ -32,6 +33,12 @@
     origin_dir = Path.cwd().resolve()
     args = parser.parse_args()
     fed_workspace = args.fed_workspace
+
+    # Check if the path name is allowed before creating the workspace
+    if not is_path_name_allowed(fed_workspace):
+        print(f"The path name {fed_workspace} is not allowed")
+        return
+
     archive_name = f'{fed_workspace}.zip'
     fqdn = getfqdn_env()
     template = args.template
@@ -73,3 +80,7 @@

     os.chdir(origin_dir)
     rmtree(workspace_root)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/github/utils.py b/tests/github/utils.py
index b265448111..06e7f8ae97 100644
--- a/tests/github/utils.py
+++ b/tests/github/utils.py
@@ -117,3 +117,22 @@ def create_signed_cert_for_collaborator(col, data_path):
         os.remove(f)
     # Remove request archive
     os.remove(f'col_{col}_to_agg_cert_request.zip')
+
+
+def is_path_name_allowed(path):
+    """
+    Check if the given path name is allowed.
+
+    Rejects names containing shell-special characters; alphanumeric
+    characters, hyphens, underscores and / (for nested directories)
+    pass the check.
+
+    Args:
+        path (str): The path name to check.
+    Returns:
+        bool: True if the path name is allowed, False otherwise.
+    """
+    special_characters = "!@#$%^&*()+?=,<>"
+
+    return not any(c in special_characters for c in path)
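A quick usage sketch for the new validator (path names are illustrative). Note that the check is blocklist-based, so characters outside special_characters, such as spaces and dots, still pass:

    from tests.github.utils import is_path_name_allowed

    assert is_path_name_allowed("fed_work12345alpha81671")  # default workspace name
    assert is_path_name_allowed("nested/workspace-dir")
    assert not is_path_name_allowed("bad*name!")             # '*' and '!' are rejected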