diff --git a/.github/workflows/wf_functional_e2e.yml b/.github/workflows/wf_functional_e2e.yml new file mode 100644 index 0000000000..f5deac471a --- /dev/null +++ b/.github/workflows/wf_functional_e2e.yml @@ -0,0 +1,95 @@ +--- +#--------------------------------------------------------------------------- +# Workflow to run Task Runner E2E tests via Docker +# Authors - Noopur, Payal Chaurasiya +#--------------------------------------------------------------------------- +name: Workflow Functional E2E + +on: + pull_request: + branches: [ develop ] + types: [opened, synchronize, reopened, ready_for_review] + + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "2" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ github.event.inputs.num_rounds || '2' }} + NUM_COLLABORATORS: ${{ github.event.inputs.num_collaborators || '2' }} + +jobs: + test_wf_func: + if: github.event.pull_request.draft == false + name: wf_func + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + id: setup_python + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python_version }} + + - name: Install dependencies + id: install_dependencies + run: | + python -m pip install --upgrade pip + pip install . 
+ pip install -r test-requirements.txt + pip install -r openfl-tutorials/experimental/workflow/workflow_interface_requirements.txt + + - name: Run Work Flow Functional tests + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/wf_local_func_tests.py \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Work Flow Functional tests run completed" + + - name: Print test summary + id: print_test_summary + if: ${{ always() }} + run: | + export PYTHONPATH="$PYTHONPATH:." + python tests/end_to_end/utils/summary_helper.py + echo "Test summary printed" + + - name: Create Tar (exclude cert and data folders) + id: tar_files + if: ${{ always() }} + run: | + tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results + + - name: Upload Artifacts + id: upload_artifacts + uses: actions/upload-artifact@v4 + if: ${{ always() }} + with: + name: wf_func_${{ github.event.inputs.model_name || 'default_model' }}_python${{ matrix.python_version }}_${{ github.run_id }} + path: result.tar diff --git a/.gitignore b/.gitignore index 8a106933ef..f9de4ad409 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ venv/* .eggs eggs/* *.pyi +.metaflow/* results/* \ No newline at end of file diff --git a/tests/end_to_end/test_suites/wf_local_func_tests.py b/tests/end_to_end/test_suites/wf_local_func_tests.py new file mode 100644 index 0000000000..223ecbfdaa --- /dev/null +++ b/tests/end_to_end/test_suites/wf_local_func_tests.py @@ -0,0 +1,251 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +import pytest +import os +import shutil +import random +from metaflow import Step + +from tests.end_to_end.utils.common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr +from tests.end_to_end.workflow.exclude_flow import TestFlowExclude +from tests.end_to_end.workflow.include_exclude_flow import TestFlowIncludeExclude +from 
tests.end_to_end.workflow.include_flow import TestFlowInclude +from tests.end_to_end.workflow.internal_loop import TestFlowInternalLoop +from tests.end_to_end.workflow.reference_flow import TestFlowReference +from tests.end_to_end.workflow.reference_include_flow import TestFlowReferenceWithInclude +from tests.end_to_end.workflow.reference_exclude import TestFlowReferenceWithExclude +from tests.end_to_end.workflow.subset_flow import TestFlowSubsetCollaborators +from tests.end_to_end.workflow.private_attr_wo_callable import TestFlowPrivateAttributesWoCallable +from tests.end_to_end.workflow.private_attributes_flow import TestFlowPrivateAttributes +from tests.end_to_end.workflow.private_attr_both import TestFlowPrivateAttributesBoth + +from tests.end_to_end.utils import wf_helper as wf_helper + +log = logging.getLogger(__name__) + +def test_exclude_flow(request, fx_local_federated_workflow): + """ + Test if variable is excluded, variables not show in next step + and all other variables will be visible to next step + """ + log.info("Starting test_exclude_flow") + flflow = TestFlowExclude(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_exclude_flow") + + +def test_include_exclude_flow(request, fx_local_federated_workflow): + """ + Test variables which are excluded will not show up in next step + Test variables which are included will show up in next step + """ + log.info("Starting test_include_exclude_flow") + flflow = TestFlowIncludeExclude(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_include_exclude_flow") + + +def test_include_flow(request, fx_local_federated_workflow): + """ + Test if variable is included, variables will show up in next step + All other 
variables will not show up + """ + log.info("Starting test_include_flow") + flflow = TestFlowInclude(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_include_flow") + + +def test_internal_loop(request, fx_local_federated_workflow): + """ + Verify that through internal loop, rounds to train is set + """ + log.info("Starting test_internal_loop") + model = None + optimizer = None + + flflow = TestFlowInternalLoop(model, optimizer, request.config.num_rounds, checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + flflow.run() + + expected_flow_steps = [ + "join", + "internal_loop", + "agg_model_mean", + "collab_model_update", + "local_model_mean", + "start", + "end", + ] + + steps_present_in_cli, missing_steps_in_cli, extra_steps_in_cli = wf_helper.validate_flow( + flflow, expected_flow_steps + ) + + assert len(steps_present_in_cli) == len(expected_flow_steps), "Number of steps fetched from Datastore through CLI do not match the Expected steps provided" + assert len(missing_steps_in_cli) == 0, f"Following steps missing from Datastore: {missing_steps_in_cli}" + assert len(extra_steps_in_cli) == 0, f"Following steps are extra in Datastore: {extra_steps_in_cli}" + assert flflow.end_count == 1, "End function called more than one time" + + log.info("\n Summary of internal flow testing \n" + "No issues found and below are the tests that ran successfully\n" + "1. Number of training completed is equal to training rounds\n" + "2. CLI steps and Expected steps are matching\n" + "3. Number of tasks are aligned with number of rounds and number of collaborators\n" + "4. 
End function executed one time") + log.info("Successfully ended test_internal_loop") + + +@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_index", "int", None )], indirect=True) +def test_reference_flow(request, fx_local_federated_workflow): + """ + Test reference variables matched through out the flow + """ + log.info("Starting test_reference_flow") + flflow = TestFlowReference(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_reference_flow") + + +def test_reference_include_flow(request, fx_local_federated_workflow): + """ + Test reference variables matched if included else not + """ + log.info("Starting test_reference_include_flow") + flflow = TestFlowReferenceWithInclude(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_reference_include_flow") + + +def test_reference_exclude_flow(request, fx_local_federated_workflow): + """ + Test reference variables matched if not excluded + """ + log.info("Starting test_reference_exclude_flow") + flflow = TestFlowReferenceWithExclude(checkpoint=True) + flflow.runtime = fx_local_federated_workflow.runtime + for i in range(request.config.num_rounds): + log.info(f"Starting round {i}...") + flflow.run() + log.info("Successfully ended test_reference_exclude_flow") + + +@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_name", "str", None )], indirect=True) +def test_subset_collaborators(request, fx_local_federated_workflow): + """ + Test the subset of collaborators in a federated workflow. + + Parameters: + request (FixtureRequest): The request fixture provides information about the requesting test function. 
+ fx_local_federated_workflow (Fixture): The fixture for the local federated workflow. + + Tests: + - Ensure the test starts and ends correctly. + - Verify the number of collaborators matches the expected subset. + - Check that the flow runs for each subset collaborator. + """ + log.info("Starting test_subset_collaborators") + collaborators = fx_local_federated_workflow.collaborators + + random_ints = random.sample(range(1, len(collaborators) + 1), len(collaborators)) + + collaborators = fx_local_federated_workflow.runtime.collaborators + for round_num in range(len(collaborators)): + log.info(f"Starting round {round_num}...") + + if os.path.exists(".metaflow"): + shutil.rmtree(".metaflow") + + flflow = TestFlowSubsetCollaborators(checkpoint=True, random_ints=random_ints) + flflow.runtime = fx_local_federated_workflow.runtime + flflow.run() + subset_collaborators = flflow.subset_collaborators + collaborators_ran = flflow.collaborators_ran + random_ints = flflow.random_ints + random_ints.remove(len(subset_collaborators)) + + step = Step( + f"TestFlowSubsetCollaborators/{flflow._run_id}/" + + "test_valid_collaborators" + ) + + assert len(list(step)) == len(subset_collaborators), ( + f"...Flow only ran for {len(list(step))} " + + f"instead of the {len(subset_collaborators)} expected " + + f"collaborators- Testcase Failed." + ) + log.info( + f"Found {len(list(step))} tasks for each of the " + + f"{len(subset_collaborators)} collaborators" + ) + log.info(f'subset_collaborators = {subset_collaborators}') + log.info(f'collaborators_ran = {collaborators_ran}') + for collaborator_name in subset_collaborators: + assert collaborator_name in collaborators_ran, ( + f"...Flow did not execute for " + + f"collaborator {collaborator_name}" + + f" - Testcase Failed." 
+            )
+
+    log.info(
+        f"Testing FederatedFlow - Ending test for validating "
+        + f"the subset of collaborators.")
+    log.info("Successfully ended test_subset_collaborators")
+
+
+def test_private_attr_wo_callable(request, fx_local_federated_workflow_prvt_attr):
+    """
+    Set private attribute without callable function i.e through direct assignment
+    """
+    log.info("Starting test_private_attr_wo_callable")
+    flflow = TestFlowPrivateAttributesWoCallable(checkpoint=True)
+    flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime
+    for i in range(request.config.num_rounds):
+        log.info(f"Starting round {i}...")
+        flflow.run()
+    log.info("Successfully ended test_private_attr_wo_callable")
+
+
+@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True)
+def test_private_attributes(request, fx_local_federated_workflow):
+    """
+    Set private attribute through callable function
+    """
+    log.info("Starting test_private_attributes")
+    flflow = TestFlowPrivateAttributes(checkpoint=True)
+    flflow.runtime = fx_local_federated_workflow.runtime
+    for i in range(request.config.num_rounds):
+        log.info(f"Starting round {i}...")
+        flflow.run()
+    log.info("Successfully ended test_private_attributes")
+
+
+@pytest.mark.parametrize("fx_local_federated_workflow_prvt_attr", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True)
+def test_private_attr_both(request, fx_local_federated_workflow_prvt_attr):
+    """
+    Set private attribute through callable function and direct assignment
+    """
+    log.info("Starting test_private_attr_both")
+    flflow = TestFlowPrivateAttributesBoth(checkpoint=True)
+    flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime
+    for i in range(request.config.num_rounds):  # was hard-coded range(5); keep consistent with sibling tests
+        log.info(f"Starting round {i}...")
+        flflow.run()
+    log.info("Successfully ended test_private_attr_both")
diff --git a/tests/end_to_end/utils/common_fixtures.py b/tests/end_to_end/utils/common_fixtures.py
index 
d4912a0e0f..03b62263c8 100644 --- a/tests/end_to_end/utils/common_fixtures.py +++ b/tests/end_to_end/utils/common_fixtures.py @@ -5,18 +5,33 @@ import collections import concurrent.futures import os +import logging +import numpy as np import tests.end_to_end.utils.docker_helper as dh +from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np +) import tests.end_to_end.utils.federation_helper as fh from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model + +log = logging.getLogger(__name__) + # Define a named tuple to store the objects for model owner, aggregator, and collaborators federation_fixture = collections.namedtuple( "federation_fixture", "model_owner, aggregator, collaborators, workspace_path, local_bind_path", ) +workflow_local_fixture = collections.namedtuple( + "workflow_local_fixture", + "aggregator, collaborators, runtime", +) @pytest.fixture(scope="function") def fx_federation(request): @@ -113,3 +128,126 @@ def fx_federation(request): workspace_path=workspace_path, local_bind_path=local_bind_path, ) + +@pytest.fixture(scope="function") +def fx_local_federated_workflow(request): + """ + Fixture to set up a local federated workflow for testing. + This fixture initializes an `Aggregator` and sets up a list of collaborators + based on the number specified in the test configuration. It also configures + a `LocalRuntime` with the aggregator, collaborators, and an optional backend + if specified in the test configuration. + Args: + request (FixtureRequest): The pytest request object that provides access + to the test configuration. + Yields: + LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, + collaborators, and backend. 
+ """ + # Import is done inline because Task Runner does not support importing below penfl packages + + from openfl.experimental.workflow.interface import Aggregator, Collaborator + from openfl.experimental.workflow.runtime import LocalRuntime + + collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None + collab_value = request.param[1] if hasattr(request, 'param') and request.param else None + agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None + + # Get the callback functions from the globals using string + collab_callback_func_name = globals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = globals()[agg_callback_func] if agg_callback_func else None + collaborators_list = [] + + if agg_callback_func_name: + aggregator = Aggregator( name="agg", + private_attributes_callable=agg_callback_func_name) + else: + aggregator = Aggregator() + + # Setup collaborators + for i in range(request.config.num_collaborators): + func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None + collaborators_list.append( + Collaborator( + name=f"collaborator{i}", + private_attributes_callable=collab_callback_func_name, + param = func_var + ) + ) + + backend = request.config.backend if hasattr(request.config, 'backend') else None + if backend: + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) + + # Return the federation fixture + return workflow_local_fixture( + aggregator=aggregator, + collaborators=collaborators_list, + runtime=local_runtime, + ) + + +@pytest.fixture(scope="function") +def fx_local_federated_workflow_prvt_attr(request): + """ + Fixture to set up a local federated workflow for testing. 
+ This fixture initializes an `Aggregator` and sets up a list of collaborators + based on the number specified in the test configuration. It also configures + a `LocalRuntime` with the aggregator, collaborators, and an optional backend + if specified in the test configuration. + Args: + request (FixtureRequest): The pytest request object that provides access + to the test configuration. + Yields: + LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, + collaborators, and backend. + """ + # Import is done inline because Task Runner does not support importing below penfl packages + + from openfl.experimental.workflow.interface import Aggregator, Collaborator + from openfl.experimental.workflow.runtime import LocalRuntime + + collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None + collab_value = request.param[1] if hasattr(request, 'param') and request.param else None + agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None + + # Get the callback functions from the globals using string + collab_callback_func_name = globals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = globals()[agg_callback_func] if agg_callback_func else None + collaborators_list = [] + # Setup aggregator + if agg_callback_func_name: + aggregator = Aggregator(name="agg", + private_attributes_callable=agg_callback_func_name) + else: + aggregator = Aggregator() + + aggregator.private_attributes = { + "test_loader_pvt": np.random.rand(10, 28, 28) # Random data + } + # Setup collaborators + for i in range(request.config.num_collaborators): + func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None + collab = Collaborator( + name=f"collaborator{i}", + private_attributes_callable=collab_callback_func_name, + param = func_var + ) + collab.private_attributes = { + "train_loader_pvt": np.random.rand(i * 50, 28, 28), + 
"test_loader_pvt": np.random.rand(i * 10, 28, 28), + } + collaborators_list.append(collab) + + backend = request.config.backend if hasattr(request.config, 'backend') else None + if backend: + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) + + # Return the federation fixture + return workflow_local_fixture( + aggregator=aggregator, + collaborators=collaborators_list, + runtime=local_runtime, + ) diff --git a/tests/end_to_end/utils/exceptions.py b/tests/end_to_end/utils/exceptions.py index 0e0149e392..0a81717203 100644 --- a/tests/end_to_end/utils/exceptions.py +++ b/tests/end_to_end/utils/exceptions.py @@ -56,3 +56,7 @@ class WorkspaceImportException(Exception): class CollaboratorCreationException(Exception): """Exception for aggregator creation""" pass + +class ReferenceFlowException(Exception): + """Exception for reference flow""" + pass diff --git a/tests/end_to_end/utils/wf_helper.py b/tests/end_to_end/utils/wf_helper.py new file mode 100644 index 0000000000..019d906ff5 --- /dev/null +++ b/tests/end_to_end/utils/wf_helper.py @@ -0,0 +1,110 @@ +from metaflow import Flow +import logging +import numpy as np + +log = logging.getLogger(__name__) + +def validate_flow(flow_obj, expected_flow_steps): + """ + Validate: + 1. If the given training round were completed + 2. If all the steps were executed + 3. If each collaborator step was executed + 4. If end was executed once + """ + + cli_flow_obj = Flow("TestFlowInternalLoop") # Flow object from CLI + cli_flow_steps = list(cli_flow_obj.latest_run) # Steps from CLI + cli_step_names = [step.id for step in cli_flow_steps] + + # 1. 
If the given training round were completed + assert flow_obj.training_rounds == flow_obj.train_count, "Number of training completed is not equal to training rounds" + + for step in cli_flow_steps: + task_count = 0 + func = getattr(flow_obj, step.id) + for task in list(step): + task_count = task_count + 1 + + # Each aggregator step should be executed for training rounds times + if ( + (func.aggregator_step is True) + and (task_count != flow_obj.training_rounds) + and (step.id != "end") + ): + assert False, f"More than one execution detected for Aggregator Step: {step}" + + # Each collaborator step is executed for (training rounds)*(number of collaborator) times + if (func.collaborator_step is True) and ( + task_count != len(flow_obj.collaborators) * flow_obj.training_rounds + ): + assert False, f"Incorrect number of execution detected for Collaborator Step: {step}. Expected: {flow_obj.training_rounds*len(flow_obj.collaborators)} Actual: {task_count}" + + steps_present_in_cli = [ + step for step in expected_flow_steps if step in cli_step_names + ] + missing_steps_in_cli = [ + step for step in expected_flow_steps if step not in cli_step_names + ] + extra_steps_in_cli = [ + step for step in cli_step_names if step not in expected_flow_steps + ] + return steps_present_in_cli, missing_steps_in_cli, extra_steps_in_cli + + +def init_collaborator_private_attr_index(param): + """ + Initialize a collaborator's private attribute index. + + Args: + param (int): The initial value for the index. + + Returns: + dict: A dictionary with the key 'index' and the value of `param` incremented by 1. + """ + return {"index": param + 1} + + +def init_collaborator_private_attr_name(param): + """ + Initialize a collaborator's private attribute name. + + Args: + param (str): The name to be assigned to the collaborator's private attribute. + + Returns: + dict: A dictionary with the key 'name' and the value of the provided parameter. 
+ """ + return {"name": param} + + +def init_collaborate_pvt_attr_np(param): + """ + Initialize private attributes for collaboration with numpy arrays. + + This function generates random numpy arrays for training and testing loaders + based on the given parameter. + + Args: + param (int): A multiplier to determine the size of the generated arrays. + + Returns: + dict: A dictionary containing: + - "train_loader" (numpy.ndarray): A numpy array of shape (param * 50, 28, 28) with random values. + - "test_loader" (numpy.ndarray): A numpy array of shape (param * 10, 28, 28) with random values. + """ + return { + "train_loader": np.random.rand(param * 50, 28, 28), + "test_loader": np.random.rand(param * 10, 28, 28), + } + + +def init_agg_pvt_attr_np(): + """ + Initialize a dictionary with a private attribute for testing. + + Returns: + dict: A dictionary containing a single key "test_loader" with a value + of a NumPy array of shape (10, 28, 28) filled with random values. + """ + return {"test_loader": np.random.rand(10, 28, 28)} diff --git a/tests/end_to_end/workflow/exclude_flow.py b/tests/end_to_end/workflow/exclude_flow.py new file mode 100644 index 0000000000..f364dcbbaa --- /dev/null +++ b/tests/end_to_end/workflow/exclude_flow.py @@ -0,0 +1,114 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging + +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator + +log = logging.getLogger(__name__) + +class TestFlowExclude(FLSpec): + """ + Testflow to validate exclude functionality in Federated Flow + """ + + @aggregator + def start(self): + """ + Flow start. 
+ """ + log.info("Testing WorkFlow - Starting Test for Exclude Attributes") + self.collaborators = self.runtime.collaborators + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.test_exclude_agg_to_agg, exclude=["exclude_agg_to_agg"]) + + @aggregator + def test_exclude_agg_to_agg(self): + """ + Testing whether attributes are excluded from agg to agg + """ + assert hasattr(self, "include_agg_to_agg") is True, "include_agg_to_agg attribute missing" + assert hasattr(self, "exclude_agg_to_agg") is False, "exclude_agg_to_agg attribute should be excluded" + log.info("Exclude test passed in test_exclude_agg_to_agg") + + self.exclude_agg_to_collab = 20 + self.include_agg_to_collab = 100 + self.next( + self.test_exclude_agg_to_collab, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def test_exclude_agg_to_collab(self): + """ + Testing whether attributes are excluded from agg to collab + """ + + assert hasattr(self, "include_agg_to_agg") is True, "include_agg_to_agg attribute missing" + assert hasattr(self, "include_agg_to_collab") is True, "include_agg_to_collab attribute missing" + assert hasattr(self, "exclude_agg_to_agg") is False, "exclude_agg_to_agg attribute should be excluded" + assert hasattr(self, "exclude_agg_to_collab") is False, "exclude_agg_to_collab attribute should be excluded" + log.info("Exclude test passed in test_exclude_agg_to_collab") + + self.exclude_collab_to_collab = 10 + self.include_collab_to_collab = 44 + self.next( + self.test_exclude_collab_to_collab, + exclude=["exclude_collab_to_collab"], + ) + + @collaborator + def test_exclude_collab_to_collab(self): + """ + Testing whether attributes are excluded from collab to collab + """ + + assert hasattr(self, "include_agg_to_agg") is True, "include_agg_to_agg attribute missing" + assert hasattr(self, "include_agg_to_collab") is True, "include_agg_to_collab attribute missing" + assert hasattr(self, "include_collab_to_collab") is 
True, "include_collab_to_collab attribute missing" + assert hasattr(self, "exclude_agg_to_agg") is False, "exclude_agg_to_agg attribute should be excluded" + assert hasattr(self, "exclude_agg_to_collab") is False, "exclude_agg_to_collab attribute should be excluded" + assert hasattr(self, "exclude_collab_to_collab") is False, "exclude_collab_to_collab attribute should be excluded" + log.info("Exclude test passed in test_exclude_collab_to_collab") + + self.exclude_collab_to_agg = 20 + self.include_collab_to_agg = 56 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator attribute check + validate = ( + hasattr(self, "include_agg_to_agg") is True + and hasattr(self, "include_agg_to_collab") is True + and hasattr(self, "exclude_agg_to_collab") is True + and hasattr(self, "exclude_agg_to_agg") is False + ) + + # Collaborator attribute check + for input in inputs: + validation = validate and ( + hasattr(input, "include_collab_to_collab") is True + and hasattr(input, "exclude_collab_to_collab") is False + and hasattr(input, "exclude_collab_to_agg") is False + and hasattr(input, "include_collab_to_agg") is True + ) + assert validation, "Exclude test failed in join" + log.info("Exclude test passed in join") + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ + """ + log.info("Testing FederatedFlow - Ending Test for Exclude Attributes") diff --git a/tests/end_to_end/workflow/include_exclude_flow.py b/tests/end_to_end/workflow/include_exclude_flow.py new file mode 100644 index 0000000000..b30e00d8d1 --- /dev/null +++ b/tests/end_to_end/workflow/include_exclude_flow.py @@ -0,0 +1,115 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator + +log = logging.getLogger(__name__) + +class TestFlowIncludeExclude(FLSpec): + """ + Testflow to validate include and exclude functionality in Federated Flow. + """ + + @aggregator + def start(self): + """ + Flow start. + """ + log.info("Testing FederatedFlow - Starting Test for Include and Exclude Attributes") + self.collaborators = self.runtime.collaborators + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.test_include_exclude_agg_to_agg, exclude=["exclude_agg_to_agg"]) + + @aggregator + def test_include_exclude_agg_to_agg(self): + """ + Testing whether attributes are excluded from agg to agg + """ + assert hasattr(self, "include_agg_to_agg") is True and hasattr(self, "exclude_agg_to_agg") is False, \ + "Exclude test failed in test_include_exclude_agg_to_agg" + log.info("Exclude test passed in test_include_exclude_agg_to_agg") + + self.include_agg_to_collab = 100 + self.exclude_agg_to_collab = 78 + self.next( + self.test_include_exclude_agg_to_collab, + foreach="collaborators", + include=["include_agg_to_collab", "collaborators"], + ) + + @collaborator + def test_include_exclude_agg_to_collab(self): + """ + Testing whether attributes are included from agg to collab + """ + assert ( + hasattr(self, "include_agg_to_agg") is False + and hasattr(self, "exclude_agg_to_agg") is False + and hasattr(self, "exclude_agg_to_collab") is False + and hasattr(self, 
"include_agg_to_collab") is True + ), "Include test failed in test_include_exclude_agg_to_collab" + log.info("Include test passed in test_include_exclude_agg_to_collab") + + self.exclude_collab_to_collab = 10 + self.include_collab_to_collab = 44 + self.next( + self.test_include_exclude_collab_to_collab, + exclude=["exclude_collab_to_collab"], + ) + + @collaborator + def test_include_exclude_collab_to_collab(self): + """ + Testing whether attributes are excluded from collab to collab + """ + assert ( + hasattr(self, "include_agg_to_agg") is False + and hasattr(self, "include_agg_to_collab") is True + and hasattr(self, "include_collab_to_collab") is True + and hasattr(self, "exclude_agg_to_agg") is False + and hasattr(self, "exclude_agg_to_collab") is False + and hasattr(self, "exclude_collab_to_collab") is False + ), "Exclude test failed in test_include_exclude_collab_to_collab" + log.info("Exclude test passed in test_include_exclude_collab_to_collab") + + self.exclude_collab_to_agg = 20 + self.include_collab_to_agg = 56 + self.next(self.join, include=["include_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are included from collab to agg + """ + # Aggregator attribute check + validate = ( + hasattr(self, "include_agg_to_agg") is True + and hasattr(self, "include_agg_to_collab") is True + and hasattr(self, "exclude_agg_to_collab") is True + and hasattr(self, "exclude_agg_to_agg") is False + ) + + # Collaborator attribute check + for input in inputs: + validation = validate and ( + hasattr(input, "include_collab_to_collab") is False + and hasattr(input, "exclude_collab_to_collab") is False + and hasattr(input, "exclude_collab_to_agg") is False + and hasattr(input, "include_collab_to_agg") is True + ) + + assert validation, "Include and Exclude tests failed in join" + log.info("Include and Exclude tests passed in join") + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. 
All flows must have an 'end' step, which is the + last step in the flow. + """ + log.info("Testing FederatedFlow - Ending Test for Include and Exclude Attributes") diff --git a/tests/end_to_end/workflow/include_flow.py b/tests/end_to_end/workflow/include_flow.py new file mode 100644 index 0000000000..7009e50a46 --- /dev/null +++ b/tests/end_to_end/workflow/include_flow.py @@ -0,0 +1,107 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator +import logging + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +class TestFlowInclude(FLSpec): + """ + Testflow to validate include functionality in Federated Flow + """ + + @aggregator + def start(self): + """ + Flow start. + """ + log.info("Testing FederatedFlow - Starting Test for Include Attributes") + self.collaborators = self.runtime.collaborators + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next( + self.test_include_agg_to_agg, + include=["include_agg_to_agg", "collaborators"], + ) + + @aggregator + def test_include_agg_to_agg(self): + """ + Testing whether attributes are included from agg to agg + """ + assert hasattr(self, "include_agg_to_agg") and not hasattr(self, "exclude_agg_to_agg"), \ + "Include test failed in test_include_agg_to_agg" + + log.info("Include test passed in test_include_agg_to_agg") + + self.include_agg_to_collab = 100 + self.exclude_agg_to_collab = 78 + self.next( + self.test_include_agg_to_collab, + foreach="collaborators", + include=["include_agg_to_collab", "collaborators"], + ) + + @collaborator + def test_include_agg_to_collab(self): + """ + Testing whether attributes are included from agg to collab + """ + assert not hasattr(self, "include_agg_to_agg") and not hasattr(self, "exclude_agg_to_agg") \ + and not hasattr(self, "exclude_agg_to_collab") and hasattr(self, 
"include_agg_to_collab"), \ + "Include test failed in test_include_agg_to_collab" + + log.info("Include test passed in test_include_agg_to_collab") + + self.exclude_collab_to_collab = 10 + self.include_collab_to_collab = 44 + self.next( + self.test_include_collab_to_collab, + include=["include_collab_to_collab"], + ) + + @collaborator + def test_include_collab_to_collab(self): + """ + Testing whether attributes are included from collab to collab + """ + assert not hasattr(self, "include_agg_to_agg") and not hasattr(self, "include_agg_to_collab") \ + and hasattr(self, "include_collab_to_collab") and not hasattr(self, "exclude_agg_to_agg") \ + and not hasattr(self, "exclude_agg_to_collab") and not hasattr(self, "exclude_collab_to_collab"), \ + "Include test failed in test_include_collab_to_collab" + + log.info("Include test passed in test_include_collab_to_collab") + + self.exclude_collab_to_agg = 20 + self.include_collab_to_agg = 56 + self.next(self.join, include=["include_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are included from collab to agg + """ + validate = hasattr(self, "include_agg_to_agg") and hasattr(self, "include_agg_to_collab") \ + and hasattr(self, "exclude_agg_to_collab") and not hasattr(self, "exclude_agg_to_agg") + + for input in inputs: + validation = validate and not hasattr(input, "include_collab_to_collab") \ + and not hasattr(input, "exclude_collab_to_collab") \ + and not hasattr(input, "exclude_collab_to_agg") \ + and hasattr(input, "include_collab_to_agg") + + assert validation, "Include test failed in join" + log.info("Include test passed in join") + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ """ + log.info("Testing FederatedFlow - Ending Test for Include Attributes") diff --git a/tests/end_to_end/workflow/internal_loop.py b/tests/end_to_end/workflow/internal_loop.py new file mode 100644 index 0000000000..8c506018eb --- /dev/null +++ b/tests/end_to_end/workflow/internal_loop.py @@ -0,0 +1,86 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +from openfl.experimental.workflow.interface.fl_spec import FLSpec +from openfl.experimental.workflow.interface.participants import Aggregator, Collaborator +from openfl.experimental.workflow.runtime import LocalRuntime +from openfl.experimental.workflow.placement.placement import aggregator, collaborator +import numpy as np +pass + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +class TestFlowInternalLoop(FLSpec): + def __init__(self, model=None, optimizer=None, rounds=None, **kwargs): + super().__init__(**kwargs) + self.training_rounds = rounds + self.train_count = 0 + self.end_count = 0 + + @aggregator + def start(self): + """ + Flow start. + """ + log.info(f"Testing FederatedFlow - Test for Internal Loops - Round: {self.train_count} of Training Rounds: {self.training_rounds}") + self.model = np.zeros((10, 10, 10)) # Test model + self.collaborators = self.runtime.collaborators + self.next(self.agg_model_mean, foreach="collaborators") + + @collaborator + def agg_model_mean(self): + """ + Calculating the mean of the model created in start. + """ + self.agg_mean_value = np.mean(self.model) + log.info(f": {self.input} Mean of Agg model: {self.agg_mean_value}") + self.next(self.collab_model_update) + + @collaborator + def collab_model_update(self): + """ + Initializing the model with random numbers. 
+ """ + log.info(f": {self.input} Initializing the model randomly") + self.model = np.random.randint(1, len(self.input), (10, 10, 10)) + self.next(self.local_model_mean) + + @collaborator + def local_model_mean(self): + """ + Calculating the mean of the model created in train. + """ + self.local_mean_value = np.mean(self.model) + log.info(f": {self.input} Local mean: {self.local_mean_value}") + self.next(self.join) + + @aggregator + def join(self, inputs): + """ + Joining inputs from collaborators + """ + self.agg_mean = sum(input.local_mean_value for input in inputs) / len(inputs) + log.info(f"Aggregated mean : {self.agg_mean}") + self.next(self.internal_loop) + + @aggregator + def internal_loop(self): + """ + Internally Loop for training rounds + """ + self.train_count = self.train_count + 1 + if self.training_rounds == self.train_count: + self.next(self.end) + else: + self.next(self.start) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ """ + self.end_count = self.end_count + 1 + log.info("This is the end of the flow") diff --git a/tests/end_to_end/workflow/private_attr_both.py b/tests/end_to_end/workflow/private_attr_both.py new file mode 100644 index 0000000000..44f171f723 --- /dev/null +++ b/tests/end_to_end/workflow/private_attr_both.py @@ -0,0 +1,142 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +pass +from openfl.experimental.workflow.interface import FLSpec, Aggregator, Collaborator +from openfl.experimental.workflow.runtime import LocalRuntime +from openfl.experimental.workflow.placement import aggregator, collaborator + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +class TestFlowPrivateAttributesBoth(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + @aggregator + def start(self): + """ + Flow start. + """ + log.info("Testing FederatedFlow - Starting Test for accessibility of private attributes") + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. 
+ Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, "test_loader"), "aggregator_join_aggregator_attributes_missing" + + for idx, collab in enumerate(inputs): + assert not (hasattr(collab, "train_loader") or hasattr(collab, "test_loader")), \ + f"join_collaborator_attributes_found for Collaborator: {collab}" + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ """ + log.info("Testing FederatedFlow - Ending Test for accessibility of private attributes") + log.info("...Test case passed...") + +def validate_collab_private_attr(self, private_attr, step_name): + """ + Validates the private attributes of the aggregator and collaborators. + + This method checks that the aggregator has the specified private attribute + and ensures that collaborators do not have access to private attributes. + + Args: + private_attr (str): The name of the private attribute to validate. + step_name (str): The name of the current step in the workflow. + + Raises: + AssertionError: If the aggregator does not have the specified private attribute. + AssertionError: If any collaborator has access to private attributes. + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, private_attr), f"{step_name}_aggregator_attributes_missing" + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + assert not (type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") or hasattr(self.runtime, "__collaborators")), \ + f"{step_name}_collaborator_attributes_found for collaborator {collab}" + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + """ + Validates that the collaborator has access to its own private attributes + and that the runtime does not have access to the aggregator's private attributes. + + Args: + private_attr_1 (str): The name of the first private attribute to check. + private_attr_2 (str): The name of the second private attribute to check. + step_name (str): The name of the step in the workflow for error messaging. + + Raises: + AssertionError: If the collaborator does not have the specified private attributes + or if the runtime has access to the aggregator's private attributes. 
+ """ + # Collaborator should only be able to access its own attributes + assert hasattr(self, private_attr_1) and hasattr(self, private_attr_2), \ + f"{step_name}_collab_attributes_not_found" + + assert not hasattr(self.runtime, "_aggregator"), \ + f"{step_name}_aggregator_attributes_found" diff --git a/tests/end_to_end/workflow/private_attr_wo_callable.py b/tests/end_to_end/workflow/private_attr_wo_callable.py new file mode 100644 index 0000000000..b32758178f --- /dev/null +++ b/tests/end_to_end/workflow/private_attr_wo_callable.py @@ -0,0 +1,142 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +from openfl.experimental.workflow.interface import FLSpec, Aggregator, Collaborator +from openfl.experimental.workflow.placement import aggregator, collaborator + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +class TestFlowPrivateAttributesWoCallable(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + @aggregator + def start(self): + """ + Flow start. + """ + log.info("Testing FederatedFlow - Starting Test for accessibility of private attributes") + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader_pvt", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. 
+ Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader_pvt", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader_pvt", "test_loader_pvt", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader_pvt", "test_loader_pvt", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, "test_loader_pvt"), "aggregator_join_aggregator_attributes_missing" + + for idx, collab in enumerate(inputs): + assert not (hasattr(collab, "train_loader_pvt") or hasattr(collab, "test_loader_pvt")), \ + f"join_collaborator_attributes_found for Collaborator: {collab}" + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ """ + log.info("Testing FederatedFlow - Ending Test for accessibility of private attributes") + + +def validate_collab_private_attr(self, private_attr, step_name): + """ + Validates the private attributes of the aggregator and collaborators. + + This method checks that the aggregator has the specified private attribute + and that the collaborators do not have access to certain private attributes. + + Args: + private_attr (str): The name of the private attribute to check for the aggregator. + step_name (str): The name of the current step in the workflow. + + Raises: + AssertionError: If the aggregator does not have the specified private attribute. + AssertionError: If any collaborator has access to the specified private attributes. + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, private_attr), f"{step_name}_aggregator_attributes_missing" + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + assert not (type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") or hasattr(self.runtime, "__collaborators")), \ + f"{step_name}_collaborator_attributes_found for collaborator {collab}" + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + """ + Validates that the collaborator has access only to its own private attributes + and that the runtime does not have access to the aggregator's private attributes. + + Args: + private_attr_1 (str): The name of the first private attribute to check. + private_attr_2 (str): The name of the second private attribute to check. + step_name (str): The name of the step for error messaging. + + Raises: + AssertionError: If the collaborator does not have the specified private attributes + or if the runtime has access to the aggregator's private attributes. 
+ """ + # Collaborator should only be able to access its own attributes + assert hasattr(self, private_attr_1) and hasattr(self, private_attr_2), \ + f"{step_name}collab_attributes_not_found" + + assert not hasattr(self.runtime, "_aggregator"), \ + f"{step_name}_aggregator_attributes_found" diff --git a/tests/end_to_end/workflow/private_attributes_flow.py b/tests/end_to_end/workflow/private_attributes_flow.py new file mode 100644 index 0000000000..92c9f90d2a --- /dev/null +++ b/tests/end_to_end/workflow/private_attributes_flow.py @@ -0,0 +1,139 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +pass +import numpy as np +import logging +from openfl.experimental.workflow.interface import FLSpec, Aggregator, Collaborator +from openfl.experimental.workflow.runtime import LocalRuntime +from openfl.experimental.workflow.placement import aggregator, collaborator + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +class TestFlowPrivateAttributes(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + @aggregator + def start(self): + """ + Flow start. + """ + log.info("Testing FederatedFlow - Starting Test for accessibility of private attributes") + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. 
+ Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, "test_loader"), "aggregator_join_aggregator_attributes_missing" + + for idx, collab in enumerate(inputs): + assert not (hasattr(collab, "train_loader") or hasattr(collab, "test_loader")), \ + f"join_collaborator_attributes_found for Collaborator: {collab}" + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. 
+ """ + log.info("Testing FederatedFlow - Ending Test for accessibility of private attributes") + log.info("...Test case passed...") + +def validate_collab_private_attr(self, private_attr, step_name): + """ + Validates the private attributes of the aggregator and collaborators. + + Args: + private_attr (str): The name of the private attribute to validate. + step_name (str): The name of the current step in the workflow. + + Raises: + AssertionError: If the aggregator does not have the specified private attribute. + AssertionError: If any collaborator's private attributes are accessible. + """ + # Aggregator should only be able to access its own attributes + assert hasattr(self, private_attr), f"{step_name}_aggregator_attributes_missing" + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + assert not (type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") or hasattr(self.runtime, "__collaborators")), \ + f"{step_name}_collaborator_attributes_found for collaborator {collab}" + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + """ + Validates that the collaborator can only access its own private attributes and not the aggregator's attributes. + + Args: + private_attr_1 (str): The name of the first private attribute to check. + private_attr_2 (str): The name of the second private attribute to check. + step_name (str): The name of the current step in the workflow. + + Raises: + AssertionError: If the collaborator does not have the specified private attributes or if the aggregator's attributes are accessible. 
+ """ + # Collaborator should only be able to access its own attributes + assert hasattr(self, private_attr_1) and hasattr(self, private_attr_2), \ + f"{step_name}collab_attributes_not_found" + + assert not hasattr(self.runtime, "_aggregator"), \ + f"{step_name}_aggregator_attributes_found" diff --git a/tests/end_to_end/workflow/reference_exclude.py b/tests/end_to_end/workflow/reference_exclude.py new file mode 100644 index 0000000000..6f8ee766d0 --- /dev/null +++ b/tests/end_to_end/workflow/reference_exclude.py @@ -0,0 +1,237 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator + +import torch.nn as nn +import torch.optim as optim +import inspect +from types import MethodType +import logging + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +MIN_COLLECTION_COUNT = 2 + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.linear1 = nn.Linear(60, 100) + self.linear2 = nn.Linear(100, 10) + + def forward(self, x): + x = self.linear1(x) + x = self.linear2(x) + return x + + +class TestFlowReferenceWithExclude(FLSpec): + + """ + Testflow to validate references of collaborator attributes in Federated Flow with exclude. + """ + + step_one_collab_attrs = [] + step_two_collab_attrs = [] + + @aggregator + def start(self): + """ + Flow start. 
+ """ + self.agg_agg_attr_dict = {key: key for key in range(5)} + log.info("Testing FederatedFlow - Starting Test for validating references") + self.next(self.test_create_agg_attr, exclude=["agg_agg_attr_dict"]) + + @aggregator + def test_create_agg_attr(self): + """ + Create different types of objects + """ + self.agg_attr_list = [1, 2, 5, 6, 7, 8] + self.agg_attr_dict = {key: key for key in range(5)} + self.agg_attr_model = Net() + self.agg_attr_optimizer = optim.SGD( + self.agg_attr_model.parameters(), lr=1e-3, momentum=1e-2 + ) + self.collaborators = self.runtime.collaborators + + self.next( + self.test_create_collab_attr, + foreach="collaborators", + exclude=["agg_attr_list"], + ) + + @collaborator + def test_create_collab_attr(self): + """ + Create different types of objects + """ + self.collab_attr_list_one = [1, 2, 3, 5, 6, 8] + self.collab_attr_dict_one = {key: key for key in range(5)} + + TestFlowReferenceWithExclude.step_one_collab_attrs.append(self) + + if ( + len(TestFlowReferenceWithExclude.step_one_collab_attrs) + >= MIN_COLLECTION_COUNT + ): + collab_attr_list = filter_attrs(inspect.getmembers(self)) + matched_ref_dict = find_matched_references( + collab_attr_list, + TestFlowReferenceWithExclude.step_one_collab_attrs, + ) + validate_references(matched_ref_dict) + + self.next(self.test_create_more_collab_attr, exclude=["collab_attr_dict_one"]) + + @collaborator + def test_create_more_collab_attr(self): + """ + Create different types of objects + """ + self.collab_attr_list_two = [1, 2, 3, 5, 6, 8] + self.collab_attr_dict_two = {key: key for key in range(5)} + + TestFlowReferenceWithExclude.step_two_collab_attrs.append(self) + + if ( + len(TestFlowReferenceWithExclude.step_two_collab_attrs) + >= MIN_COLLECTION_COUNT + ): + collab_attr_list = filter_attrs(inspect.getmembers(self)) + matched_ref_dict = find_matched_references( + collab_attr_list, + TestFlowReferenceWithExclude.step_two_collab_attrs, + ) + validate_references(matched_ref_dict) + + 
self.next(self.join, exclude=["collab_attr_dict_two"]) + + @aggregator + def join(self, inputs): + """ + Iterate over the references of collaborator attributes + validate uniqueness of attributes and raise assertion + """ + all_attr_list = filter_attrs(inspect.getmembers(inputs[0])) + + matched_ref_dict = find_matched_references(all_attr_list, inputs) + validate_references(matched_ref_dict) + + all_shared_attr = "" + log.info("Reference with exclude keyword test summary:") + + for val in matched_ref_dict.values(): + all_shared_attr = all_shared_attr + ",".join(val) + + if all_shared_attr: + log.error(f"...Test case failed for {all_shared_attr}") + else: + log.info("...Test case passed for all the attributes.") + self.next(self.end) + + @aggregator + def end(self): + log.info("Testing FederatedFlow - Ending test for validating the references.") + TestFlowReferenceWithExclude.step_one_collab_attrs = [] + TestFlowReferenceWithExclude.step_two_collab_attrs = [] + + +def filter_attrs(attr_list): + """ + Filters a list of attributes based on specific criteria. + + Args: + attr_list (list): A list of tuples where each tuple contains an attribute name and its value. + + Returns: + list: A list of attribute names that meet the filtering criteria. + + The filtering criteria are: + - The attribute name does not start with an underscore. + - The attribute name is not in the list of reserved words: ["next", "runtime", "execute_next"]. + - The attribute name is not an attribute of the TestFlowReferenceWithExclude class. + - The attribute value is not an instance of MethodType. 
+ """ + valid_attrs = [] + reserved_words = ["next", "runtime", "execute_next"] + for attr in attr_list: + if ( + not attr[0].startswith("_") + and attr[0] not in reserved_words + and not hasattr(TestFlowReferenceWithExclude, attr[0]) + ): + if not isinstance(attr[1], MethodType): + valid_attrs.append(attr[0]) + return valid_attrs + + +def find_matched_references(collab_attr_list, all_collaborators): + """ + Finds and logs matched references between collaborators based on their attributes. + + Args: + collab_attr_list (list): A list of attribute names to check for shared references. + all_collaborators (list): A list of collaborator objects to be checked. + + Returns: + dict: A dictionary where keys are collaborator inputs and values are lists of attribute names + that have shared references with other collaborators. + """ + matched_ref_dict = {} + for i in range(len(all_collaborators)): + matched_ref_dict[all_collaborators[i].input] = [] + + # For each attribute in the collaborator attribute list, check if any of the collaborator + # attributes are shared with another collaborator + for attr_name in collab_attr_list: + for i, curr_collab in enumerate(all_collaborators): + # Compare the current collaborator with the collaborator(s) that come(s) after it. + for next_collab in all_collaborators[i + 1:]: + # Check if both collaborators have the current attribute + if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name): + # Check if both collaborators are sharing same reference + if getattr(curr_collab, attr_name) is getattr( + next_collab, attr_name + ): + matched_ref_dict[curr_collab.input].append(attr_name) + log.error( + f"... 
Reference test failed - {curr_collab.input} sharing same " + + f"{attr_name} reference with {next_collab.input}" + ) + + return matched_ref_dict + + +def validate_references(matched_ref_dict): + """ + Validates the references in the provided dictionary and updates the + TestFlowReferenceWithExclude.step_one_collab_attrs list with collaborators + sharing references. + + Args: + matched_ref_dict (dict): A dictionary where keys are collaborator names + and values are booleans indicating if they share + a reference. + + Returns: + None + """ + collborators_sharing_ref = [] + reference_flag = False + + for collab, val in matched_ref_dict.items(): + if val: + collborators_sharing_ref.append(collab) + reference_flag = True + if collborators_sharing_ref: + for collab in collborators_sharing_ref: + if collab not in TestFlowReferenceWithExclude.step_one_collab_attrs: + TestFlowReferenceWithExclude.step_one_collab_attrs.append(collab) + + if not reference_flag: + log.info("Pass : Reference test passed") diff --git a/tests/end_to_end/workflow/reference_flow.py b/tests/end_to_end/workflow/reference_flow.py new file mode 100644 index 0000000000..24f2cc9850 --- /dev/null +++ b/tests/end_to_end/workflow/reference_flow.py @@ -0,0 +1,352 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator +from tests.end_to_end.utils.exceptions import ReferenceFlowException + +import io +import math +import logging +import torch.nn as nn +import torch.optim as optim +import inspect +from types import MethodType + + + +log = logging.getLogger(__name__) + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.linear1 = nn.Linear(60, 100) + self.linear2 = nn.Linear(100, 10) + + def forward(self, x): + x = self.linear1(x) + x = self.linear2(x) + return x + + +class TestFlowReference(FLSpec): + """ + Testflow to 
validate references of collaborator attributes in Federated Flow. + """ + step_one_collab_attrs = [] + step_two_collab_attrs = [] + all_ref_error_dict = {} + agg_attr_dict = {} + + @aggregator + def start(self): + """ + Flow start. + + """ + log.info("Testing FederatedFlow - Starting Test for validating references.") + self.next(self.test_create_agg_attr) + + @aggregator + def test_create_agg_attr(self): + """ + Create different types of objects. + """ + + self.agg_attr_int = 10 + self.agg_attr_str = "Test string data" + self.agg_attr_list = [1, 2, 5, 6, 7, 8] + self.agg_attr_dict = {key: key for key in range(5)} + self.agg_attr_file = io.StringIO("Test file data in aggregator") + self.agg_attr_math = math.sqrt(2) + self.agg_attr_complex_num = complex(2, 3) + self.agg_attr_log = logging.getLogger("Test logger data in aggregator") + self.agg_attr_model = Net() + self.agg_attr_optimizer = optim.SGD( + self.agg_attr_model.parameters(), lr=1e-3, momentum=1e-2 + ) + self.collaborators = self.runtime.collaborators + + # get aggregator attributes + agg_attr_list = filter_attrs(inspect.getmembers(self)) + for attr in agg_attr_list: + agg_attr_id = id(getattr(self, attr)) + TestFlowReference.agg_attr_dict[attr] = agg_attr_id + self.next(self.test_create_collab_attr, foreach="collaborators") + + @collaborator + def test_create_collab_attr(self): + """ + Modify the attributes of aggregator to validate the references. + Create different types of objects. 
+ """ + + self.agg_attr_int += self.index + self.agg_attr_str = self.agg_attr_str + " " + self.input + self.agg_attr_complex_num += complex(self.index, self.index) + self.agg_attr_math += self.index + self.agg_attr_log = " " + self.input + + self.collab_attr_int_one = 20 + self.index + self.collab_attr_str_one = "Test string data in collab " + self.input + self.collab_attr_list_one = [1, 2, 5, 6, 7, 8] + self.collab_attr_dict_one = {key: key for key in range(5)} + self.collab_attr_file_one = io.StringIO("Test file data in collaborator") + self.collab_attr_math_one = math.sqrt(self.index) + self.collab_attr_complex_num_one = complex(self.index, self.index) + self.collab_attr_log_one = logging.getLogger( + "Test logger data in collaborator " + self.input + ) + + # append attributes of collaborator + TestFlowReference.step_one_collab_attrs.append(self) + + if len(TestFlowReference.step_one_collab_attrs) >= 2: + collab_attr_list = filter_attrs(inspect.getmembers(self)) + matched_ref_dict = find_matched_references( + collab_attr_list, TestFlowReference.step_one_collab_attrs + ) + validate_collab_references(matched_ref_dict) + + self.next(self.test_create_more_collab_attr) + + @collaborator + def test_create_more_collab_attr(self): + """ + Create different types of objects. 
+ """ + + self.collab_attr_int_two = 30 + self.index + self.collab_attr_str_two = "String reference three " + self.input + self.collab_attr_list_two = [1, 2, 3, 5, 6, 8] + self.collab_attr_dict_two = {key: key for key in range(5)} + self.collab_attr_file_two = io.StringIO("Test file reference one") + self.collab_attr_math_two = math.sqrt(2) + self.collab_attr_complex_num_two = complex(2, 3) + self.collab_attr_log_two = logging.getLogger( + "Test logger data in collaborator" + self.input + ) + + TestFlowReference.step_two_collab_attrs.append(self) + + if len(TestFlowReference.step_two_collab_attrs) >= 2: + collab_attr_list = filter_attrs(inspect.getmembers(self)) + matched_ref_dict = find_matched_references( + collab_attr_list, TestFlowReference.step_two_collab_attrs + ) + validate_collab_references(matched_ref_dict) + + self.next(self.join) + + @aggregator + def join(self, inputs): + """ + Iterate over the references of collaborator attributes + validate uniqueness of attributes and raise assertion + """ + + all_attr_list = filter_attrs(inspect.getmembers(inputs[0])) + agg_attrs = filter_attrs(inspect.getmembers(self)) + + # validate aggregator references are intact after coming out of collaborators. + validate_agg_attr_ref(agg_attrs, self) + + # validate collaborators references are not shared in between. + matched_ref_dict = find_matched_references(all_attr_list, inputs) + validate_collab_references(matched_ref_dict) + + # validate aggregator references are not shared with any of the collaborators. 
+ validate_agg_collab_references(inputs, self, agg_attrs) + + all_shared_attr = "" + log.info("Reference test summary:") + for val in TestFlowReference.all_ref_error_dict.values(): + all_shared_attr = all_shared_attr + ",".join(val) + if all_shared_attr: + e = f"...Test case failed for {all_shared_attr}" + log.error(e) + raise ReferenceFlowException(e) + else: + log.info("...Test case passed for all the attributes.") + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. + + """ + log.info("Testing FederatedFlow - Ending test for validating the references.") + TestFlowReference.step_one_collab_attrs = [] + TestFlowReference.step_two_collab_attrs = [] + + +def filter_attrs(attr_list): + """ + Filters a list of attribute tuples to return only valid attribute names. + + An attribute is considered valid if: + - It does not start with an underscore. + - It is not in the list of reserved words: ["next", "runtime", "execute_next"]. + - It is not an attribute of the TestFlowReference class. + - It is not an instance of MethodType. + + Args: + attr_list (list): A list of tuples where each tuple contains an attribute name and its value. + + Returns: + list: A list of valid attribute names. + """ + valid_attrs = [] + reserved_words = ["next", "runtime", "execute_next"] + for attr in attr_list: + if ( + not attr[0].startswith("_") + and attr[0] not in reserved_words + and not hasattr(TestFlowReference, attr[0]) + ): + if not isinstance(attr[1], MethodType): + valid_attrs.append(attr[0]) + return valid_attrs + + +def find_matched_references(collab_attr_list, all_collaborators): + """ + Finds and logs matched references between collaborators based on their attributes. + + This function iterates through a list of collaborator attributes and checks if any of the + collaborators share the same reference for a given attribute. 
If a shared reference is found, + it logs an error and raises a ReferenceFlowException. + + Args: + collab_attr_list (list): A list of attribute names to check for shared references. + all_collaborators (list): A list of collaborator objects to be checked. + + Returns: + dict: A dictionary where the keys are the input attributes of the collaborators and the + values are lists of attribute names that have shared references. + + Raises: + ReferenceFlowException: If any two collaborators share the same reference for a given attribute. + """ + matched_ref_dict = {} + for i in range(len(all_collaborators)): + matched_ref_dict[all_collaborators[i].input] = [] + + # For each attribute in the collaborator attribute list, check if any of the collaborator + # attributes are shared with another collaborator + for attr_name in collab_attr_list: + for i, curr_collab in enumerate(all_collaborators): + # Compare the current collaborator with the collaborator(s) that come(s) after it. + for next_collab in all_collaborators[i + 1:]: + # Check if both collaborators have the current attribute + if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name): + # Check if both collaborators are sharing same reference + if getattr(curr_collab, attr_name) is getattr( + next_collab, attr_name + ): + matched_ref_dict[curr_collab.input].append(attr_name) + e = f"... Reference test failed - {curr_collab.input} sharing same " \ + f"{attr_name} reference with {next_collab.input}" + log.error(e) + raise ReferenceFlowException(e) + + return matched_ref_dict + + +def validate_collab_references(matched_ref_dict): + """ + Validates the references shared by collaborators. + + This function checks the provided dictionary of matched references and + identifies collaborators who have shared references. It updates the + `all_ref_error_dict` attribute of the `TestFlowReference` class with + collaborators who have shared references. 
If no references are shared, + it logs a message indicating that the reference test passed. + + Args: + matched_ref_dict (dict): A dictionary where keys are collaborator + identifiers and values are boolean flags + indicating whether the collaborator has + shared references. + + Returns: + None + """ + collborators_sharing_ref = [] + reference_flag = False + + for collab, val in matched_ref_dict.items(): + if val: + collborators_sharing_ref.append(collab) + reference_flag = True + if collborators_sharing_ref: + for collab in collborators_sharing_ref: + if collab not in TestFlowReference.all_ref_error_dict: + TestFlowReference.all_ref_error_dict[collab] = matched_ref_dict.get( + collab + ) + + if not reference_flag: + log.info("Pass: Reference test passed for collaborators.") + + +def validate_agg_attr_ref(agg_attrs, agg_obj): + """ + Validates that the attributes of the aggregator object are intact after + coming out of collaborators by comparing their IDs with the reference + dictionary. + + Args: + agg_attrs (list): A list of attribute names to be validated. + agg_obj (object): The aggregator object whose attributes are to be validated. + + Raises: + ReferenceFlowException: If any of the aggregator attributes' references + are not intact. + """ + attr_flag = False + for attr in agg_attrs: + if TestFlowReference.agg_attr_dict.get(attr) == id(getattr(agg_obj, attr)): + attr_flag = True + if not attr_flag: + e = "...Aggregator references are not intact after coming out of collaborators." + log.error(e) + raise ReferenceFlowException(e) + else: + log.info("Pass: Aggregator references are intact after coming out of collaborators.") + + +def validate_agg_collab_references(all_collaborators, agg_obj, agg_attrs): + """ + Validates that the attributes of the aggregator object are not shared by reference with any of the collaborators. + + Args: + all_collaborators (list): A list of collaborator objects. 
+ agg_obj (object): The aggregator object whose attributes are to be validated. + agg_attrs (list): A list of attribute names (strings) to be checked for reference sharing. + + Raises: + ReferenceFlowException: If any attribute of the aggregator object is found to be shared by reference with any collaborator. + """ + mis_matched_ref = {} + for collab in all_collaborators: + mis_matched_ref[collab.input] = [] + + attr_ref_flag = False + for attr in agg_attrs: + agg_attr_id = id(getattr(agg_obj, attr)) + for collab in all_collaborators: + collab_attr_id = id(getattr(collab, attr)) + if agg_attr_id is collab_attr_id: + attr_ref_flag = True + mis_matched_ref.get(collab).append(attr) + + if attr_ref_flag: + e = "...Aggregator references are shared with one or more collaborators." + log.error(e) + raise ReferenceFlowException(e) + else: + log.info("Pass: Reference test passed for aggregator.") diff --git a/tests/end_to_end/workflow/reference_include_flow.py b/tests/end_to_end/workflow/reference_include_flow.py new file mode 100644 index 0000000000..65acccc866 --- /dev/null +++ b/tests/end_to_end/workflow/reference_include_flow.py @@ -0,0 +1,204 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import torch.nn as nn +import torch.optim as optim +import inspect +from types import MethodType +import logging + +from openfl.experimental.workflow.interface import FLSpec +from openfl.experimental.workflow.placement import aggregator, collaborator + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +MIN_COLLECTION_COUNT = 2 + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.linear1 = nn.Linear(60, 100) + self.linear2 = nn.Linear(100, 10) + + def forward(self, x): + x = self.linear1(x) + x = self.linear2(x) + return x + + +class TestFlowReferenceWithInclude(FLSpec): + + step_one_collab_attrs = [] + step_two_collab_attrs = [] + + @aggregator + def start(self): + self.agg_agg_attr_dict 
= {key: key for key in range(5)}
        log.info("Testing FederatedFlow - Starting Test for validating references")
        self.next(self.test_create_agg_attr, include=["agg_agg_attr_dict"])

    @aggregator
    def test_create_agg_attr(self):
        """Create aggregator attributes; forward only the names listed in
        `include` to the collaborator step."""
        self.agg_attr_list = [1, 2, 5, 6, 7, 8]
        self.agg_attr_dict = {key: key for key in range(5)}

        self.agg_attr_model = Net()
        self.agg_attr_optimizer = optim.SGD(
            self.agg_attr_model.parameters(), lr=1e-3, momentum=1e-2
        )
        self.collaborators = self.runtime.collaborators
        self.next(
            self.test_create_collab_attr,
            foreach="collaborators",
            include=["collaborators", "agg_attr_list"],
        )

    @collaborator
    def test_create_collab_attr(self):
        """Create collaborator attributes and validate reference uniqueness
        once at least MIN_COLLECTION_COUNT contexts have been collected."""
        self.collab_attr_list_one = [1, 2, 5, 6, 7, 8]
        self.collab_attr_dict_one = {key: key for key in range(5)}

        TestFlowReferenceWithInclude.step_one_collab_attrs.append(self)

        if (
            len(TestFlowReferenceWithInclude.step_one_collab_attrs)
            >= MIN_COLLECTION_COUNT
        ):
            collab_attr_list = filter_attrs(inspect.getmembers(self))
            matched_ref_dict = find_matched_references(
                collab_attr_list,
                TestFlowReferenceWithInclude.step_one_collab_attrs,
            )
            validate_references(matched_ref_dict)

        self.next(self.test_create_more_collab_attr, include=["collab_attr_dict_one"])

    @collaborator
    def test_create_more_collab_attr(self):
        """Create a second round of collaborator attributes and re-run the
        reference-uniqueness validation."""
        self.collab_attr_list_two = [1, 2, 3, 5, 6, 8]
        self.collab_attr_dict_two = {key: key for key in range(5)}

        TestFlowReferenceWithInclude.step_two_collab_attrs.append(self)

        if (
            len(TestFlowReferenceWithInclude.step_two_collab_attrs)
            >= MIN_COLLECTION_COUNT
        ):
            collab_attr_list = filter_attrs(inspect.getmembers(self))
            matched_ref_dict = find_matched_references(
                collab_attr_list,
                TestFlowReferenceWithInclude.step_two_collab_attrs,
            )
            validate_references(matched_ref_dict)

        self.next(self.join, include=["collab_attr_dict_two"])

    @aggregator
    def join(self, inputs):
        """Validate that no attribute references are shared between
        collaborators and log a summary."""
        all_attr_list = filter_attrs(inspect.getmembers(inputs[0]))

        # validate_references raises AssertionError on sharing; the summary
        # below only logs and never raises.
        matched_ref_dict = find_matched_references(all_attr_list, inputs)
        validate_references(matched_ref_dict)
        all_shared_attr = ""
        log.info("Reference test summary:")
        for val in matched_ref_dict.values():
            all_shared_attr = all_shared_attr + ",".join(val)
        if all_shared_attr:
            log.error(f"Test case failed for {all_shared_attr}")
        else:
            log.info("Test case passed for all the attributes.")
        self.next(self.end)

    @aggregator
    def end(self):
        """Final step: reset the class-level context lists for the next run."""
        log.info("Testing FederatedFlow - Ending test for validating the references.")
        TestFlowReferenceWithInclude.step_one_collab_attrs = []
        TestFlowReferenceWithInclude.step_two_collab_attrs = []


def filter_attrs(attr_list):
    """
    Filters a list of attributes, returning only those that are valid.

    An attribute is considered valid if:
    - It does not start with an underscore.
    - It is not in the list of reserved words: ["next", "runtime", "execute_next"].
    - It is not an attribute of the TestFlowReferenceWithInclude class.
    - It is not an instance of MethodType.

    Args:
        attr_list (list): A list of tuples where each tuple contains an attribute name and its value.

    Returns:
        list: A list of valid attribute names.
    """
    valid_attrs = []
    reserved_words = ["next", "runtime", "execute_next"]
    for attr in attr_list:
        if (
            not attr[0].startswith("_")
            and attr[0] not in reserved_words
            and not hasattr(TestFlowReferenceWithInclude, attr[0])
        ):
            if not isinstance(attr[1], MethodType):
                valid_attrs.append(attr[0])
    return valid_attrs


def find_matched_references(collab_attr_list, all_collaborators):
    """
    Finds and logs matched references between collaborators based on specified attributes.

    Args:
        collab_attr_list (list): List of attribute names to check for matches.
        all_collaborators (list): List of collaborator objects to compare.

    Returns:
        dict: A dictionary where keys are collaborator inputs and values are lists of attribute names
            that have matched references with other collaborators.
    """
    # NOTE(review): unlike the sibling flow's version, this one does NOT raise on a
    # shared reference — it only logs and records; validate_references() asserts.
    matched_ref_dict = {}
    for i in range(len(all_collaborators)):
        matched_ref_dict[all_collaborators[i].input] = []

    for attr_name in collab_attr_list:
        for i, curr_collab in enumerate(all_collaborators):
            # Compare each collaborator only with those after it (unordered pairs).
            for next_collab in all_collaborators[i + 1:]:
                if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name):
                    if getattr(curr_collab, attr_name) is getattr(
                        next_collab, attr_name
                    ):
                        matched_ref_dict[curr_collab.input].append(attr_name)
                        log.error(
                            f"Reference test failed - {curr_collab.input} sharing same "
                            + f"{attr_name} reference with {next_collab.input}"
                        )

    return matched_ref_dict


def validate_references(matched_ref_dict):
    """
    Validates the references in the given dictionary.

    This function checks if any collaborators have shared references.
    If any references are found, it raises an assertion error.

    Args:
        matched_ref_dict (dict): A dictionary where keys are collaborator names
                                 and values are booleans indicating if they have
                                 shared references.

    Raises:
        AssertionError: If any collaborator has shared references.
+ """ + collborators_sharing_ref = [] + reference_flag = False + + for collab, val in matched_ref_dict.items(): + if val: + collborators_sharing_ref.append(collab) + reference_flag = True + + assert not reference_flag, "Reference test failed" diff --git a/tests/end_to_end/workflow/subset_flow.py b/tests/end_to_end/workflow/subset_flow.py new file mode 100644 index 0000000000..c4fa8cae39 --- /dev/null +++ b/tests/end_to_end/workflow/subset_flow.py @@ -0,0 +1,70 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import logging +import random +from openfl.experimental.workflow.interface.fl_spec import FLSpec +from openfl.experimental.workflow.placement.placement import aggregator, collaborator + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + + +class TestFlowSubsetCollaborators(FLSpec): + """ + Testflow to validate working of Subset Collaborators in Federated Flow. + """ + + def __init__(self, random_ints=[], **kwargs) -> None: + """ + Initialize the SubsetFlow class. + + Args: + random_ints (list, optional): A list of random integers. Defaults to an empty list. + **kwargs: Additional keyword arguments passed to the superclass initializer. + + Returns: + None + """ + super().__init__(**kwargs) + self.random_ints = random_ints + + @aggregator + def start(self): + """ + Starting the flow with random subset of collaborators + """ + log.info("Testing FederatedFlow - Starting Test for validating Subset of collaborators") + self.collaborators = self.runtime.collaborators + + # select subset of collaborators + self.subset_collaborators = self.collaborators[: random.choice(self.random_ints)] + + log.info(f"... 
Executing flow for {len(self.subset_collaborators)} collaborators out of Total: {len(self.collaborators)}") + + self.next(self.test_valid_collaborators, foreach="subset_collaborators") + + @collaborator + def test_valid_collaborators(self): + """ + set the collaborator name + """ + log.info(f"Print collaborators {self.name}") + self.collaborator_ran = self.name + self.next(self.join) + + @aggregator + def join(self, inputs): + """ + List of collaborators ran successfully + """ + log.info("inside join") + self.collaborators_ran = [i.collaborator_ran for i in inputs] + self.next(self.end) + + @aggregator + def end(self): + """ + End of the flow + """ + log.info(f"End of the test case {TestFlowSubsetCollaborators.__name__} reached.")