[TransferEngine] change: auto discover topology & install transport. (#…
…73)

* [TransferEngine] change: auto discover topology & install transport.

Signed-off-by: doujiang24 <[email protected]>
doujiang24 authored Jan 17, 2025
1 parent 8fed27b commit ab8021d
Showing 33 changed files with 221 additions and 295 deletions.
7 changes: 2 additions & 5 deletions doc/en/p2p-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ After compiling P2P Store successfully by following the compilation guide with `
./p2p-store-example --cmd=trainer \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--device_name=erdma_0
```

3. **Start the simulated inference node.** This node will pull data from the simulated training node or other simulated inference nodes.
Expand All @@ -27,7 +26,6 @@ After compiling P2P Store successfully by following the compilation guide with `
./p2p-store-example --cmd=inferencer \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.3:12346 \
--device_name=erdma_1
```
The test is completed with the display of "ALL DONE".

Expand All @@ -37,12 +35,11 @@ In the above process, the simulated inference nodes search for data sources, whi
Mooncake P2P Store currently implements the following interfaces in Golang:

```go
func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error)
func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error)
```
Creates an instance of `P2PStore`, which internally starts a Transfer Engine service.
- `metadataUri`: The hostname or IP address of the metadata server/etcd service.
- `localSegmentName`: The local server name (hostname/IP address:port), ensuring uniqueness within the cluster.
- `nicPriorityMatrix`: The network interface card priority order matrix, see the related description in the Transfer Engine API documentation (`TransferEngine::installTransport`).
- Return value: If successful, returns a pointer to the `P2PStore` instance, otherwise returns `error`.

```go
Expand All @@ -63,7 +60,7 @@ Registers a local file to the cluster, making it downloadable by other peers. En
- `name`: The file registration name, ensuring uniqueness within the cluster.
- `addrList` and `sizeList`: These two arrays represent the memory range of the file, with `addrList` indicating the starting address and `sizeList` indicating the corresponding length. The file content corresponds logically to the order in the arrays.
- `maxShardSize`: The internal data sharding granularity, with a recommended value of 64MB.
- `location`: The device name corresponding to this memory segment, matching with `nicPriorityMatrix`.
- `location`: The device name corresponding to this memory segment.

```go
func (store *P2PStore) Unregister(ctx context.Context, name string) error
Expand Down
70 changes: 12 additions & 58 deletions doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ For instance, as illustrated in figure above, to transfer data from buffer 0 (as
To further maximize bandwidth utilization, a single request's transfer is internally divided into multiple slices if its length exceeds 16KB.
Each slice might use a different path, enabling collaborative work among all RDMA NICs.
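
The slicing rule above can be sketched in Go as follows. This is a minimal illustration under stated assumptions, not Transfer Engine's actual implementation: the `slice` type, the `splitRequest` helper, and the round-robin NIC assignment are hypothetical names and choices made for this example; only the 16KB threshold comes from the text.

```go
package main

import "fmt"

// sliceSize is the 16KB slicing threshold mentioned in the text.
const sliceSize = 16 * 1024

// slice is a hypothetical record describing one piece of a larger transfer.
type slice struct {
	Offset int    // byte offset within the original request
	Length int    // number of bytes carried by this slice
	NIC    string // NIC picked for this slice (round-robin is an assumption)
}

// splitRequest divides a request of the given length into slices of at most
// sliceSize bytes and assigns NICs in round-robin order. A request that does
// not exceed sliceSize yields a single slice.
func splitRequest(length int, nics []string) []slice {
	if length <= 0 || len(nics) == 0 {
		return nil
	}
	var out []slice
	for off, i := 0, 0; off < length; off, i = off+sliceSize, i+1 {
		n := sliceSize
		if length-off < n {
			n = length - off // the last slice may be shorter
		}
		out = append(out, slice{Offset: off, Length: n, NIC: nics[i%len(nics)]})
	}
	return out
}

func main() {
	for _, s := range splitRequest(40*1024, []string{"mlx0", "mlx1"}) {
		fmt.Printf("offset=%d length=%d nic=%s\n", s.Offset, s.Length, s.NIC)
	}
}
```

A real scheduler would presumably choose the NIC per slice from the topology and current link state rather than by simple rotation; round-robin is used here only to show how slices can spread across several NICs.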

If you do not want to manually configure the topology matrix, we also provide a function (`mooncake::discoverTopologyMatrix` in `topology.h`) to automatically discover the topology between CPU/CUDA and RDMA devices. Support for more device types is in progress. The automatic discovery mechanism might not always be accurate, and we welcome your feedback and improvement ideas!

### Endpoint Management
Mooncake Store employs a pair of endpoints to represent the connection between a local RDMA
Expand Down Expand Up @@ -140,6 +138,14 @@ After successfully compiling Transfer Engine, the test program `transfer_engine_
> Tip: Advanced users can also pass in a JSON file of the network card priority matrix through `--nic_priority_matrix`, for details, refer to the developer manual of Transfer Engine.
- In network environments that only support TCP, the `--protocol=tcp` parameter can be used; in this case, there is no need to specify the `--device_name` parameter.

You can also specify `--auto_discovery` to enable automatic topology discovery, which generates a network card priority matrix based on the operating system configuration. In this case, the `--device_name` parameter is not required.
```
./transfer_engine_bench --mode=target \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--auto_discovery
```

1. **Start the initiator node.**
```bash
# This is 10.0.0.3
Expand Down Expand Up @@ -257,63 +263,11 @@ Recycles `BatchID`, and subsequent operations on `submitTransfer` and `getTransf
- Return value: If successful, returns 0; otherwise, returns a negative value.
### Multi-Transport Management
The `TransferEngine` class internally manages multiple backend `Transport` classes, and users can load or unload `Transport` for different backends in `TransferEngine`.
#### TransferEngine::installTransport
```cpp
Transport* installTransport(const std::string& proto, void** args);
```
Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protocol already exists, it returns that `Transport`.
- `proto`: The name of the transport protocol used by `Transport`, currently supporting `tcp`, `rdma`, `nvmeof`.
- `args`: Additional parameters required for `Transport` initialization, presented as a variable-length array, with the last member being `nullptr`.
- Return value: If `proto` is within the determined range, returns the `Transport` corresponding to `proto`; otherwise, returns a null pointer.
##### TCP Transfer Mode
For TCP transfer mode, there is no need to pass `args` objects when registering the `Transport` object.
```cpp
engine->installTransport("tcp", nullptr);
```
##### RDMA Transfer Mode
For RDMA transfer mode, the network card priority matrix must be specified through `args` during the registration of `Transport`.
```cpp
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* topology matrix */;
args[1] = nullptr;
engine->installTransport("rdma", args);
```
The network card priority matrix is a JSON string indicating the storage medium name and the list of network cards to be used preferentially, as shown in the example below:
```json
{
"cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]],
"cuda:0": [["mlx1", "mlx0"]],
...
}
```
Each `key` represents the device name corresponding to a CPU socket or a GPU device.
Each `value` is a tuple of (`preferred_nic_list`, `accessable_nic_list`), each of which is a list of NIC names.
- `preferred_nic_list` indicates the preferred NICs, such as NICs directly connected to the CPU rather than across NUMA, or NICs under the same PCIe Switch for GPUs.
- `accessable_nic_list` indicates NICs that are not preferred but can theoretically connect, used for fault retry scenarios.
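
As a rough illustration of the format described above, the following Go sketch decodes a priority matrix into its preferred and fallback NIC lists. The `nicLists` type and `parsePriorityMatrix` helper are hypothetical names introduced for this example and are not part of Mooncake; treating a missing second list as empty is likewise an assumption, based on the `cuda:0` entry in the sample.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nicLists holds the two NIC lists described above for one storage location.
type nicLists struct {
	Preferred  []string // preferred_nic_list
	Accessible []string // accessable_nic_list, used for fault retry
}

// parsePriorityMatrix decodes a JSON priority matrix of the form
// {"cpu:0": [[preferred...], [accessible...]], ...}.
// An entry with only one list is treated as having no fallback NICs.
func parsePriorityMatrix(data string) (map[string]nicLists, error) {
	var raw map[string][][]string
	if err := json.Unmarshal([]byte(data), &raw); err != nil {
		return nil, err
	}
	out := make(map[string]nicLists, len(raw))
	for location, lists := range raw {
		var entry nicLists
		if len(lists) > 0 {
			entry.Preferred = lists[0]
		}
		if len(lists) > 1 {
			entry.Accessible = lists[1]
		}
		out[location] = entry
	}
	return out, nil
}

func main() {
	matrix := `{"cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]], "cuda:0": [["mlx1", "mlx0"]]}`
	parsed, err := parsePriorityMatrix(matrix)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for location, lists := range parsed {
		fmt.Printf("%s preferred=%v accessible=%v\n", location, lists.Preferred, lists.Accessible)
	}
}
```

Go map iteration order is unspecified, so the two locations may print in either order.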
##### NVMeOF Transfer Mode
For NVMeOF transfer mode, the file path must be specified through `args` during the registration of `Transport`.
```cpp
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* file path */;
args[1] = nullptr;
engine->installTransport("nvmeof", args);
```
#### TransferEngine::uninstallTransport
```cpp
int uninstallTransport(const std::string& proto);
```
Unloads `Transport` from `TransferEngine`.
- `proto`: The name of the transport protocol used by `Transport`, currently supporting `rdma`, `nvmeof`.
- Return value: If successful, returns 0; otherwise, returns a negative value.
The `TransferEngine` class internally manages multiple backend `Transport` classes.
It automatically discovers the topology between CPU/CUDA and RDMA devices
(support for more device types is in progress; feedback is welcome if the automatic discovery is inaccurate)
and installs `Transport` automatically based on that topology.
### Space Registration
Expand Down
1 change: 1 addition & 0 deletions doc/en/vllm-integration-v0.2.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pip3 install -e .
}
```

Note: the next version will auto-detect the protocol when `protocol` is absent from the config file.

## Run Example
- Please change the IP addresses and ports in the following guide according to your env.
Expand Down
5 changes: 1 addition & 4 deletions doc/zh/p2p-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ P2P Store provides interfaces similar to Register and GetReplica. Register is equivalent to
./p2p-store-example --cmd=trainer \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--device_name=erdma_0
```

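3. **Start the simulated inference node.** This node will pull data from the simulated training node or other simulated inference nodes.
Expand All @@ -27,7 +26,6 @@ P2P Store provides interfaces similar to Register and GetReplica. Register is equivalent to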
./p2p-store-example --cmd=inferencer \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.3:12346 \
--device_name=erdma_1
```
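The test is completed with the display of "ALL DONE".

Expand All @@ -48,12 +46,11 @@ P2P Store is built on [Transfer Engine](transfer-engine.md) and supports, within a cluster,
Mooncake P2P Store currently implements the following interfaces in Golang: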

```go
func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error)
func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error)
```
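Creates a P2PStore instance, which internally starts a Transfer Engine service.
- `metadataUri`: The hostname or IP address of the metadata server/etcd service.
- `localSegmentName`: The local server name (hostname/IP address:port), which must be unique within the cluster.
- `nicPriorityMatrix`: The NIC priority order table; see the related description in the Transfer Engine API documentation (`TransferEngine::installTransport`).
- Return value: If successful, returns a pointer to the `P2PStore` instance; otherwise returns `error`.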

```go
Expand Down
8 changes: 8 additions & 0 deletions doc/zh/run-examples.md
Original file line number Diff line number Diff line change
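Expand Up @@ -106,6 +106,14 @@ Mooncake supports adding the following advanced compilation options when running the `cmake` command:
> Tip: Advanced users can also pass in a JSON file of the network card priority matrix through `--nic_priority_matrix`; for details, refer to the developer manual of Transfer Engine.
- In network environments that only support TCP, the `--protocol=tcp` parameter can be used; in this case, there is no need to specify the `--device_name` parameter.

Alternatively, the automatic topology discovery feature can generate the network card priority matrix from the operating system configuration; in this case, there is no need to specify the name of the RDMA NIC used for transfers.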
```
./transfer_engine_bench --mode=target \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--auto_discovery
```
1. **Start the initiator node.**
```bash
# This is 10.0.0.3
Expand Down
64 changes: 9 additions & 55 deletions doc/zh/transfer-engine.md
Original file line number Diff line number Diff line change
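Expand Up @@ -52,8 +52,6 @@ The BatchTransfer API passes in user requests as an array of Request objects, which need to

To further maximize bandwidth utilization, a single request's transfer is internally divided into multiple slices if its length exceeds 16KB. Each slice might use a different path, enabling all RDMA NICs to work collaboratively.

If you do not want to configure the topology matrix manually, you can call `mooncake::discoverTopologyMatrix` (in `topology.h`) to generate it automatically. This function discovers the topology between CPU/CUDA and RDMA NICs. Support for more device types is under development. Currently, the automatic discovery mechanism might not produce an accurate hardware topology, and we welcome your feedback and suggestions for improvement!

### Endpoint Management
Transfer Engine uses a pair of endpoints to represent the connection between a local RDMA NIC and a remote RDMA NIC. In practice, each endpoint includes one or more RDMA QP objects.
Connections in Transfer Engine are established on demand; endpoints remain unpaired until the first request.
Expand Down Expand Up @@ -113,6 +111,14 @@ Transfer Engine uses the SIEVE algorithm to manage endpoint eviction. If, because of link
> Tip: Advanced users can also pass in a JSON file of the network card priority matrix through `--nic_priority_matrix`; for details, refer to [the developer manual of Transfer Engine](#transferengineinstallorgettransport).
- In network environments that only support TCP, the `--protocol=tcp` parameter can be used; in this case, there is no need to specify the `--device_name` parameter.

Alternatively, the automatic topology discovery feature can generate the network card priority matrix from the operating system configuration; in this case, there is no need to specify the name of the RDMA NIC used for transfers.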
```
./transfer_engine_bench --mode=target \
--metadata_server=10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--auto_discovery
```

1. **Start the initiator node.**
```bash
# This is 10.0.0.3
Expand Down Expand Up @@ -230,60 +236,8 @@ int freeBatchID(BatchID batch_id);
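- Return value: If successful, returns 0; otherwise, returns a negative value.
### Multi-Transport Management
The `TransferEngine` class internally manages multiple backend `Transport` classes, and users can load or unload `Transport` for different backends in `TransferEngine`.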
#### TransferEngine::installTransport
```cpp
Transport* installTransport(const std::string& proto, void** args);
```
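Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protocol already exists, it returns that `Transport`.
- `proto`: The name of the transport protocol used by `Transport`, currently supporting `tcp`, `rdma`, `nvmeof`.
- `args`: Additional parameters required for `Transport` initialization, presented as a variable-length array, with the last member being `nullptr`.
- Return value: If `proto` is within the determined range, returns the `Transport` corresponding to `proto`; otherwise, returns a null pointer.
**TCP Transfer Mode:**
For TCP transfer mode, there is no need to pass `args` objects when registering the `Transport` object.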
```cpp
engine->installTransport("tcp", nullptr);
```
**RDMA Transfer Mode:**
For RDMA transfer mode, the NIC priority order must be specified through `args` during the registration of `Transport`.
```cpp
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* topology matrix */;
args[1] = nullptr;
engine->installTransport("rdma", args);
```
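The NIC priority order is a JSON string indicating the storage medium names and the lists of NICs to be used preferentially, as shown in the example below: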
```json
{
"cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]],
"cuda:0": [["mlx1", "mlx0"]],
...
}
```
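Each `key` represents the device name corresponding to a CPU socket or a GPU device.
Each `value` is a tuple of (`preferred_nic_list`, `accessable_nic_list`), each of which is a list of NIC names.
- `preferred_nic_list` indicates the preferred NICs, for example NICs directly connected to the CPU rather than across NUMA, or NICs under the same PCIe Switch for GPUs;
- `accessable_nic_list` indicates NICs that are not preferred but can theoretically connect, used for fault retry scenarios.
**NVMeOF Transfer Mode:** For NVMeOF transfer mode, the file path must be specified through `args` during the registration of `Transport`.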
```cpp
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* file path */;
args[1] = nullptr;
engine->installTransport("nvmeof", args);
```
#### TransferEngine::uninstallTransport
```cpp
int uninstallTransport(const std::string& proto);
```
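Unloads `Transport` from `TransferEngine`.
- `proto`: The name of the transport protocol used by `Transport`, currently supporting `rdma`, `nvmeof`.
- Return value: If successful, returns 0; otherwise, returns a negative value.
The `TransferEngine` class internally manages multiple backend `Transport` classes. It automatically discovers the topology between CPU/CUDA and RDMA NICs (support for more device types is under development; if the discovered hardware topology is inaccurate, your feedback and suggestions are welcome) and automatically installs the appropriate `Transport`.
### Space Registration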
Expand Down
5 changes: 3 additions & 2 deletions mooncake-integration/vllm/vllm_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ int VLLMAdaptor::initializeExt(const char *local_hostname,
conn_string =
std::string(metadata_type) + "://" + std::string(metadata_server);

engine_ = std::make_unique<TransferEngine>();
// TODO: remove `false` in the future; it is passed for now to keep the same API in vllm.
engine_ = std::make_unique<TransferEngine>(false);
auto hostname_port = parseHostNameWithPort(local_hostname);
int ret = engine_->init(conn_string, local_hostname,
hostname_port.first.c_str(), hostname_port.second);
Expand Down Expand Up @@ -239,4 +240,4 @@ PYBIND11_MODULE(mooncake_vllm_adaptor, m) {
.def("readBytesFromBuffer", &VLLMAdaptor::readBytesFromBuffer)
.def("expRegisterMemory", &VLLMAdaptor::expRegisterMemory)
.def("expUnregisterMemory", &VLLMAdaptor::expUnregisterMemory);
}
}
11 changes: 6 additions & 5 deletions mooncake-integration/vllm/vllm_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
#include <glog/logging.h>
#include <pybind11/pybind11.h>
#include <sys/time.h>
#include <vector>
#include <stack>

#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <memory>
#include <stack>
#include <vector>

#include "transfer_engine.h"
#include "transport/rdma_transport/rdma_transport.h"
Expand All @@ -46,9 +46,9 @@ class VLLMAdaptor {

int initialize(const char *local_hostname, const char *metadata_server,
const char *protocol, const char *device_name);

int initializeExt(const char *local_hostname, const char *metadata_server,
const char *protocol, const char *device_name,
const char *protocol, const char *device_name,
const char *metadata_type);

uintptr_t allocateManagedBuffer(size_t length);
Expand All @@ -74,7 +74,8 @@ class VLLMAdaptor {
// FOR EXPERIMENT ONLY
int expRegisterMemory(uintptr_t buffer_addr, size_t capacity);

int expUnregisterMemory(uintptr_t buffer_addr); // must be called before VLLMAdaptor::~VLLMAdaptor()
// must be called before VLLMAdaptor::~VLLMAdaptor()
int expUnregisterMemory(uintptr_t buffer_addr);

private:
char *allocateRawBuffer(size_t capacity);
Expand Down
30 changes: 6 additions & 24 deletions mooncake-p2p-store/src/example/p2p-store-example.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"syscall"
"time"
Expand All @@ -28,20 +27,16 @@ import (
)

var (
command string
metadataServer string
localServerName string
deviceName string
nicPriorityMatrixPath string
fileSize int
command string
metadataServer string
localServerName string
fileSize int
)

func main() {
flag.StringVar(&command, "cmd", "trainer", "Command: trainer|inferencer")
flag.StringVar(&metadataServer, "metadata_server", "localhost:2379", "Metadata server address")
flag.StringVar(&localServerName, "local_server_name", "", "Local server name")
flag.StringVar(&deviceName, "device_name", "mlx5_2", "RNIC device name")
flag.StringVar(&nicPriorityMatrixPath, "nic_priority_matrix", "", "Path to NIC priority matrix file (Advanced)")
flag.IntVar(&fileSize, "file_size_mb", 2048, "File size in MB")
flag.Parse()

Expand Down Expand Up @@ -112,7 +107,7 @@ func trainer() {
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
defer cancel()

store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix())
store, err := p2pstore.NewP2PStore(metadataServer, localServerName, "")
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err)
os.Exit(1)
Expand All @@ -129,19 +124,6 @@ func trainer() {
fmt.Println("ALL DONE")
}

func getPriorityMatrix() string {
if len(nicPriorityMatrixPath) != 0 {
data, err := ioutil.ReadFile(nicPriorityMatrixPath)
if err != nil {
fmt.Println("Error reading file:", err)
os.Exit(1)
}
return string(data)
} else {
return "{ \"cpu:0\": [[\"" + deviceName + "\"], []]}"
}
}

func doInferencer(ctx context.Context, store *p2pstore.P2PStore, name string) {
addr, err := syscall.Mmap(-1, 0, fileSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
if err != nil {
Expand Down Expand Up @@ -178,7 +160,7 @@ func inferencer() {
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
defer cancel()

store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix())
store, err := p2pstore.NewP2PStore(metadataServer, localServerName, "")
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err)
os.Exit(1)
Expand Down