From 29b0e219b9d38d8dc50cc437dfdd646904b33bd7 Mon Sep 17 00:00:00 2001
From: Shailesh Pant
Date: Thu, 9 Jan 2025 20:03:59 +0530
Subject: [PATCH] - add --selected_task_group option to aggregator cli, default to "learning"
- enhance Aggregator to take selected_task_group attribute to enable fedeval or learning switching at aggregator level
- rebase 10.Jan.2
- fix aggregator cli test cases as per new "selected_task_group" field in start
- changed default assigner task_group name to "learning" and "evaluation"
- updated workspaces to use new task_group names - learning / evaluation
- updated as per review comments
- update the FedEval documentation with e2e usage steps
- Rebased 15-Jan.1
- Fixed docs indentation issue

Signed-off-by: Shailesh Pant
---
 docs/about/features_index/fed_eval.rst        | 837 +++++++++++++++++-
 .../torch_cnn_mnist/plan/plan.yaml            |   2 +-
 .../workspace/plan/defaults/assigner.yaml     |   2 +-
 .../federated-evaluation/assigner.yaml        |   2 +-
 openfl/component/aggregator/aggregator.py     |  10 +-
 openfl/interface/aggregator.py                |  49 +-
 tests/openfl/interface/test_aggregator_api.py |  44 +-
 7 files changed, 913 insertions(+), 33 deletions(-)

diff --git a/docs/about/features_index/fed_eval.rst b/docs/about/features_index/fed_eval.rst
index e35c0f5afa4..1b9ecbf0e05 100644
--- a/docs/about/features_index/fed_eval.rst
+++ b/docs/about/features_index/fed_eval.rst
@@ -24,34 +24,837 @@ In general pipeline is as follows:

 Example Using the Task Runner API (Aggregator-based Workflow)
 --------------------------------------------------------------

-To demonstrate usage of the task runner API (aggregator-based workflow) for federated evaluation, consider the `Hello Federation example `_. This sample script creates a simple federation with two collaborator nodes and one aggregator node, and executes based on a user specified workspace template. We provide a ``torch_cnn_mnist_fed_eval`` template, which is a federated evaluation template adapted from ``torch_cnn_mnist``.
+The following steps can be used to achieve practical end-to-end (e2e) usage of FedEval.

-This script can be directly executed as follows:

+*N.B.*: We will use the ``torch_cnn_mnist`` plan itself for training and, with some minor changes, for evaluation as well.
+*Prerequisites*: Please ensure that the latest OpenFL version (>1.6) is installed; you can also choose to install from source.
+With OpenFL version >1.6 the aggregator start command is enhanced with an optional argument '--task_group' which,
+as the help suggests, selects the task_group whose tasks the assigner hands to the collaborator(s) for execution. It defaults to 'learning':
+```code-block bash
+Usage: fx aggregator start [OPTIONS]

-.. code-block:: shell

+Start the aggregator service.

- $ python test_hello_federation.py --template torch_cnn_mnist_fed_eval
-
-In order to adapt this template for federated evaluation, the following defaults were added for assigner, aggregator and tasks and same referenced in the ``plan.yaml``:

+Args:
+    plan (str): Path to plan config file
+    authorized_cols (str): Path to authorized collaborators file
+    task_group (str): Selected task-group for assignment - defaults to 'learning'

-.. literalinclude:: ../../../openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml

+Options:
+-p, --plan PATH Federated learning plan [plan/plan.yaml]
+-c, --authorized_cols PATH Authorized collaborator list [plan/cols.yaml]
+--task_group TEXT Selected task-group for assignment - defaults to learning
+--help Show this message and exit.
+```

+1. **Setup**
+We will use the ``torch_cnn_mnist`` workspace for training.
+Let's first configure a workspace with all the necessary certificates:
+```code-block bash
+fx workspace create --prefix ./cnn_train_eval --template torch_cnn_mnist
+cd cnn_train_eval
+fx workspace certify
+fx aggregator generate-cert-request
+fx aggregator certify --silent
+```

+A successful run will show in the console both the FL plan details and confirmation of certificate generation:
+```code-block bash
+[05:22:23] INFO Loading DEFAULTS for section network from file
+ plan.py:147
+ /home/devvm/src/clean/openfl/venv/lib/python3.10/site-packages/openfl-workspace/workspace/plan/defaults/network.yaml.
+ INFO FL-Plan hash is 560862c82b416a7e88d7209aeacf99cd91f670d085947b5f6f4aae71e7dd61d1847a100895699c849feab8d0e0829103 plan.py:288
+[05:22:24] INFO FL-Plan hash is 4cdde92af05e1f7a80e10ec05d9c10f5cfb84e1b0aa4f81dfb4979d43cab0dc130ca52e7f4fe6f6a34ed7c5c47d0e552 plan.py:288
+ INFO Parsing Federated Learning Plan : SUCCESS :
+ plan.py:193
+ /home/devvm/src/clean/openfl/venv/lib/python3.10/site-packages/openfl-workspace/torch_cnn_mnist/plan/plan.yaml.
+ INFO aggregator:
+ plan.py:198
+ settings:

-.. 
literalinclude:: ../../../openfl-workspace/workspace/plan/defaults/federated-evaluation/aggregator.yaml +1. **Setup** +We will use `torch_cnn_mnist`` workspace for training +Lets first configure a workspace with all necesary certificates +```code-block bash +fx workspace create --prefix ./cnn_train_eval --template torch_cnn_mnist +cd cnn_train_eval +fx workspace certify +fx aggregator generate-cert-request +fx aggregator certify --silent +``` -.. literalinclude:: ../../../openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml +Succesful run of this will show in console both the FL plan details and confirmation of certificates generations +```code-block bash +05:22:23] INFO Loading DEFAULTS for section network from file + plan.py:147 + /home/devvm/src/clean/openfl/venv/lib/python3.10/site-packages/openfl-workspace/workspace/plan/defaults/network.yaml. + INFO FL-Plan hash is 560862c82b416a7e88d7209aeacf99cd91f670d085947b5f6f4aae71e7dd61d1847a100895699c849feab8d0e0829103 plan.py:288 +[05:22:24] INFO FL-Plan hash is 4cdde92af05e1f7a80e10ec05d9c10f5cfb84e1b0aa4f81dfb4979d43cab0dc130ca52e7f4fe6f6a34ed7c5c47d0e552 plan.py:288 + INFO Parsing Federated Learning Plan : SUCCESS : + plan.py:193 + /home/devvm/src/clean/openfl/venv/lib/python3.10/site-packages/openfl-workspace/torch_cnn_mnist/plan/plan.yaml. + INFO aggregator: + plan.py:198 + settings: -.. literalinclude:: ../../../openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml + best_state_path: save/best.pbuf -Key Changes for Federated Evaluation by baking in defaults for: + db_store_rounds: 2 -1. **aggregator.settings.rounds_to_train**: Set to 1 -2. **assigner**: Assign to aggregated_model_validation instead of default assignments -3. **tasks**: Set to aggregated_model_validation instead of default tasks + init_state_path: save/init.pbuf -**Optional**: modify ``src/pt_cnn.py`` to remove optimizer initialization and definition of loss function as these are not needed for evaluation + last_state_path: save/last.pbuf -This sample script will create a federation based on the `torch_cnn_mnist_fed_eval` template using the `plan.yaml` file defined above, spawning two collaborator nodes and a single aggregator node. The model will be sent to the two collaborator nodes, where each collaborator will perform model validation on its own local data. The accuracy from this model validation will then be send back to the aggregator where it will aggregated into a final accuracy metric. The federation will then be shutdown. 
+ rounds_to_train: 2 + write_logs: false + + template: openfl.component.aggregator.Aggregator + + assigner: + + settings: + + task_groups: + + - name: learning + + percentage: 1.0 + + tasks: + + - aggregated_model_validation + + - train + + - locally_tuned_model_validation + + template: openfl.component.RandomGroupedAssigner + + collaborator: + + settings: + + db_store_rounds: 1 + + delta_updates: false + + opt_treatment: RESET + + template: openfl.component.collaborator.Collaborator + + compression_pipeline: + + settings: {} + + template: openfl.pipelines.NoCompressionPipeline + + data_loader: + + settings: + + batch_size: 64 + + collaborator_count: 2 + + template: src.dataloader.PyTorchMNISTInMemory + + network: + + settings: + + agg_addr: devvm###.com + + agg_port: 55529 + + cert_folder: cert + + client_reconnect_interval: 5 + + hash_salt: auto + + require_client_auth: true + + use_tls: true + + template: openfl.federation.Network + + task_runner: + + settings: {} + + template: src.taskrunner.TemplateTaskRunner + + tasks: + + aggregated_model_validation: + + function: validate_task + + kwargs: + + apply: global + + metrics: + + - acc + + locally_tuned_model_validation: + + function: validate_task + + kwargs: + + apply: local + + metrics: + + - acc + + settings: {} + + train: + + function: train_task + + kwargs: + + epochs: 1 + + metrics: + + - loss + + + + +New workspace directory structure: +cnn_train_eval +├── requirements.txt +├── .workspace +├── logs +├── data +├── cert +├── README.md +├── src +│ ├── __init__.py +│ ├── taskrunner.py +│ ├── cnn_model.py +│ └── dataloader.py +├── plan +│ ├── cols.yaml +│ ├── plan.yaml +│ ├── data.yaml +│ └── defaults +└── save + +6 directories, 11 files + +✔️ OK +Setting Up Certificate Authority... + +1. Create Root CA +1.1 Create Directories +1.2 Create Database +1.3 Create CA Request and Certificate +2. Create Signing Certificate +2.1 Create Directories +2.2 Create Database +2.3 Create Signing Certificate CSR +2.4 Sign Signing Certificate CSR +3 Create Certificate Chain + +Done. + +✔️ OK +Creating AGGREGATOR certificate key pair with following settings: CN=devvm###.com, SAN=DNS:devvm###.com +Writing AGGREGATOR certificate key pair to: /home/devvm/src/clean/openfl/cnn_train_eval/cert/server +The CSR Hash 3affa56ce391a084961c5f1ba634f223536173665daa6191e705e13557f36d58c844133758f804d1f85d93bfc113fd7b + +✔️ OK +The CSR Hash for file server/agg_devvm###.com.csr = 3affa56ce391a084961c5f1ba634f223536173665daa6191e705e13557f36d58c844133758f804d1f85d93bfc113fd7b +Warning: manual check of certificate hashes is bypassed in silent mode. +Signing AGGREGATOR certificate + +✔️ OK +``` + +2. Initialize the plan +```code-block bash +cd ~/src/clean/openfl/cnn_train_eval +fx plan initialize >~/plan.log 2>&1 & +tail -f ~/plan.log +``` + +This should initialize the plan with random initial weights in init.pbuf +```code-block bash +[05:58:37] INFO MNIST > X_train Shape : torch.Size([30000, 1, 28, 28]) + dataloader.py:82 + INFO MNIST > y_train Shape : torch.Size([30000]) + dataloader.py:83 + INFO MNIST > Train Samples : 30000 + dataloader.py:84 + INFO MNIST > Valid Samples : 5000 + dataloader.py:85 + INFO Building `src.taskrunner.TemplateTaskRunner` Module. + plan.py:226 + WARNING tried to remove tensor: __opt_state_needed not present in the tensor dict + split.py:94 + INFO Building `openfl.pipelines.NoCompressionPipeline` Module. 
+ plan.py:226
 WARNING tried to remove tensor: __opt_state_needed not present in the tensor dict
 split.py:94
 WARNING Following parameters omitted from global initial model, local initialization will determine values: [] plan.py:186
 INFO Creating Initial Weights File 🠆 save/init.pbuf
 plan.py:196
 INFO FL-Plan hash is 7f1ccc745b48a5311d61fb5c0588587f08f27880c92e1a4d5a04ee54282908f19af9d76d9e23e08eb2909bd3c3a27d10 plan.py:288
 INFO ['plan_7f1ccc74']
 plan.py:223

✔️ OK
```

3. Next, run the 'learning' federation with two collaborators:
```code-block bash
## Create two collaborators
cd ~/src/clean/openfl/cnn_train_eval
fx collaborator create -n collaborator1 -d 1
fx collaborator generate-cert-request -n collaborator1
fx collaborator certify -n collaborator1 --silent
fx collaborator create -n collaborator2 -d 2
fx collaborator generate-cert-request -n collaborator2
fx collaborator certify -n collaborator2 --silent

## start the learning federation
fx aggregator start > ~/fx_aggregator.log 2>&1 &
fx collaborator start -n collaborator1 > ~/collab1.log 2>&1 &
fx collaborator start -n collaborator2 > ~/collab2.log 2>&1 &
cd ~
tail -f plan.log fx_aggregator.log collab1.log collab2.log
```

These commands will run two collaborators and start the aggregator with the default --task_group 'learning'.

The same is defined in the assigner section of the plan, which comes from the defaults itself:
```code-block bash
 assigner:

 settings:

 task_groups:

 - name: learning

 percentage: 1.0

 tasks:

 - aggregated_model_validation

 - train

 - locally_tuned_model_validation
```

This will run the 2 rounds of training across the collaborators:
```code-block bash
[06:08:14] INFO MNIST > X_train Shape : torch.Size([29999, 1, 28, 28])
 dataloader.py:82
 INFO MNIST > y_train Shape : torch.Size([29999])
 dataloader.py:83
 INFO MNIST > Train Samples : 29999
 dataloader.py:84
 INFO MNIST > Valid Samples : 4999
 dataloader.py:85
 INFO Building `src.taskrunner.TemplateTaskRunner` Module.
 plan.py:226
 WARNING tried to remove tensor: __opt_state_needed not present in the tensor dict
 split.py:94
 INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
 plan.py:226
 INFO Building `openfl.component.collaborator.Collaborator` Module.
 plan.py:226
 INFO Waiting for tasks...
+ collaborator.py:234

==> fx_aggregator.log <==
 INFO Sending tasks to collaborator collaborator2 for round 0
 aggregator.py:409

==> collab2.log <==
 INFO Received Tasks: [name: "aggregated_model_validation"
 collaborator.py:184
 , name: "train"

 , name: "locally_tuned_model_validation"

 ]

==> fx_aggregator.log <==
[06:08:17] INFO Collaborator collaborator1 is sending task results for aggregated_model_validation, round 0 aggregator.py:629

==> collab1.log <==
[06:08:17] INFO Run 0 epoch of 0 round
 runner_pt.py:161

==> fx_aggregator.log <==
 INFO Collaborator collaborator2 is sending task results for aggregated_model_validation, round 0 aggregator.py:629

==> collab2.log <==
[06:08:17] INFO Run 0 epoch of 0 round
 runner_pt.py:161
```

At the end of the learning federation we can note the best model accuracy reported, and save the best.pbuf file for the next step - evaluation:
```code-block bash
==> fx_aggregator.log <==
[06:09:27] INFO Collaborator collaborator1 is sending task results for train, round 1
 aggregator.py:629
[06:09:28] INFO Collaborator collaborator1 is sending task results for locally_tuned_model_validation, round 1 aggregator.py:629
 INFO Round 1: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1'] aggregator.py:1049
 INFO Round 1: saved the best model with score 0.960096
 aggregator.py:955
 INFO Saving round 1 model...
 aggregator.py:994
 INFO Experiment Completed. Cleaning up...
 aggregator.py:1005

==> collab1.log <==
[06:09:28] INFO Waiting for tasks...
 collaborator.py:234

==> fx_aggregator.log <==
 INFO Sending signal to collaborator collaborator1 to shutdown...
 aggregator.py:356

==> collab1.log <==
 INFO Received shutdown signal. Exiting...
 collaborator.py:199

✔️ OK

==> collab2.log <==
[06:09:36] INFO Waiting for tasks...
 collaborator.py:234

==> fx_aggregator.log <==
[06:09:36] INFO Sending signal to collaborator collaborator2 to shutdown...
 aggregator.py:356

==> collab2.log <==
 INFO Received shutdown signal. Exiting...
 collaborator.py:199

✔️ OK

==> fx_aggregator.log <==

✔️ OK

```

In this case we can confirm that after the 2 rounds of training the model reported an accuracy of 0.960096:
```code-block bash
Round 1: saved the best model with score 0.960096
 aggregator.py:955
```

Let's save this model (best.pbuf) for later use:
```code-block bash
cp cnn_train_eval/save/best.pbuf ~/trained_model.pbuf
devuser@devvm:~/src/clean/openfl$
```

Now let's create another workspace, ``cnn_eval``, using the same plan (repeat the step 1 setup commands, this time with ``--prefix ./cnn_eval``); the plan parsing output is identical:
```code-block bash
FL-Plan hash is 560862c82b416a7e88d7209aeacf99cd91f670d085947b5f6f4aae71e7dd61d1847a100895699c849feab8d0e0829103 plan.py:288
 INFO FL-Plan hash is 4cdde92af05e1f7a80e10ec05d9c10f5cfb84e1b0aa4f81dfb4979d43cab0dc130ca52e7f4fe6f6a34ed7c5c47d0e552 plan.py:288
 INFO Parsing Federated Learning Plan : SUCCESS :
 plan.py:193
 /home/devvm/src/clean/openfl/venv/lib/python3.10/site-packages/openfl-workspace/torch_cnn_mnist/plan/plan.yaml.
+ INFO aggregator: + plan.py:198 + settings: + + best_state_path: save/best.pbuf + + db_store_rounds: 2 + + init_state_path: save/init.pbuf + + last_state_path: save/last.pbuf + + rounds_to_train: 2 + + write_logs: false + + template: openfl.component.aggregator.Aggregator + + assigner: + + settings: + + task_groups: + + - name: learning + + percentage: 1.0 + + tasks: + + - aggregated_model_validation + + - train + + - locally_tuned_model_validation + + template: openfl.component.RandomGroupedAssigner + + collaborator: + + settings: + + db_store_rounds: 1 + + delta_updates: false + + opt_treatment: RESET + + template: openfl.component.collaborator.Collaborator + + compression_pipeline: + + settings: {} + + template: openfl.pipelines.NoCompressionPipeline + + data_loader: + + settings: + + batch_size: 64 + + collaborator_count: 2 + + template: src.dataloader.PyTorchMNISTInMemory + + network: + + settings: + + agg_addr: devvm###.com + + agg_port: 55529 + + cert_folder: cert + + client_reconnect_interval: 5 + + hash_salt: auto + + require_client_auth: true + + use_tls: true + + template: openfl.federation.Network + + task_runner: + + settings: {} + + template: src.taskrunner.TemplateTaskRunner + + tasks: + + aggregated_model_validation: + + function: validate_task + + kwargs: + + apply: global + + metrics: + + - acc + + locally_tuned_model_validation: + + function: validate_task + + kwargs: + + apply: local + + metrics: + + - acc + + settings: {} + + train: + + function: train_task + + kwargs: + + epochs: 1 + + metrics: + + - loss + + + + +New workspace directory structure: +cnn_eval +├── requirements.txt +├── .workspace +├── logs +├── data +├── cert +├── README.md +├── src +│ ├── __init__.py +│ ├── taskrunner.py +│ ├── cnn_model.py +│ └── dataloader.py +├── plan +│ ├── cols.yaml +│ ├── plan.yaml +│ ├── data.yaml +│ └── defaults +└── save + +6 directories, 11 files + +✔️ OK +Setting Up Certificate Authority... + +1. Create Root CA +1.1 Create Directories +1.2 Create Database +1.3 Create CA Request and Certificate +2. Create Signing Certificate +2.1 Create Directories +2.2 Create Database +2.3 Create Signing Certificate CSR +2.4 Sign Signing Certificate CSR +3 Create Certificate Chain + +Done. + +✔️ OK +Creating AGGREGATOR certificate key pair with following settings: CN=devvm###.com, SAN=DNS:devvm###.com +Writing AGGREGATOR certificate key pair to: /home/devvm/src/clean/openfl/cnn_eval/cert/server +The CSR Hash 50d8834541196c0049e1ff250de310344c7c21743b9ff02528303d9ab77423c1ac5f58b68dcd0b53e9f46ac5ae0a6498 + +✔️ OK +The CSR Hash for file server/agg_devvm###.com.csr = 50d8834541196c0049e1ff250de310344c7c21743b9ff02528303d9ab77423c1ac5f58b68dcd0b53e9f46ac5ae0a6498 +Warning: manual check of certificate hashes is bypassed in silent mode. +Signing AGGREGATOR certificate + +✔️ OK +``` + +Post this we will do plan initialize and we shall replace the init.pbuf with the previously saved best.pbuf and then re-adjust the plan +to use "evaluation" defaults. 
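The "evaluation" defaults referenced here come from the federated-evaluation assigner defaults renamed by this patch (see the ``federated-evaluation/assigner.yaml`` diff further below); they assign only the ``aggregated_model_validation`` task:
```code-block bash
# Contents of openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
# as updated by this patch:
template : openfl.component.RandomGroupedAssigner
settings :
  task_groups :
    - name : evaluation
      percentage : 1.0
      tasks :
        - aggregated_model_validation
```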
+Once all the pieces are in place, we run the aggregator in evaluation mode by supplying --task_group "evaluation", validating the
accuracy of the previously trained model.
The updated plan, post initialization and with the edits that make it ready for evaluation, is as follows:
```code-block bash
aggregator:
  settings:
    best_state_path: save/best.pbuf
    db_store_rounds: 2
    init_state_path: save/init.pbuf
    last_state_path: save/last.pbuf
    rounds_to_train: 1
    write_logs: false
  template: openfl.component.aggregator.Aggregator
assigner:
  settings:
    task_groups:
      - name: evaluation
        percentage: 1.0
        tasks:
          - aggregated_model_validation
  template: openfl.component.RandomGroupedAssigner
collaborator:
  settings:
    db_store_rounds: 1
    delta_updates: false
    opt_treatment: RESET
  template: openfl.component.collaborator.Collaborator
compression_pipeline:
  settings: {}
  template: openfl.pipelines.NoCompressionPipeline
data_loader:
  settings:
    batch_size: 64
    collaborator_count: 2
  template: src.dataloader.PyTorchMNISTInMemory
network:
  settings:
    agg_addr: devvm###.com
    agg_port: 55529
    cert_folder: cert
    client_reconnect_interval: 5
    hash_salt: auto
    require_client_auth: true
    use_tls: true
  template: openfl.federation.Network
task_runner:
  settings: {}
  template: src.taskrunner.TemplateTaskRunner
tasks:
  aggregated_model_validation:
    function: validate_task
    kwargs:
      apply: global
      metrics:
        - acc
  locally_tuned_model_validation:
    function: validate_task
    kwargs:
      apply: local
      metrics:
        - acc
  settings: {}
  train:
    function: train_task
    kwargs:
      epochs: 1
      metrics:
        - loss
```

We have made the following changes to the initialized torch_cnn_mnist plan in the new workspace:
 - Set rounds_to_train to 1, as evaluation needs just one round of the federation across the collaborators
 - Removed all training-related tasks from the assigner settings, leaving only "aggregated_model_validation"

Now let's replace the init.pbuf with the previously saved trained_model.pbuf:
```code-block bash
ll cnn_eval/save/init.pbuf
-rw------- 1 devuser devuser 1722958 Jan 14 09:44 cnn_eval/save/init.pbuf
(venv) devuser@devvm:~/src/clean/openfl$ cp ~/trained_model.pbuf cnn_eval/save/init.pbuf
(venv) devuser@devvm:~/src/clean/openfl$ ll cnn_eval/save/init.pbuf
-rw------- 1 devuser devuser 1722974 Jan 14 09:52 cnn_eval/save/init.pbuf
(venv) devuser@devvm:~/src/clean/openfl$
```

Notice the size change in init.pbuf as it is replaced by the trained model we saved from the training run of the federation.
Now, finally, let's run the federation; this time we launch the aggregator overriding the default value of --task_group with "evaluation":
```code-block bash
## Create two collaborators
cd ~/src/clean/openfl/cnn_eval
fx collaborator create -n collaborator1 -d 1
fx collaborator generate-cert-request -n collaborator1
fx collaborator certify -n collaborator1 --silent
fx collaborator create -n collaborator2 -d 2
fx collaborator generate-cert-request -n collaborator2
fx collaborator certify -n collaborator2 --silent

## start the fedeval federation
fx aggregator start --task_group evaluation > ~/fx_aggregator.log 2>&1 &
fx collaborator start -n collaborator1 > ~/collab1.log 2>&1 &
fx collaborator start -n collaborator2 > ~/collab2.log 2>&1 &
cd ~
tail -f plan.log fx_aggregator.log collab1.log collab2.log
```

Notice that the only change in the federation run steps from the previous training round is the additional --task_group argument to aggregator start.
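As a quick sanity check (a sketch, assuming the aggregator log path used above), you can confirm from the aggregator log which task_group was picked up before the collaborators connect:
```code-block bash
# Assumes the aggregator was started as above, logging to ~/fx_aggregator.log
grep "Setting aggregator to assign" ~/fx_aggregator.log
# Expected: INFO Setting aggregator to assign: evaluation task_group
```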
Since the aggregator's task_group is set to "evaluation", it will skip the round_number check and use the supplied init model just for evaluation:
```code-block bash
 INFO Setting aggregator to assign: evaluation task_group
 aggregator.py:101
 INFO 🧿 Starting the Aggregator Service.
 aggregator.py:103
 INFO Building `openfl.component.RandomGroupedAssigner` Module.
 plan.py:226
 INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
 plan.py:226
 INFO Building `openfl.component.straggler_handling_functions.CutoffTimeBasedStragglerHandling` Module. plan.py:226
 WARNING CutoffTimeBasedStragglerHandling is disabled as straggler_cutoff_time is set to np.inf. cutoff_time_based_straggler_handling.py:46
 INFO Building `openfl.component.aggregator.Aggregator` Module.
 plan.py:226
 INFO Skipping round_number check for evaluation task_group
 aggregator.py:215
 INFO Starting Aggregator gRPC Server
 aggregator_server.py:347
```

In each collaborator's log we can see that the only assigned task is the evaluation task:
```code-block bash
==> collab1.log <==
[10:00:12] INFO MNIST > X_train Shape : torch.Size([30000, 1, 28, 28])
 dataloader.py:82
 INFO MNIST > y_train Shape : torch.Size([30000])
 dataloader.py:83
 INFO MNIST > Train Samples : 30000
 dataloader.py:84
 INFO MNIST > Valid Samples : 5000
 dataloader.py:85
 INFO Building `src.taskrunner.TemplateTaskRunner` Module.
 plan.py:226
 WARNING tried to remove tensor: __opt_state_needed not present in the tensor dict
 split.py:94
 INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
 plan.py:226
 INFO Building `openfl.component.collaborator.Collaborator` Module.
 plan.py:226
 INFO Waiting for tasks...
 collaborator.py:234
 INFO Received Tasks: [name: "aggregated_model_validation"
 collaborator.py:184
 ]
==> collab2.log <==
[10:00:12] INFO MNIST > X_train Shape : torch.Size([29999, 1, 28, 28])
 dataloader.py:82
 INFO MNIST > y_train Shape : torch.Size([29999])
 dataloader.py:83
 INFO MNIST > Train Samples : 29999
 dataloader.py:84
 INFO MNIST > Valid Samples : 4999
 dataloader.py:85
 INFO Building `src.taskrunner.TemplateTaskRunner` Module.
 plan.py:226
 WARNING tried to remove tensor: __opt_state_needed not present in the tensor dict
 split.py:94
 INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
 plan.py:226
 INFO Building `openfl.component.collaborator.Collaborator` Module.
 plan.py:226
 INFO Waiting for tasks...
 collaborator.py:234
 INFO Received Tasks: [name: "aggregated_model_validation"
 collaborator.py:184
 ]

```

And after the federation run, since it is an evaluation-only run, the collaborators report the accuracy of the init model which, for a successful evaluation, is the same as the previously trained best model's accuracy - in our case 0.960096:
```code-block bash
==> fx_aggregator.log <==
[10:00:15] INFO Collaborator collaborator2 is sending task results for aggregated_model_validation, round 0 aggregator.py:629
 INFO Round 0: Collaborators that have completed all tasks: ['collaborator2']
 aggregator.py:1049
 INFO Collaborator collaborator1 is sending task results for aggregated_model_validation, round 0 aggregator.py:629
 INFO Round 0: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1'] aggregator.py:1049
 INFO Round 0: saved the best model with score 0.960096
 aggregator.py:955
 INFO Saving round 0 model...
 aggregator.py:994
 INFO Experiment Completed. Cleaning up...
+ aggregator.py:1005
 INFO Sending signal to collaborator collaborator1 to shutdown...
 aggregator.py:356

==> collab1.log <==
[10:00:15] INFO Waiting for tasks...
 collaborator.py:234
 INFO Received shutdown signal. Exiting...
 collaborator.py:199

✔️ OK

==> collab2.log <==
[10:00:15] INFO Waiting for tasks...
 collaborator.py:234

==> fx_aggregator.log <==
[10:00:25] INFO Sending signal to collaborator collaborator2 to shutdown...
 aggregator.py:356

==> collab2.log <==
[10:00:25] INFO Waiting for tasks...
 collaborator.py:234
 INFO Received shutdown signal. Exiting...
 collaborator.py:199

✔️ OK

==> fx_aggregator.log <==

✔️ OK
```

---
Congratulations, you have successfully performed federated evaluation across two decentralized collaborator nodes using the same plan, with minor evaluation-related changes, leveraging
a previously trained OpenFL model protobuf as input.
\ No newline at end of file
diff --git a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
index 7ad310a3e93..cae2fd0028b 100644
--- a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
+++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
@@ -13,7 +13,7 @@ aggregator:
 assigner:
   settings:
     task_groups:
-      - name: train_and_validate
+      - name: learning
         percentage: 1.0
         tasks:
           - aggregated_model_validation
diff --git a/openfl-workspace/workspace/plan/defaults/assigner.yaml b/openfl-workspace/workspace/plan/defaults/assigner.yaml
index 0b7e7444759..6a5903794f4 100644
--- a/openfl-workspace/workspace/plan/defaults/assigner.yaml
+++ b/openfl-workspace/workspace/plan/defaults/assigner.yaml
@@ -1,7 +1,7 @@
 template : openfl.component.RandomGroupedAssigner
 settings :
   task_groups :
-    - name : train_and_validate
+    - name : learning
       percentage : 1.0
       tasks :
         - aggregated_model_validation
diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
index 9d583fa0c48..c660659e839 100644
--- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
+++ b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
@@ -1,7 +1,7 @@
 template : openfl.component.RandomGroupedAssigner
 settings :
   task_groups :
-    - name : validate
+    - name : evaluation
       percentage : 1.0
       tasks :
         - aggregated_model_validation
\ No newline at end of file
diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index 6a2da516ccf..1680c98a30f 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -1,7 +1,6 @@
 # Copyright 2020-2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
-
 """Aggregator module."""
 
 import logging
@@ -85,6 +84,7 @@ def __init__(
         callbacks: Optional[List] = None,
         persist_checkpoint=True,
         persistent_db_path=None,
+        task_group: str = "learning",
     ):
         """Initializes the Aggregator.
 
@@ -111,7 +111,9 @@ def __init__(
                 Defaults to 1.
             initial_tensor_dict (dict, optional): Initial tensor dictionary.
             callbacks: List of callbacks to be used during the experiment.
+            task_group (str, optional): Selected task_group for assignment.
""" + self.task_group = task_group self.round_number = 0 self.next_model_round_number = 0 @@ -298,9 +300,13 @@ def _load_initial_tensors(self): self.model, compression_pipeline=self.compression_pipeline ) - if round_number > self.round_number: + # Check selected task_group before updating round number + if self.task_group == "evaluation": + logger.info(f"Skipping round_number check for {self.task_group} task_group") + elif round_number > self.round_number: logger.info(f"Starting training from round {round_number} of previously saved model") self.round_number = round_number + tensor_key_dict = { TensorKey(k, self.uuid, self.round_number, False, ("model",)): v for k, v in tensor_dict.items() diff --git a/openfl/interface/aggregator.py b/openfl/interface/aggregator.py index 80ce56e32e5..2b6ab7afbbc 100644 --- a/openfl/interface/aggregator.py +++ b/openfl/interface/aggregator.py @@ -1,18 +1,33 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 - """Aggregator module.""" import sys from logging import getLogger from pathlib import Path -from click import Path as ClickPath -from click import confirm, echo, group, option, pass_context, style +from click import ( + Path as ClickPath, +) +from click import ( + confirm, + echo, + group, + option, + pass_context, + style, +) from openfl.cryptography.ca import sign_certificate -from openfl.cryptography.io import get_csr_hash, read_crt, read_csr, read_key, write_crt, write_key +from openfl.cryptography.io import ( + get_csr_hash, + read_crt, + read_csr, + read_key, + write_crt, + write_key, +) from openfl.cryptography.participant import generate_csr from openfl.federated import Plan from openfl.interface.cli_helper import CERT_DIR @@ -52,9 +67,20 @@ def aggregator(context): default="plan/cols.yaml", type=ClickPath(exists=True), ) -def start_(plan, authorized_cols): - """Start the aggregator service.""" +@option( + "--task_group", + required=False, + default="learning", + help="Selected task-group for assignment - defaults to learning", +) +def start_(plan, authorized_cols, task_group): + """Start the aggregator service. 
+
+    Args:
+        plan (str): Path to plan config file
+        authorized_cols (str): Path to authorized collaborators file
+        task_group (str): Selected task-group for assignment - defaults to 'learning'
+    """
     if is_directory_traversal(plan):
         echo("Federated learning plan path is out of the openfl workspace scope.")
         sys.exit(1)
@@ -62,14 +88,21 @@ def start_(plan, authorized_cols):
         echo("Authorized collaborator list file path is out of the openfl workspace scope.")
         sys.exit(1)
 
-    plan = Plan.parse(
+    # Parse plan and override mode if specified
+    parsed_plan = Plan.parse(
         plan_config_path=Path(plan).absolute(),
         cols_config_path=Path(authorized_cols).absolute(),
     )
 
+    # Set task_group in aggregator settings
+    if "settings" not in parsed_plan.config["aggregator"]:
+        parsed_plan.config["aggregator"]["settings"] = {}
+    parsed_plan.config["aggregator"]["settings"]["task_group"] = task_group
+    logger.info(f"Setting aggregator to assign: {task_group} task_group")
+
     logger.info("🧿 Starting the Aggregator Service.")
 
-    plan.get_server().serve()
+    parsed_plan.get_server().serve()
 
 
 @aggregator.command(name="generate-cert-request")
diff --git a/tests/openfl/interface/test_aggregator_api.py b/tests/openfl/interface/test_aggregator_api.py
index 14572cf8abd..79866343687 100644
--- a/tests/openfl/interface/test_aggregator_api.py
+++ b/tests/openfl/interface/test_aggregator_api.py
@@ -17,7 +17,19 @@ def test_aggregator_start(mock_parse):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
 
     ret = start_(['-p', plan_config, '-c', cols_config], standalone_mode=False)
 
@@ -32,7 +44,20 @@ def test_aggregator_start_illegal_plan(mock_parse, mock_is_directory_traversal):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
+
     mock_is_directory_traversal.side_effect = [True, False]
 
     with TestCase.assertRaises(test_aggregator_start_illegal_plan, SystemExit):
@@ -48,7 +73,20 @@ def test_aggregator_start_illegal_cols(mock_parse, mock_is_directory_traversal):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
+
     mock_is_directory_traversal.side_effect = [False, True]
 
     with TestCase.assertRaises(test_aggregator_start_illegal_cols, SystemExit):