Commit 203d417

Merge branch 'securefederatedai:develop' into Envoy-CLI-and-Config-cleanup
refai06 authored Jan 22, 2025
2 parents b6822ec + cafc558 commit 203d417
Showing 15 changed files with 221 additions and 60 deletions.
1 change: 0 additions & 1 deletion .github/workflows/federated_runtime.yml
@@ -1,6 +1,5 @@
#---------------------------------------------------------------------------
# Workflow to run 301_MNIST_Watermarking notebook
# Authors - Noopur, Payal Chaurasiya
#---------------------------------------------------------------------------
name: Federated Runtime 301 MNIST Watermarking

10 changes: 5 additions & 5 deletions .github/workflows/task_runner_basic_e2e.yml
@@ -34,8 +34,8 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
# There are open issues for some of the models, so excluding them for now:
# model_name: [ "torch_cnn_mnist", "keras_cnn_mnist", "torch_cnn_histology" ]
# Models like XGBoost (xgb_higgs) and torch_cnn_histology require runners with higher memory and CPU to run.
# Thus these models are excluded from the matrix for now.
model_name: ["torch_cnn_mnist", "keras_cnn_mnist"]
python_version: ["3.10", "3.11", "3.12"]
fail-fast: false # do not immediately fail if one of the combinations fail
@@ -77,7 +77,7 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
# Testing non TLS scenario only for torch_cnn_mnist model and python 3.10
# Testing this scenario only for torch_cnn_mnist model and python 3.10
# If required, this can be extended to other models and python versions
model_name: ["torch_cnn_mnist"]
python_version: ["3.10"]
@@ -120,7 +120,7 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
# Testing non TLS scenario only for torch_cnn_mnist model and python 3.10
# Testing this scenario for keras_cnn_mnist model and python 3.10
# If required, this can be extended to other models and python versions
model_name: ["keras_cnn_mnist"]
python_version: ["3.10"]
@@ -163,7 +163,7 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
# Testing non TLS scenario only for torch_cnn_mnist model and python 3.10
# Testing this scenario only for torch_cnn_mnist model and python 3.10
# If required, this can be extended to other models and python versions
model_name: ["torch_cnn_mnist"]
python_version: ["3.10"]
2 changes: 1 addition & 1 deletion .github/workflows/task_runner_dockerized_ws_e2e.yml
@@ -33,7 +33,7 @@ jobs:
strategy:
matrix:
model_name: ["keras_cnn_mnist"]
python_version: ["3.9", "3.10", "3.11"]
python_version: ["3.10", "3.11", "3.12"]
fail-fast: false # do not immediately fail if one of the combinations fail

env:
Original file line number Diff line number Diff line change
@@ -377,7 +377,7 @@ def __prune_poisoned_models(num_layers, total_number_of_clients, own_client_inde

ac_e = AgglomerativeClustering(n_clusters=2, distance_threshold=None,
compute_full_tree=True,
affinity="euclidean", memory=None,
metric="euclidean", memory=None,
connectivity=None,
linkage='single',
compute_distances=True).fit(cluster_input)
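The `affinity`-to-`metric` rename above tracks scikit-learn's `AgglomerativeClustering` API, where `affinity` was deprecated in 1.2 and removed in 1.4. A minimal sketch of the updated call on dummy data (the shape of `cluster_input` is made up for illustration):

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering

# Stand-in for the real cluster_input: one small feature vector per client model.
cluster_input = np.random.rand(8, 3)

ac_e = AgglomerativeClustering(n_clusters=2, distance_threshold=None,
                               compute_full_tree=True,
                               metric="euclidean", memory=None,  # formerly affinity="euclidean"
                               connectivity=None,
                               linkage="single",
                               compute_distances=True).fit(cluster_input)
print(ac_e.labels_)  # 0/1 cluster assignment per model
```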
Original file line number Diff line number Diff line change
@@ -430,9 +430,8 @@
" state_dicts = [model.state_dict() for model in models]\n",
" state_dict = new_model.state_dict()\n",
" for key in models[1].state_dict():\n",
" state_dict[key] = np.sum(\n",
" [state[key] for state in state_dicts], axis=0\n",
" ) / len(models)\n",
" state_dict[key] = torch.from_numpy(\n",
" np.average([state[key].numpy() for state in state_dicts], axis=0))\n",
" new_model.load_state_dict(state_dict)\n",
" return new_model\n",
"\n",
@@ -558,8 +557,7 @@
" exclude=[\"private\"],\n",
" )\n",
"\n",
" # @collaborator # Uncomment if you want ro run on CPU\n",
" @collaborator(num_gpus=1) # Assuming GPU(s) is available on the machine\n",
" @collaborator\n",
" def train(self):\n",
" self.collaborator_name = self.input\n",
" print(20 * \"#\")\n",
@@ -669,7 +667,7 @@
"\n",
" ac_e = AgglomerativeClustering(n_clusters=2, distance_threshold=None,\n",
" compute_full_tree=True,\n",
" affinity=\"euclidean\", memory=None, connectivity=None,\n",
" metric=\"euclidean\", memory=None, connectivity=None,\n",
" linkage='single',\n",
" compute_distances=True).fit(binary_votes)\n",
" ac_e_labels: list = ac_e.labels_.tolist()\n",
Original file line number Diff line number Diff line change
@@ -220,9 +220,8 @@ def FedAvg(models): # NOQA: N802
state_dicts = [model.state_dict() for model in models]
state_dict = new_model.state_dict()
for key in models[1].state_dict():
state_dict[key] = np.sum(
[state[key] for state in state_dicts], axis=0
) / len(models)
state_dict[key] = torch.from_numpy(
np.average([state[key].numpy() for state in state_dicts], axis=0))
new_model.load_state_dict(state_dict)
return new_model
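The `FedAvg` change converts the averaged NumPy arrays back into torch tensors before `load_state_dict`, since `np.average` returns plain `ndarray`s. A self-contained sketch of the same pattern on toy models (the module choice is illustrative, not taken from the tutorial):

```python
import numpy as np
import torch
import torch.nn as nn

def fed_avg(models):
    """Return the first model with its parameters set to the element-wise average."""
    new_model = models[0]
    state_dicts = [m.state_dict() for m in models]
    state_dict = new_model.state_dict()
    for key in state_dict:
        # np.average yields an ndarray; wrap it back into a tensor so that
        # load_state_dict receives torch tensors rather than NumPy arrays.
        state_dict[key] = torch.from_numpy(
            np.average([sd[key].numpy() for sd in state_dicts], axis=0)
        )
    new_model.load_state_dict(state_dict)
    return new_model

averaged = fed_avg([nn.Linear(4, 2) for _ in range(3)])
```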

@@ -316,8 +315,7 @@ def start(self):
exclude=["private"],
)

# @collaborator # Uncomment if you want ro run on CPU
@collaborator(num_gpus=1) # Assuming GPU(s) is available on the machine
@collaborator
def train(self):
self.collaborator_name = self.input
print(20 * "#")
@@ -428,7 +426,7 @@ def defend(self, inputs):

ac_e = AgglomerativeClustering(n_clusters=2, distance_threshold=None,
compute_full_tree=True,
affinity="euclidean", memory=None, connectivity=None,
metric="euclidean", memory=None, connectivity=None,
linkage='single',
compute_distances=True).fit(binary_votes)
ac_e_labels: list = ac_e.labels_.tolist()
27 changes: 26 additions & 1 deletion openfl-tutorials/experimental/workflow/CrowdGuard/readme.md
@@ -20,4 +20,29 @@ We implemented a simple scaling-based poisoning attack to demonstrate the effect

For the local validation in CrowdGuard, each client uses its local dataset to obtain the hidden layer outputs for each local model. Then it calculates the Euclidean and Cosine Distance, before applying a PCA. Based on the first principal component, CrowdGuard employs several statistical tests to determine whether poisoned models remain and removes the poisoned models using clustering. This process is repeated until no more poisoned models are detected before sending the detected poisoned models to the server. On the server side, the votes of the individual clients are aggregated using a stacked-clustering scheme to prevent malicious clients from manipulating the aggregation process through manipulated votes. The client-side validation as well as the server-side operations, are executed with SGX to prevent privacy attacks.

[1] Rieger, P., Krauß, T., Miettinen, M., Dmitrienko, A., & Sadeghi, A. R. CrowdGuard: Federated Backdoor Detection in Federated Learning. NDSS 2024.
[1] Rieger, P., Krauß, T., Miettinen, M., Dmitrienko, A., & Sadeghi, A. R. CrowdGuard: Federated Backdoor Detection in Federated Learning. NDSS 2024.
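The validation flow described above can be pictured with the following simplified sketch; it uses made-up data and collapses the statistical tests into a single clustering step, so it illustrates the idea rather than the actual CrowdGuard implementation:

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import AgglomerativeClustering

# Hypothetical hidden-layer outputs of each received model, evaluated on the
# validating client's own data (rows = samples, columns = activations).
rng = np.random.default_rng(0)
outputs_per_model = [rng.normal(size=(32, 16)) for _ in range(5)]
own_index = 0
reference = outputs_per_model[own_index]

features = []
for out in outputs_per_model:
    # Euclidean and cosine distances to the client's own model, per sample.
    euclidean = np.linalg.norm(out - reference, axis=1)
    cosine = 1 - np.sum(out * reference, axis=1) / (
        np.linalg.norm(out, axis=1) * np.linalg.norm(reference, axis=1) + 1e-12)
    features.append(np.concatenate([euclidean, cosine]))

# First principal component of the distance profiles.
pc1 = PCA(n_components=1).fit_transform(np.stack(features)).ravel()

# Split the models into two groups along the principal component; CrowdGuard
# additionally runs statistical tests to decide whether the outlying group is
# really poisoned and repeats the loop until no poisoned models remain.
labels = AgglomerativeClustering(n_clusters=2, linkage="single").fit_predict(pc1.reshape(-1, 1))
suspicious = [i for i, label in enumerate(labels) if label != labels[own_index]]
print("models this client votes as poisoned:", suspicious)
```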

## Running the CIFAR-10 demo script
The demo script requires a dedicated allocation of at least 18GB of RAM to run without issues.

1) Create a Python virtual environment for better isolation
```shell
python -m venv venv
source venv/bin/activate
```
2) Install OpenFL from the latest sources
```shell
git clone https://github.com/securefederatedai/openfl.git && cd openfl
pip install -e .
```
3) Install the requirements for Workflow API
```shell
cd openfl-tutorials/experimental/workflow
pip install -r workflow_interface_requirements.txt
```
4) Start the training script<br/>
Note that the number of training rounds can be adjusted via the `--comm_round` parameter:
```shell
cd CrowdGuard
python cifar10_crowdguard.py --comm_round 5
```
3 changes: 2 additions & 1 deletion openfl/component/aggregator/aggregator.py
@@ -352,6 +352,7 @@ def _save_model(self, round_number, file_path):
]
tensor_dict = {}
tensor_tuple_dict = {}
next_round_tensors = {}
for tk in tensor_keys:
tk_name, _, _, _, _ = tk
tensor_value = self.tensor_db.get_tensor_from_cache(tk)
@@ -373,7 +374,7 @@
self.next_model_round_number, ("model",)
)
self.persistent_db.finalize_round(
tensor_tuple_dict, next_round_tensors, self.round_number, self.best_model_score
tensor_tuple_dict, next_round_tensors, round_number, self.best_model_score
)
logger.info(
"Persist model and clean task result for round %s",
6 changes: 6 additions & 0 deletions tests/end_to_end/conftest.py
@@ -12,6 +12,7 @@
from tests.end_to_end.utils.logger import configure_logging
from tests.end_to_end.utils.logger import logger as log
from tests.end_to_end.utils.conftest_helper import parse_arguments
import tests.end_to_end.utils.docker_helper as dh


def pytest_addoption(parser):
@@ -192,6 +193,11 @@ def pytest_sessionfinish(session, exitstatus):
shutil.rmtree(cache_dir, ignore_errors=False)
log.debug(f"Cleared .pytest_cache directory at {cache_dir}")

# Cleanup docker containers related to aggregator and collaborators, if any.
dh.cleanup_docker_containers(list_of_containers=["aggregator", "collaborator*"])
# Cleanup docker network created for openfl, if any.
dh.remove_docker_network(["openfl"])


def pytest_configure(config):
"""
22 changes: 21 additions & 1 deletion tests/end_to_end/models/collaborator.py
@@ -5,9 +5,9 @@
import logging

import tests.end_to_end.utils.docker_helper as dh
import tests.end_to_end.utils.exceptions as ex
import tests.end_to_end.utils.federation_helper as fh


log = logging.getLogger(__name__)


@@ -205,3 +205,23 @@ def import_workspace(self):
except Exception as e:
log.error(f"{error_msg}: {e}")
raise e

def modify_data_file(self, data_file, index):
"""
Modify the data.yaml file for the model
Args:
data_file (str): Path to the data file including the file name
Returns:
bool: True if successful, else False
"""
try:
log.info("Data setup completed successfully. Modifying the data.yaml file..")

with open(data_file, "w") as file:
file.write(f"{self.collaborator_name},data/{index}")

except Exception as e:
log.error(f"Failed to modify the data file: {e}")
raise ex.DataSetupException(f"Failed to modify the data file: {e}")

return True
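A hypothetical call of the new helper from a test, using the `COL_DATA_FILE` template added to `constants.py` below (the collaborator object and workspace path are illustrative):

```python
import tests.end_to_end.utils.constants as constants

# Assumes `collaborator` is one of the collaborator model objects from the e2e fixtures.
data_file = constants.COL_DATA_FILE.format("/tmp/my_federation", collaborator.collaborator_name)
collaborator.modify_data_file(data_file, index=1)  # data.yaml now maps the collaborator to data/1
```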
5 changes: 5 additions & 0 deletions tests/end_to_end/utils/constants.py
@@ -13,6 +13,7 @@ class ModelName(Enum):
TORCH_CNN_MNIST = "torch_cnn_mnist"
KERAS_CNN_MNIST = "keras_cnn_mnist"
TORCH_CNN_HISTOLOGY = "torch_cnn_histology"
XGB_HIGGS = "xgb_higgs"

NUM_COLLABORATORS = 2
NUM_ROUNDS = 5
@@ -31,6 +32,10 @@ class ModelName(Enum):
AGG_PLAN_PATH = "{}/aggregator/workspace/plan" # example - /tmp/my_federation/aggregator/workspace/plan
COL_PLAN_PATH = "{}/{}/workspace/plan" # example - /tmp/my_federation/collaborator1/workspace/plan

COL_DATA_FILE = "{}/{}/workspace/plan/data.yaml" # example - /tmp/my_federation/collaborator1/workspace/plan/data.yaml

DATA_SETUP_FILE = "setup_data.py" # currently xgb_higgs is using this file to setup data

AGG_COL_RESULT_FILE = "{0}/{1}/workspace/{1}.log" # example - /tmp/my_federation/aggregator/workspace/aggregator.log

AGG_WORKSPACE_ZIP_NAME = "workspace.zip"
53 changes: 29 additions & 24 deletions tests/end_to_end/utils/docker_helper.py
@@ -12,35 +12,40 @@
log = logging.getLogger(__name__)


def remove_docker_network():
def remove_docker_network(list_of_networks=[constants.DOCKER_NETWORK_NAME]):
"""
Remove docker network.
Args:
list_of_networks (list): List of network names to remove.
"""
client = get_docker_client()
networks = client.networks.list(names=[constants.DOCKER_NETWORK_NAME])
networks = client.networks.list(names=list_of_networks)
if not networks:
log.debug(f"Network {constants.DOCKER_NETWORK_NAME} does not exist")
log.debug(f"Network(s) {list_of_networks} does not exist")
return

for network in networks:
log.debug(f"Removing network: {network.name}")
network.remove()
log.debug("Docker network removed successfully")
log.debug(f"Docker network(s) {list_of_networks} removed successfully")


def create_docker_network():
def create_docker_network(list_of_networks=[constants.DOCKER_NETWORK_NAME]):
"""
Create docker network.
Args:
list_of_networks (list): List of network names to create.
"""
client = get_docker_client()
networks = client.networks.list(names=[constants.DOCKER_NETWORK_NAME])
networks = client.networks.list(names=list_of_networks)
if networks:
log.info(f"Network {constants.DOCKER_NETWORK_NAME} already exists")
log.info(f"Network(s) {list_of_networks} already exists")
return

log.debug(f"Creating network: {constants.DOCKER_NETWORK_NAME}")
network = client.networks.create(constants.DOCKER_NETWORK_NAME)
log.info(f"Network {network.name} created successfully")
for network_name in list_of_networks:
log.debug(f"Creating network: {network_name}")
_ = client.networks.create(network_name)
log.info(f"Docker network(s) {list_of_networks} created successfully")


def check_docker_image():
@@ -143,24 +148,24 @@ def get_docker_client():
return client


def cleanup_docker_containers():
def cleanup_docker_containers(list_of_containers=["aggregator", "collaborator*"]):
"""
Cleanup the docker containers meant for openfl.
Args:
list_of_containers: List of container names to cleanup.
"""
log.debug("Cleaning up docker containers")

client = get_docker_client()

# List all containers related to openfl
agg_containers = client.containers.list(all=True, filters={"name": "aggregator"})
col_containers = client.containers.list(all=True, filters={"name": "collaborator*"})
containers = agg_containers + col_containers
container_names = []
# Stop and remove all containers
for container in containers:
container.stop()
container.remove()
container_names.append(container.name)

if containers:
log.info(f"Docker containers {container_names} cleaned up successfully")
for container_name in list_of_containers:
containers = client.containers.list(all=True, filters={"name": container_name})
container_names = []
# Stop and remove all containers
for container in containers:
container.stop()
container.remove()
container_names.append(container.name)

if containers:
log.info(f"Docker containers {container_names} cleaned up successfully")
5 changes: 5 additions & 0 deletions tests/end_to_end/utils/exceptions.py
@@ -86,3 +86,8 @@ class EnvoyStartException(Exception):
class DirectorStartException(Exception):
"""Exception for director start"""
pass


class DataSetupException(Exception):
"""Exception for data setup for given model"""
pass