Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update collective ops docs #5237

Merged
merged 12 commits into from
Nov 11, 2022
Merged
59 changes: 45 additions & 14 deletions docs/api/paddle/distributed/Overview_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ paddle.distributed 目录包含的 API 支撑飞桨框架大规模分布式训
- :ref:`环境配置和训练启动管理 <02>`
- :ref:`数据加载 <03>`
- :ref:`集合通信算法 API <04>`
- :ref:`RPC API <05>`
- :ref:`Stream 集合通信高级 API <05>`
- :ref:`RPC API <06>`

.. _01:

Fleet 分布式高层 API
::::::::::::::::::::::::::

paddle.distributed.fleet 是分布式训练的统一入口 API,用于配置分布式训练。
``paddle.distributed.fleet`` 是分布式训练的统一入口 API,用于配置分布式训练。

.. csv-table::
:header: "API 名称", "API 功能"
Expand Down Expand Up @@ -53,6 +54,8 @@ paddle.distributed.fleet 是分布式训练的统一入口 API,用于配置分
" :ref:`spawn <cn_api_distributed_spawn>` ", "启动分布式训练进程,仅支持集合通信架构"
" :ref:`get_rank <cn_api_distributed_get_rank>` ", "获取当前进程的 rank 值"
" :ref:`get_world_size <cn_api_distributed_get_world_size>` ", "获取当前进程数"
" :ref:`new_group <cn_api_distributed_new_group>` ", "创建分布式通信组"
" :ref:`destroy_process_group <cn_api_distributed_destroy_process_group>` ", "销毁分布式通信组"

.. _03:

Expand All @@ -69,28 +72,56 @@ paddle.distributed.fleet 是分布式训练的统一入口 API,用于配置分

.. _04:

集合通信算法 API
集合通信 API
::::::::::::::::::::::

在集群上,对多设备的进程组的参数数据 tensor 或 object 进行计算处理。
在集群上,对多设备的进程组的参数数据 tensor 或 object 进行计算处理,包括规约、聚合、广播、分发等

.. csv-table::
:header: "API 名称", "API 功能"
:widths: 20, 50


" :ref:`reduce <cn_api_distributed_reduce>` ", "规约,规约进程组内的 tensor,返回结果至指定进程"
" :ref:`ReduceOP <cn_api_distributed_ReduceOp>` ", "规约,指定逐元素规约操作"
" :ref:`all_reduce <cn_api_distributed_all_reduce>` ", "组规约,规约进程组内的 tensor,结果广播至每个进程"
" :ref:`all_gather <cn_api_distributed_all_gather>` ", "组聚合,聚合进程组内的 tensor,结果广播至每个进程"
" :ref:`all_gather_object <cn_api_distributed_all_gather_object>` ", "组聚合,聚合进程组内的 object,结果广播至每个进程"
" :ref:`broadcast <cn_api_distributed_broadcast>` ", "广播一个 tensor 到每个进程"
" :ref:`scatter <cn_api_distributed_scatter>` ", "分发 tensor 到每个进程"
" :ref:`split <cn_api_distributed_split>` ", "切分参数到多个设备"
" :ref:`barrier <cn_api_distributed_barrier>` ", "同步路障,进行阻塞操作,实现组内所有进程的同步"
" :ref:`ReduceOp <cn_api_distributed_ReduceOp>` ", "规约操作的类型"
" :ref:`reduce <cn_api_distributed_reduce>` ", "规约进程组内的 tensor,随后将结果发送到指定进程"
" :ref:`all_reduce <cn_api_distributed_all_reduce>` ", "规约进程组内的 tensor,随后将结果发送到每个进程"
" :ref:`all_gather <cn_api_distributed_all_gather>` ", "聚合进程组内的 tensor,随后将结果发送到每个进程"
" :ref:`all_gather_object <cn_api_distributed_all_gather_object>` ", "聚合进程组内的 object,随后将结果发送到每个进程"
" :ref:`alltoall <cn_api_distributed_alltoall>` ", "将一组 tensor 分发到每个进程并进行聚合"
" :ref:`alltoall_single <cn_api_distributed_alltoall_single>` ", "将一个 tensor 分发到每个进程并进行聚合"
" :ref:`broadcast <cn_api_distributed_broadcast>` ", "将一个 tensor 发送到每个进程"
" :ref:`scatter <cn_api_distributed_scatter>` ", "将一组 tensor 分发到每个进程"
" :ref:`reduce_scatter <cn_api_distributed_reduce_scatter>` ", "规约一组 tensor,随后将规约结果分发到每个进程"
" :ref:`isend <cn_api_distributed_isend>` ", "异步发送一个 tensor 到指定进程"
" :ref:`irecv <cn_api_distributed_irecv>` ", "异步接收一个来自指定进程的 tensor"
" :ref:`send <cn_api_distributed_send>` ", "发送一个 tensor 到指定进程"
" :ref:`recv <cn_api_distributed_recv>` ", "接收一个来自指定进程的 tensor"
" :ref:`barrier <cn_api_distributed_barrier>` ", "同步路障,阻塞操作以实现组内进程同步"

.. _05:

Stream 集合通信高级 API
::::::::::::::::::::::

``paddle.distributed.stream`` 在集合通信 API 的基础上,提供更统一的语义和对计算流的更精细的控制能力,有助于在特定场景下提高性能。

.. csv-table::
:header: "API 名称", "API 功能"
:widths: 25, 50


" :ref:`stream.reduce <cn_api_distributed_stream_reduce>` ", "规约进程组内的 tensor,随后将结果发送到指定进程"
" :ref:`stream.all_reduce <cn_api_distributed_stream_all_reduce>` ", "规约进程组内的 tensor,随后将结果发送到每个进程"
" :ref:`stream.all_gather <cn_api_distributed_stream_all_gather>` ", "聚合进程组内的 tensor,随后将结果发送到每个进程"
" :ref:`stream.alltoall <cn_api_distributed_stream_alltoall>` ", "分发一组 tensor 到每个进程并进行聚合"
" :ref:`stream.alltoall_single <cn_api_distributed_stream_alltoall_single>` ", "分发一个 tensor 到每个进程并进行聚合"
" :ref:`stream.broadcast <cn_api_distributed_stream_broadcast>` ", "发送一个 tensor 到每个进程"
" :ref:`stream.scatter <cn_api_distributed_stream_scatter>` ", "分发一个 tensor 到每个进程"
" :ref:`stream.reduce_scatter <cn_api_distributed_stream_reduce_scatter>` ", "规约一组 tensor,随后将规约结果分发到每个进程"
" :ref:`stream.send <cn_api_distributed_stream_send>` ", "发送一个 tensor 到指定进程"
" :ref:`stream.recv <cn_api_distributed_stream_recv>` ", "接收一个来自指定进程的 tensor"

.. _06:

RPC API
::::::::::::::::::::::::::

Expand Down
2 changes: 1 addition & 1 deletion docs/api/paddle/distributed/ReduceOp_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ReduceOp

.. py:class:: paddle.distributed.ReduceOp()

指定规约类操作的逐元素操作类型,需要是下述值之一
指定规约操作的类型,必须是下述值之一

ReduceOp.SUM

Expand Down
18 changes: 10 additions & 8 deletions docs/api/paddle/distributed/all_gather_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ all_gather
-------------------------------


.. py:function:: paddle.distributed.all_gather(tensor_list, tensor, group=0)
.. py:function:: paddle.distributed.all_gather(tensor_list, tensor, group=None, sync_op=True)

进程组内所有进程的指定 tensor 进行聚合操作,并返回给所有进程聚合的结果。
如下图所示,4 个 GPU 分别开启 4 个进程,每张卡上的数据用卡号代表,
经过 all_gather 算子后,每张卡都会拥有所有卡的数据。
组聚合,聚合进程组内的指定 tensor,随后将聚合后的 tensor 列表发送到每个进程。

如下图所示,4 个 GPU 分别开启 1 个进程,进程拥有的数据用其在组内的 rank 表示。
聚合操作后,每个进程都会得到所有进程拥有的数据。

.. image:: ./img/allgather.png
:width: 800
Expand All @@ -17,13 +18,14 @@ all_gather

参数
:::::::::
- **tensor_list** (list) - 操作的输出 Tensor 列表。列表中的每个元素均为 Tensor,每个 Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、complex64、complex128。
- **tensor** (Tensor) - 操作的输入 Tensor。Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、complex64、complex128。
- **group** (int,可选) - 工作的进程组编号,默认为 0。
- **tensor_list** (List[Tensor]) - 用于保存聚合结果的 tensor 列表。若不为空,其中每个 tensor 的数据类型必须与输入的 tensor 保持一致。
- **tensor** (Tensor) - 待聚合的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16、complex64、complex128。
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。

返回
:::::::::
无返回值。

代码示例
:::::::::
Expand Down
15 changes: 8 additions & 7 deletions docs/api/paddle/distributed/all_gather_object_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ all_gather_object
-------------------------------


.. py:function:: paddle.distributed.all_gather_object(object_list, object, group=0)
.. py:function:: paddle.distributed.all_gather_object(object_list, obj, group=None)

进程组内所有进程指定的 picklable 对象进行聚合操作,并返回给所有进程聚合的结果。和 all_gather 类似,但可以传入自定义的 python 对象。
组聚合,聚合进程组内指定的 picklable 对象,随后将聚合后的对象列表发送到每个进程。
过程与 ``all_gather`` 类似,但可以传入自定义的 python 对象。

.. warning::
.. note::
该 API 只支持动态图模式。

参数
:::::::::
- **object_list** (list) - 操作的输出 Object 列表
- **object** (Any) - 操作的输入 Object,需要保证输入自定义的 Object 是 picklable 的。
- **group** (int,可选) - 工作的进程组编号,默认为 0
- **object_list** (List[Any]) - 用于保存聚合结果的列表
- **object** (Any) - 待聚合的对象。需要保证该对象是 picklable 的。
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组

返回
:::::::::
无返回值。

代码示例
:::::::::
Expand Down
18 changes: 10 additions & 8 deletions docs/api/paddle/distributed/all_reduce_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ all_reduce
-------------------------------


.. py:function:: paddle.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=0)
.. py:function:: paddle.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True)

进程组内所有进程的指定 tensor 进行归约操作,并返回给所有进程归约的结果。
如下图所示,4 个 GPU 分别开启 4 个进程,每张卡上的数据用卡号代表,规约操作为求和,
经过 all_reduce 算子后,每张卡都会拥有所有卡数据的总和。
规约进程组内的一个 tensor,随后将结果发送到每个进程。

如下图所示,4 个 GPU 分别开启 1 个进程,进程拥有的数据用其在组内的 rank 表示,规约操作为求和。
规约操作后,每个进程都会得到所有进程数据的总和。

.. image:: ./img/allreduce.png
:width: 800
Expand All @@ -17,13 +18,14 @@ all_reduce

参数
:::::::::
- **tensor** (Tensor) - 操作的输入 Tensor,同时也会将归约结果返回至此 Tensor 中。Tensor 的数据类型为:float16、float32、float64、int32、int64。
- **op** (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD,可选) - 归约的具体操作,比如求和,取最大值,取最小值和求乘积,默认为求和归约。
- **group** (int,可选) - 工作的进程组编号,默认为 0。
- **tensor** (Tensor) - 输入的 tensor。返回结果也将保存到该 tensor 中。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
- **op** (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD,可选) - 归约的操作类型,包括求和、取最大值、取最小值和求乘积。默认为求和。
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。

返回
:::::::::
``Task``。通过 ``Task``,可以查看异步操作的执行状态以及等待异步操作的结果。

代码示例
:::::::::
Expand Down
22 changes: 12 additions & 10 deletions docs/api/paddle/distributed/alltoall_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ alltoall
-------------------------------


.. py:function:: paddle.distributed.alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True)
.. py:function:: paddle.distributed.alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True)

将 in_tensor_list 里面的 tensors 按照卡数均分并按照卡的顺序分发到所有参与的卡并将结果 tensors 汇总到 out_tensor_list。
如下图所示,GPU0 卡的 in_tensor_list 会按照两张卡拆分成 0_0 和 0_1, GPU1 卡的 in_tensor_list 同样拆分成 1_0 和 1_1,经过 alltoall 算子后,
GPU0 卡的 0_0 会发送给 GPU0,GPU0 卡的 0_1 会发送给 GPU1,GPU1 卡的 1_0 会发送给 GPU0,GPU1 卡的 1_1 会发送给 GPU1,所以 GPU0 卡的 out_tensor_list 包含 0_0 和 1_0,
GPU1 卡的 out_tensor_list 包含 0_1 和 1_1。
将 in_tensor_list 中的一组 tensor 分发到每个进程,随后在每个进程上将分发结果聚合到 out_tensor_list。

如下图所示,2 个 GPU 分别开启 1 个进程,rank=0 的进程的 in_tensor_list 包含 0_0 和 0_1 两个 tensor,rank=1 的进程的 in_tensor_list 包含 1_0 和 1_1 两个 tensor。
操作后,rank=0 的进程的 out_tensor_list 会包含 0_0 和 1_0 两个 tensor,rank=1 的进程的 out_tensor_list 会包含 0_0 和 1_1 两个 tensor。

简单来说,该操作类似于 scatter + gather。更直观地,如果将全部进程上的数据看作一个矩阵,该操作类似于对矩阵进行转置。

.. image:: ./img/alltoall.png
:width: 800
Expand All @@ -18,14 +20,14 @@ GPU1 卡的 out_tensor_list 包含 0_1 和 1_1。

参数
:::::::::
- **in_tensor_list** (list) - 包含所有输入 Tensors 的一个列表。在列表里面的所有元素都必须是一个 Tensor,Tensor 的数据类型必须是 float16、float32、 float64、int32、int64。
- **out_tensor_list** (Tensor) - 包含所有输出 Tensors 的一个列表。在列表里面的所有元素数据类型要和输入的 Tensors 数据类型一致
- **group** (Group,可选) - new_group 返回的 Group 实例,或者设置为 None 表示默认地全局组。默认值:None
- **use_calc_stream** (bool,可选) - 标识使用计算流还是通信流。默认值:True。
- **in_tensor_list** (List[Tensor]) - 输入的 tensor 列表。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16
- **out_tensor_list** (List[Tensor]) - 用于保存操作结果的 tensor 列表。其中每个 tensor 的数据类型必须与输入的 tensor 保持一致
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作

返回
:::::::::
无返回值。

代码示例
:::::::::
Expand Down
29 changes: 29 additions & 0 deletions docs/api/paddle/distributed/alltoall_single_cn.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
.. _cn_api_distributed_alltoall_single:

alltoall_single
-------------------------------


.. py:function:: paddle.distributed.alltoall_single(in_tensor, out_tensor, in_split_sizes=None, out_split_sizes=None, group=None, sync_op=True)

将输入的 tensor 分发到每个进程,随后在每个进程上将分发结果聚合到 out_tensor 中。

.. note::
该 API 只支持动态图模式。

参数
:::::::::
- **in_tensor** (Tensor): 输入的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
- **out_tensor** (Tensor): 用于保存操作结果的 tensor,数据类型必须与输入的 tensor 保持一致。
- **in_split_sizes** (List[int],可选): 对 in_tensor 的 dim[0] 进行切分的大小。默认为 None,即将 in_tensor 均匀地分发到各个进程中(需要确保 in_tensor 的大小能够被组中的进程数整除)。
- **out_split_sizes** (List[int],可选): 对 out_tensor 的 dim[0] 进行切分的大小。默认为 None,即 out_tensor 将均匀地聚合来自各个进程的数据(需要确保 out_tensor 的大小能够被组中的进程数整除)。
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。

返回
:::::::::
若为同步操作,无返回值;若为异步操作,返回 ``Task``。通过 ``Task``,可以查看异步操作的执行状态以及等待异步操作的结果。

代码示例
:::::::::
COPY-FROM: paddle.distributed.alltoall_single
6 changes: 3 additions & 3 deletions docs/api/paddle/distributed/barrier_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ barrier
-------------------------------


.. py:function:: paddle.distributed.barrier(group=0)
.. py:function:: paddle.distributed.barrier(group=None)

同步进程组内的所有进程。

参数
:::::::::
- **group** (int,可选) - 工作的进程组编号,默认为 0
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组

返回
:::::::::
无返回值。

代码示例
:::::::::
Expand Down
19 changes: 12 additions & 7 deletions docs/api/paddle/distributed/broadcast_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ broadcast
-------------------------------


.. py:function:: paddle.distributed.broadcast(tensor, src, group=0)
.. py:function:: paddle.distributed.broadcast(tensor, src, group=None, sync_op=True)

广播一个 Tensor 给其他所有进程。
如下图所示,4 个 GPU 分别开启 4 个进程,GPU0 卡拥有数据,经过 broadcast 算子后,会将这个数据传播到所有卡上。
将一个 tensor 发送到每个进程。

如下图所示,4 个 GPU 分别开启 1 个进程,rank=0 的进程拥有数据 0。
广播操作后,数据 0 会被发送到所有进程上。

.. image:: ./img/broadcast.png
:width: 800
Expand All @@ -16,13 +18,16 @@ broadcast

参数
:::::::::
- **tensor** (Tensor) - 如果当前进程编号是源,那么这个 Tensor 变量将被发送给其他进程,否则这个 Tensor 将接收源发送过来的数据。Tensor 的数据类型为:float16、float32、float64、int32、int64。
- **src** (int) - 发送源的进程编号。
- **group** (int,可选) - 工作的进程组编号,默认为 0。
- **tensor** (Tensor) - 在目标进程上为待广播的 tensor,在其他进程上为用于接收广播结果的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
- **src** (int) - 目标进程的 rank,该进程传入的 tensor 将被发送到其他进程上。
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。

返回
:::::::::
动态图模式下,若为同步操作,无返回值;若为异步操作,返回 ``Task``。通过 ``Task``,可以查看异步操作的执行状态以及等待异步操作的结果。

静态图模式下,无返回值。

代码示例
:::::::::
Expand Down
21 changes: 21 additions & 0 deletions docs/api/paddle/distributed/destroy_process_group_cn.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.. _cn_api_distributed_destroy_process_group:

destroy_process_group
-------------------------------


.. py:function:: destroy_process_group(group=None)

销毁一个指定的通信组。

参数
:::::::::
- group (ProcessGroup, 可选): 待销毁的通信组。所有通信组都会被销毁(包括默认的通信组),并且整个分布式环境也会回到未被初始化的状态。

返回
:::::::::
无返回值。

代码示例
::::::::::::
COPY-FROM: paddle.distributed.destroy_process_group
Loading