Federated Runtime - Initial Implementation (#1190)
* Introduction to FederatedRuntime

Signed-off-by: Ishant Thakare <[email protected]>

* Updated aggregator

Signed-off-by: Ishant Thakare <[email protected]>

* updated collaborator & added 101_MNIST federated_runtime tutorial

Signed-off-by: Ishant Thakare <[email protected]>

* Updated code & fixed stream_stdout

Signed-off-by: Ishant Thakare <[email protected]>

* Adding testcases for FederatedRuntime

Signed-off-by: Ishant Thakare <[email protected]>

* Fixed formatting issues

Signed-off-by: Ishant Thakare <[email protected]>

* Fix formatting issues

Signed-off-by: Ishant Thakare <[email protected]>

* Incorporated internal review comments

Signed-off-by: Ishant Thakare <[email protected]>

* Fix checkpoint issue

Signed-off-by: Ishant Thakare <[email protected]>

* Updated FederatedRuntime Tutorials

Signed-off-by: Ishant Thakare <[email protected]>

* Updated tutorial

Signed-off-by: Ishant Thakare <[email protected]>

* Incorporated Teo's review comments

Signed-off-by: Ishant Thakare <[email protected]>

* Incorporated Teo's review comments

Signed-off-by: Ishant Thakare <[email protected]>

* Incorporated review comment

Signed-off-by: Ishant Thakare <[email protected]>

* Incorporate review comment

Co-authored-by: Patrick Foley <[email protected]>

* Updated Workflow Interface documentation

Signed-off-by: Ishant Thakare <[email protected]>

* Fix certificates for federated_runtime.py

Signed-off-by: Ishant Thakare <[email protected]>

---------

Signed-off-by: Ishant Thakare <[email protected]>
Co-authored-by: Patrick Foley <[email protected]>
ishant162 and psfoley authored Dec 17, 2024
1 parent bc5ba2b commit 5a18812
Showing 153 changed files with 8,955 additions and 277 deletions.
201 changes: 178 additions & 23 deletions docs/about/features_index/workflowinterface.rst
@@ -1,4 +1,4 @@
.. # Copyright (C) 2020-2023 Intel Corporation
.. # Copyright (C) 2020-2024 Intel Corporation
.. # SPDX-License-Identifier: Apache-2.0
.. _workflow_interface:
@@ -9,7 +9,7 @@ Workflow Interface

**Important Note**

The OpenFL workflow interface is experimental, subject to change, and is currently limited to single node execution. To setup and launch a real federation, see :ref:`running_a_federation`
The OpenFL workflow interface is experimental and subject to change. For an overview of the supported options to set up a federation and run FL experiments, see `Features <../features.rst>`_.

What is it?
===========
@@ -23,7 +23,7 @@ A new OpenFL interface that gives significantly more flexibility to researchers in
There are several modifications we make in our reimagined version of this interface that are necessary for federated learning:

1. *Placement*: Metaflow's `@step` decorator is replaced by placement decorators that specify where a task will run. In horizontal federated learning, there are server (or aggregator) and client (or collaborator) nodes. Tasks decorated by `@aggregator` will run on the aggregator node, and `@collaborator` will run on the collaborator node (a minimal flow using these decorators is sketched after this list). These placement decorators are interpreted by *Runtime* implementations: these do the heavy lifting of figuring out how to get the state of the current task to another process or node.
2. *Runtime*: Each flow has a `.runtime` attribute. The runtime encapsulates the details of the infrastructure where the flow will run. In this experimental release, we support only a `LocalRuntime` single node implementation, but as this work matures, we will extend to a `FederatedRuntime` that implements distributed operation across remote infrastructure.
2. *Runtime*: Each flow has a `.runtime` attribute. The runtime encapsulates the details of the infrastructure where the flow will run. We support the `LocalRuntime` for simulating experiments on a local node and the `FederatedRuntime` for launching experiments on distributed infrastructure.
3. *Conditional branches*: Perform different tasks if a criterion is met
4. *Loops*: Internal loops are within a flow; this is necessary to support rounds of training where the same sequence of tasks is performed repeatedly.
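
To make the placement decorators concrete, here is a minimal sketch of a flow; the import paths (here :code:`openfl.experimental.workflow`) vary across OpenFL releases, and the step names are illustrative:

.. code-block:: python

    from openfl.experimental.workflow.interface import FLSpec
    from openfl.experimental.workflow.placement import aggregator, collaborator

    class FederatedFlow(FLSpec):

        @aggregator
        def start(self):
            # Runs on the aggregator node; fan out one task per collaborator
            self.next(self.train, foreach='collaborators')

        @collaborator
        def train(self):
            # Runs on each collaborator node, with access to its private data
            self.next(self.join)

        @aggregator
        def join(self, inputs):
            # Runs on the aggregator; 'inputs' carries every collaborator's state
            self.next(self.end)

        @aggregator
        def end(self):
            pass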

@@ -142,7 +142,18 @@ The workflow interface formulates the experiment as a series of tasks, or a flow
Runtimes
========

A :code:`Runtime` defines where the flow will be executed, who the participants are in the experiment, and the private information that each participant has access to. In this experimental release, single node execution is supported using the :code:`LocalRuntime`. Let's see how a :code:`LocalRuntime` is created.
A :code:`Runtime` defines where the flow will be executed, who the participants are in the experiment, and the private information that each participant has access to. In the current experimental release:

* Single node execution is supported using the :code:`LocalRuntime`.
* Distributed node execution is supported using the :code:`FederatedRuntime`.

Let us see how :code:`LocalRuntime` and :code:`FederatedRuntime` are created.


LocalRuntime
---------------

You can simulate a Federated Learning experiment locally using :code:`LocalRuntime`, which supports single-node execution. Let's see how a :code:`LocalRuntime` is created.

.. code-block:: python
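
    # (Body collapsed in this diff view.) A minimal sketch of creating a
    # LocalRuntime, assuming the experimental workflow API; import paths and
    # participant names are illustrative and vary across OpenFL releases.
    from openfl.experimental.workflow.interface import Aggregator, Collaborator
    from openfl.experimental.workflow.runtime import LocalRuntime

    aggregator = Aggregator()
    collaborators = [Collaborator(name='Portland'), Collaborator(name='Seattle')]
    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
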
@@ -214,7 +225,7 @@ In rare cases this can be a problem because certain python objects cannot be serialized
Participant *private attributes* are returned by the callback function in form of a dictionary, where the key is the name of the attribute and the value is the object. In this example callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns :code:`train_loader` and :code:`test_loader` in the form of a dictionary.
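
A short sketch of such a callback and how it attaches to a participant (hedged: the :code:`private_attributes_callable` keyword follows the pattern used in the OpenFL tutorials, and :code:`mnist_train`/:code:`mnist_test` are assumed to be datasets defined elsewhere):

.. code-block:: python

    import torch

    def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size):
        # Shard the datasets by collaborator index and expose loaders privately
        train = torch.utils.data.Subset(mnist_train, range(index, len(mnist_train), n_collaborators))
        test = torch.utils.data.Subset(mnist_test, range(index, len(mnist_test), n_collaborators))
        return {
            'train_loader': torch.utils.data.DataLoader(train, batch_size=batch_size),
            'test_loader': torch.utils.data.DataLoader(test, batch_size=batch_size),
        }

    collaborator = Collaborator(
        name='Portland',
        private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
        index=0, n_collaborators=2, batch_size=32,
    )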

**Note:**If both callable and private attributes are provided, the initialization will prioritize the private attributes through the :code:`callable` function.
**Note:** If both callable and private attributes are provided, the initialization will prioritize the private attributes through the :code:`callable` function.

Some important points to remember while creating callback function and private attributes are:

@@ -241,8 +252,8 @@ Now let's see how the runtime for a flow is assigned, and the flow gets run:
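
(The code block here is collapsed in this diff view; a minimal sketch, assuming the :code:`FederatedFlow` and :code:`local_runtime` objects created above:)

.. code-block:: python

    flow = FederatedFlow()
    flow.runtime = local_runtime
    flow.run()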
And that's it! This will run an instance of the :code:`FederatedFlow` on a single node in a single process.

Runtime Backends
================
LocalRuntime Backends
---------------------

The Runtime defines where code will run, but the Runtime has a :code:`Backend`, which defines the underlying implementation of *how* the flow will be executed. :code:`single_process` is the default in the :code:`LocalRuntime`: it executes all code sequentially within a single Python process, and is well suited to run both on high-spec and low-spec hardware.
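
For example, the backend is selected at construction time; a hedged sketch, with the :code:`ray` backend as the alternative that the GPU discussion below relies on:

.. code-block:: python

    # 'single_process' is the default; the 'ray' backend runs participants in
    # separate processes and can honor num_cpus / num_gpus resource requests
    local_runtime = LocalRuntime(aggregator=aggregator,
                                 collaborators=collaborators,
                                 backend='ray')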

@@ -281,6 +292,151 @@ In the above example, we have used :code:`num_gpus=0.2` while instantiating Aggregator

**Note:** It is not necessary for ALL participants to use GPUs. For example, only the Collaborators may be allocated GPUs. In this scenario, the user should ensure that the artifacts returned by Collaborators to the Aggregator (e.g. the locally trained model object) are loaded back to the CPU before exiting the collaborator step (i.e. before the join step). TensorFlow manages object allocation by default, so this step is needed only for PyTorch.
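
A sketch of the pattern described in the note (hedged: :code:`train_one_epoch` is an illustrative helper, not part of OpenFL):

.. code-block:: python

    @collaborator
    def train(self):
        self.model.to('cuda')     # train on the GPU allocated via num_gpus
        train_one_epoch(self.model, self.train_loader)  # illustrative helper
        self.model.to('cpu')      # PyTorch only: move artifacts back to CPU before join
        self.next(self.join)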

FederatedRuntime
----------------

The :code:`FederatedRuntime` facilitates distributed execution across long-lived components (Director and Envoys) and enables data scientists to deploy experiments directly from a Jupyter notebook. Let's explore the process of creating a :code:`FederatedRuntime`.

The first step is to create the participants in the Federation: the Director and the Envoys.

**Director: The central node in the Federation**

The `fx director start` command is used to start the Director. You can run it with or without TLS, depending on your setup.

**With TLS:**
Use the following command:

.. code-block:: console

    $ fx director start -c <path_to_director_config_yaml_file> -rc <root_certificate_path> -pk <private_key_path> -oc <api_certificate_path>

**Without TLS:**
Use the following command:

.. code-block:: console

    $ fx director start --disable-tls -c <path_to_director_config_yaml_file>

**Explanation of Command Options**

- `-c <path_to_director_config_yaml_file>`: Path to the Director's configuration file.
- `-rc <root_certificate_path>`: Path to the root certificate (used with TLS).
- `-pk <private_key_path>`: Path to the private key file (used with TLS).
- `-oc <api_certificate_path>`: Path to the API certificate file (used with TLS).
- `--disable-tls`: Disables TLS encryption.

**Configuration File**

The Director requires a configuration file in YAML format. This file contains essential settings such as:

- Hostname (`listen_host`)
- Port (`listen_port`)
- Envoy health check period (`envoy_health_check_period`)
- Private attributes for the aggregator

An example configuration file `director_config.yaml` is shown below:

.. code-block:: yaml

    settings:
      listen_host: localhost
      listen_port: 50050
      envoy_health_check_period: 5
    aggregator:
      private_attributes: private_attributes.aggregator_attrs

**Envoy: Participating nodes in the Federation**

The `fx envoy start` command is used to start the Envoy. You can run it with or without TLS, depending on your setup.

**With TLS:**
Use the following command:

.. code-block:: console

    $ fx envoy start -n <envoy_name> -ec <path_to_envoy_config_yaml_file> -dh <director_host> -dp <director_port> -rc <root_certificate_path> -pk <private_key_path> -oc <api_certificate_path>

**Without TLS:**
Use the following command:

.. code-block:: console

    $ fx envoy start -n <envoy_name> --disable-tls -ec <path_to_envoy_config_yaml_file>

**Explanation of Command Options**

- `-n <envoy_name>`: Specifies the name of the Envoy.
- `-ec <path_to_envoy_config_yaml_file>`: Path to the Envoy's configuration file.
- `-dh <director_host>`: Hostname or IP address of the Director.
- `-dp <director_port>`: Port on which the Director is running.
- `-rc <root_certificate_path>`: Path to the root certificate (used with TLS).
- `-pk <private_key_path>`: Path to the private key file (used with TLS).
- `-oc <api_certificate_path>`: Path to the API certificate file (used with TLS).
- `--disable-tls`: Disables TLS encryption.

The Envoy configuration file includes details about the private attributes. An example configuration file `envoy_config.yaml` for `envoy_one` is shown below:

.. code-block:: yaml

    envoy_one:
      private_attributes: private_attributes.envoy_one_attrs

**Note**: Private attributes for both the Director and Envoy can be configured in two ways, similar to :code:`LocalRuntime`. If both callable and private attributes are provided, the initialization process will prioritize the private attributes through the callable function.
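
The module path in the configuration points at a plain dictionary. A minimal sketch of such a :code:`private_attributes.py` (hedged: the complete MNIST version, including train/test sharding, appears later in this commit):

.. code-block:: python

    # private_attributes.py -- referenced as private_attributes.envoy_one_attrs
    import torch
    import torchvision

    dataset = torchvision.datasets.MNIST(
        "./data", train=True, download=True,
        transform=torchvision.transforms.ToTensor(),
    )

    envoy_one_attrs = {
        "train_loader": torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=False),
    }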

Now we proceed to instantiate the :code:`FederatedRuntime` to facilitate the deployment of the experiment on distributed infrastructure. To initialize the :code:`FederatedRuntime`, the following inputs are required:

1. **director_info**

Details about the Director, including:

- Fully Qualified Domain Name (FQDN) of the Director node.
- Port number on which the Director is listening.
- (Optional) Certificate information for TLS:

- `cert_chain`: Path to the certificate chain.
- `api_cert`: Path to the API certificate.
- `api_private_key`: Path to the API private key.

2. **collaborators**

A list of collaborators participating in the federation.
Only Envoys hosting these collaborators will receive the experiment details from the Director.

3. **notebook_path**

File path to the Jupyter notebook defining the experiment logic.

Below is an example of how to set up and instantiate a `FederatedRuntime`:

.. code-block:: python

    # Define director information (TLS disabled)
    director_info = {
        'director_node_fqdn': 'localhost',
        'director_port': 50050,
        'cert_chain': None,
        'api_cert': None,
        'api_private_key': None,
    }

    # Instantiate the FederatedRuntime
    federated_runtime = FederatedRuntime(
        collaborators=collaborator_names,
        director=director_info,
        notebook_path=<path_to_jupyter_notebook>,
        tls=False
    )

To distribute the experiment across the federation, we now assign the :code:`federated_runtime` to the flow and execute it.

.. code-block:: python

    flow = FederatedFlow()
    flow.runtime = federated_runtime
    flow.run()

This will export the Jupyter notebook to a workspace and deploy it to the federation. The Director receives the experiment, distributes it to the Envoys, and initiates the execution of the experiment.
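
Before deploying, the runtime can also be used to check which Envoys are connected to the Director (hedged: :code:`get_envoys()` follows the usage in the 101_MNIST federated_runtime tutorial added by this commit):

.. code-block:: python

    federated_runtime.get_envoys()  # lists Envoys currently registered with the Director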

Debugging with the Metaflow Client
==================================

@@ -293,6 +449,8 @@ Capturing this information requires just a one line change to the Flow object in
.. code-block:: python

    flow = FederatedFlow(..., checkpoint=True)

**LocalRuntime**

After the flow has started running, you can use the Metaflow Client to get intermediate information from any of the participants' tasks:
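
(The code blocks in this part of the diff are collapsed; a sketch of typical Metaflow Client usage, where the flow name and the :code:`train` step are assumed from the examples above:)

.. code-block:: python

    from metaflow import Flow, Metaflow, Step

    print(Metaflow().flows)                  # flows recorded in the local datastore

    run = Flow('FederatedFlow').latest_run   # most recent execution
    step = Step(f'FederatedFlow/{run.id}/train')
    for task in step:
        print(task.stdout)                   # per-collaborator task output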

@@ -390,22 +548,19 @@ Also, if we wanted to get the best model and the last model, you can just run:
    torch.save(last_model.state_dict(), PATH)
    torch.save(best_model.state_dict(), PATH)

While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, `checkpoint` is disabled by default.
**FederatedRuntime**

Runtimes: Future Plans
======================
In a distributed environment consisting of the Director, Envoys, and the User Node (where the experiment is launched), the following debugging support is available:

Our goal is to make it a one line change to configure where and how a flow is executed. While we only support single node execution with the :code:`LocalRuntime` today, our aim in future releases is to make going from one to multiple nodes as easy as:
1. **Director Node**: If checkpointing is enabled, the Metaflow client can be launched on the Director node and the same steps outlined for :code:`LocalRuntime` can be followed.
2. **User Node**: The stdout and stderr logs are printed directly in the Jupyter notebook.

.. code-block:: python

    flow = FederatedFlow()
    # Run on a single node first
    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
    flow.runtime = local_runtime
    flow.run()

    # A future example of how the same flow could be run on distributed infrastructure
    federated_runtime = FederatedRuntime(...)
    flow.runtime = federated_runtime
    flow.run()

**IMPORTANT**: While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, checkpoint is disabled by default.

Future Plans
==============
The following functionalities are planned for inclusion in future releases of the Workflow Interface:

1. **Pre-trained Model Integration**: Enable the capability to pass a pre-trained model to FederatedFlow.
2. **Plan Review Mechanism**: Enable the capability for Director and Envoy admins to review submitted plans and either accept or reject them.
3. **Straggler Handling**: Implement mechanisms to manage and mitigate the impact of stragglers during federated experiments.
@@ -1065,7 +1065,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "openfl-wip",
"display_name": "fed_run",
"language": "python",
"name": "python3"
},
Expand All @@ -1079,7 +1079,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.19"
"version": "3.10.15"
}
},
"nbformat": 4,
@@ -0,0 +1,2 @@
Portland:
  private_attributes: private_attributes.portland_attrs
@@ -0,0 +1,50 @@
# Copyright (C) 2020-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from copy import deepcopy

import torch
import torchvision

# Download Train and Test datasets
mnist_train = torchvision.datasets.MNIST(
"../files/",
train=True,
download=True,
transform=torchvision.transforms.Compose(
[
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize((0.1307,), (0.3081,)),
]
),
)

mnist_test = torchvision.datasets.MNIST(
"../files/",
train=False,
download=True,
transform=torchvision.transforms.Compose(
[
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize((0.1307,), (0.3081,)),
]
),
)

# shard the dataset according to collaborator index
portland_col_idx = 0
n_collaborators = 2
batch_size = 32

train = deepcopy(mnist_train)
test = deepcopy(mnist_test)

train.data = mnist_train.data[portland_col_idx::n_collaborators]
train.targets = mnist_train.targets[portland_col_idx::n_collaborators]
test.data = mnist_test.data[portland_col_idx::n_collaborators]
test.targets = mnist_test.targets[portland_col_idx::n_collaborators]

portland_attrs = {
"train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=False),
"test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=False),
}
@@ -0,0 +1,6 @@
#!/bin/bash
set -e
ENVOY_NAME=$1
ENVOY_CONF=$2

fx envoy start -n "$ENVOY_NAME" --disable-tls --envoy-config-path "$ENVOY_CONF" -dh localhost -dp 50050