From ab8021d6f7a26feec10c631d4ffdbab3c251be61 Mon Sep 17 00:00:00 2001 From: doujiang24 Date: Fri, 17 Jan 2025 21:40:36 +0800 Subject: [PATCH] [TransferEngine] change: auto discover topology & install transport. (#73) * [TransferEngine] change: auto discover topology & install transport. Signed-off-by: doujiang24 --- doc/en/p2p-store.md | 7 +- doc/en/transfer-engine.md | 70 ++++--------------- doc/en/vllm-integration-v0.2.md | 1 + doc/zh/p2p-store.md | 5 +- doc/zh/run-examples.md | 8 +++ doc/zh/transfer-engine.md | 64 +++-------------- mooncake-integration/vllm/vllm_adaptor.cpp | 5 +- mooncake-integration/vllm/vllm_adaptor.h | 11 +-- .../src/example/p2p-store-example.go | 30 ++------ mooncake-p2p-store/src/p2pstore/core.go | 6 +- .../src/p2pstore/transfer_engine.go | 42 ++++++----- .../example/memory_pool.cpp | 3 +- .../example/transfer_engine_bench.cpp | 61 +++++++++------- .../include/multi_transport.h | 3 +- .../include/transfer_engine.h | 13 +++- .../transport/cxl_transport/cxl_transport.h | 7 +- .../nvmeof_transport/nvmeof_transport.h | 7 +- .../transport/rdma_transport/rdma_transport.h | 5 +- .../transport/tcp_transport/tcp_transport.h | 8 ++- .../include/transport/transport.h | 3 +- mooncake-transfer-engine/rust/src/main.rs | 5 -- .../rust/src/transfer_engine.rs | 41 ++--------- .../src/multi_transport.cpp | 4 +- mooncake-transfer-engine/src/topology.cpp | 4 +- .../src/transfer_engine.cpp | 32 ++++++++- .../transport/cxl_transport/cxl_transport.cpp | 3 +- .../nvmeof_transport/nvmeof_transport.cpp | 13 ++-- .../rdma_transport/rdma_transport.cpp | 28 +++----- .../transport/tcp_transport/tcp_transport.cpp | 3 +- .../src/transport/transport.cpp | 3 +- .../tests/rdma_transport_test.cpp | 6 +- .../tests/rdma_transport_test2.cpp | 3 +- .../tests/tcp_transport_test.cpp | 12 ++-- 33 files changed, 221 insertions(+), 295 deletions(-) diff --git a/doc/en/p2p-store.md b/doc/en/p2p-store.md index 934a70e..e29ef56 100644 --- a/doc/en/p2p-store.md +++ b/doc/en/p2p-store.md @@ -17,7 +17,6 @@ After compiling P2P Store successfully by following the compilation guide with ` ./p2p-store-example --cmd=trainer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.2:12345 \ - --device_name=erdma_0 ``` 3. **Start the simulated inference node.** This node will pull data from the simulated training node or other simulated inference nodes. @@ -27,7 +26,6 @@ After compiling P2P Store successfully by following the compilation guide with ` ./p2p-store-example --cmd=inferencer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.3:12346 \ - --device_name=erdma_1 ``` The test is completed with the display of "ALL DONE". @@ -37,12 +35,11 @@ In the above process, the simulated inference nodes search for data sources, whi Mooncake P2P Store currently implements the following interfaces in Golang: ```go -func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error) +func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error) ``` Creates an instance of `P2PStore`, which internally starts a Transfer Engine service. - `metadataUri`: The hostname or IP address of the metadata server/etcd service. - `localSegmentName`: The local server name (hostname/IP address:port), ensuring uniqueness within the cluster. -- `nicPriorityMatrix`: The network interface card priority order matrix, see the related description in the Transfer Engine API documentation (`TransferEngine::installTransport`). - Return value: If successful, returns a pointer to the `P2PStore` instance, otherwise returns `error`. ```go @@ -63,7 +60,7 @@ Registers a local file to the cluster, making it downloadable by other peers. En - `name`: The file registration name, ensuring uniqueness within the cluster. - `addrList` and `sizeList`: These two arrays represent the memory range of the file, with `addrList` indicating the starting address and `sizeList` indicating the corresponding length. The file content corresponds logically to the order in the arrays. - `maxShardSize`: The internal data sharding granularity, with a recommended value of 64MB. -- `location`: The device name corresponding to this memory segment, matching with `nicPriorityMatrix`. +- `location`: The device name corresponding to this memory segment. ```go func (store *P2PStore) Unregister(ctx context.Context, name string) error diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index 7b0f025..d3696af 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -61,8 +61,6 @@ For instance, as illustrated in figure above, to transfer data from buffer 0 (as To further maximize bandwidth utilization, if a single request's transfer is internally divided into multiple slices if its length exeeds 16KB. Each slice might use a different path, enabling collaborative work among all RDMA NICs. -If you do not want to manually configure the topology matrix, we also provide a function (`mooncake::discoverTopologyMatrix` in `topology.h`) to automatically discover the toplogy between CPU/CUDA and RDMA devices. Supports for more device types are working in progress. The automatic discovery mechanism might not always be accurate, and we welcome your feedbacks and improvement ideas! - ### Endpoint Management Mooncake Store employs a pair of end- points to represent the connection between a local RDMA @@ -140,6 +138,14 @@ After successfully compiling Transfer Engine, the test program `transfer_engine_ > Tip: Advanced users can also pass in a JSON file of the network card priority matrix through `--nic_priority_matrix`, for details, refer to the developer manual of Transfer Engine. - In network environments that only support TCP, the `--protocol=tcp` parameter can be used; in this case, there is no need to specify the `--device_name` parameter. + You can also specify `--auto_discovery` to enable discovery topology automatically, which generates a network card priority matrix based on the operating system configuration. Then, `--device_name` parameter is not required. + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **Start the initiator node.** ```bash # This is 10.0.0.3 @@ -257,63 +263,11 @@ Recycles `BatchID`, and subsequent operations on `submitTransfer` and `getTransf - Return value: If successful, returns 0; otherwise, returns a negative value. ### Multi-Transport Management -The `TransferEngine` class internally manages multiple backend `Transport` classes, and users can load or unload `Transport` for different backends in `TransferEngine`. - -#### TransferEngine::installTransport -```cpp -Transport* installTransport(const std::string& proto, void** args); -``` - -Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protocol already exists, it returns that `Transport`. -- `proto`: The name of the transport protocol used by `Transport`, currently supporting `tcp`, `rdma`, `nvmeof`. -- `args`: Additional parameters required for `Transport` initialization, presented as a variable-length array, with the last member being `nullptr`. -- Return value: If `proto` is within the determined range, returns the `Transport` corresponding to `proto`; otherwise, returns a null pointer. - -##### TCP Transfer Mode -For TCP transfer mode, there is no need to pass `args` objects when registering the `Transport` object. -```cpp -engine->installTransport("tcp", nullptr); -``` - -##### RDMA Transfer Mode -For RDMA transfer mode, the network card priority marrix must be specified through `args` during the registration of `Transport`. -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("rdma", args); -``` -The network card priority marrix is a JSON string indicating the storage medium name and the list of network cards to be used preferentially, as shown in the example below: -```json -{ - "cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]], - "cuda:0": [["mlx1", "mlx0"]], - ... -} -``` -Each `key` represents the device name corresponding to a CPU socket or a GPU device. -Each `value` is a tuple of (`preferred_nic_list`, `accessable_nic_list`), each of which is a list of NIC names. -- `preferred_nic_list` indicates the preferred NICs, such as NICs directly connected to the CPU rather than across NUMA, or NICs under the same PCIe Switch for GPUs. -- `accessable_nic_list` indicates NICs that are not preferred but can theoretically connect, used for fault retry scenarios. - -##### NVMeOF Transfer Mode -For NVMeOF transfer mode, the file path must be specified through `args` during the registration of `Transport`. -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("nvmeof", args); -``` - -#### TransferEngine::uninstallTransport -```cpp -int uninstallTransport(const std::string& proto); -``` - -Unloads `Transport` from `TransferEngine`. -- `proto`: The name of the transport protocol used by `Transport`, currently supporting `rdma`, `nvmeof`. -- Return value: If successful, returns 0; otherwise, returns a negative value. +The `TransferEngine` class internally manages multiple backend `Transport` classes. +And it will discover the toplogy between CPU/CUDA and RDMA devices automatically +(more device types are working in progress, feedbacks are welcome when the automatic discovery mechanism is not accurate), +and it will install `Transport` automatically based on the topology. ### Space Registration diff --git a/doc/en/vllm-integration-v0.2.md b/doc/en/vllm-integration-v0.2.md index 5e99e64..34c2bc1 100644 --- a/doc/en/vllm-integration-v0.2.md +++ b/doc/en/vllm-integration-v0.2.md @@ -67,6 +67,7 @@ pip3 install -e . } ``` +Note: we will support auto-detect in the next version when the `protocol` is absent in the config file. ## Run Example - Please change the IP addresses and ports in the following guide according to your env. diff --git a/doc/zh/p2p-store.md b/doc/zh/p2p-store.md index ab0029d..1c91ca4 100644 --- a/doc/zh/p2p-store.md +++ b/doc/zh/p2p-store.md @@ -17,7 +17,6 @@ P2P Store 提供的是类似 Register 和 GetReplica 的接口。Register 相当 ./p2p-store-example --cmd=trainer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.2:12345 \ - --device_name=erdma_0 ``` 3. **启动模拟推理节点。** 该节点会从模拟训练节点或其他模拟推理节点拉取数据。 @@ -27,7 +26,6 @@ P2P Store 提供的是类似 Register 和 GetReplica 的接口。Register 相当 ./p2p-store-example --cmd=inferencer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.3:12346 \ - --device_name=erdma_1 ``` 测试完毕显示“ALL DONE”。 @@ -48,12 +46,11 @@ P2P Store 基于 [Transfer Engine](transfer-engine.md) 构建,支持在集群 Mooncake P2P Store 目前基于 Golang 实现了下列接口: ```go -func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error) +func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error) ``` 创建 P2PStore 实例,该实例内部会启动一个 Transfer Engine 服务。 - `metadataUri`:元数据服务器/etcd服务所在主机名或 IP 地址。 - `localSegmentName`:本地的服务器名称(主机名/IP地址:端口号),保证在集群内唯一。 -- `nicPriorityMatrix`:网卡优先级顺序表,参见位于 Transfer Engine API 文档的相关描述(`TransferEngine::installTransport`)。 - 返回值:若成功则返回 `P2PStore` 实例指针,否则返回 `error`。 ```go diff --git a/doc/zh/run-examples.md b/doc/zh/run-examples.md index 639df39..21f3190 100644 --- a/doc/zh/run-examples.md +++ b/doc/zh/run-examples.md @@ -106,6 +106,14 @@ Mooncake 支持在执行 `cmake` 命令期间添加下列高级编译选项: > 提示:高级用户还可通过 `--nic_priority_matrix` 传入网卡优先级矩阵 JSON 文件,详细参考 Transfer Engine 的开发者手册。 - 在仅支持 TCP 的网络环境中,可使用 `--protocol=tcp` 参数,此时不需要指定 `--device_name` 参数。 + 也可通过拓扑自动发现功能基于操作系统配置自动生成网卡优先级矩阵,此时不需要指定传输过程使用的 RDMA 网卡名称。 + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **启动发起节点。** ```bash # This is 10.0.0.3 diff --git a/doc/zh/transfer-engine.md b/doc/zh/transfer-engine.md index 19e404b..54edc74 100644 --- a/doc/zh/transfer-engine.md +++ b/doc/zh/transfer-engine.md @@ -52,8 +52,6 @@ BatchTransfer API 使用请求(Request)对象数组传入用户请求,需 为了进一步最大化带宽利用率,如果单个请求的传输长度超过16KB,则其内部被划分为多个切片。每个切片可能使用不同的路径,使所有RDMA NIC能够协同工作。 -如果不想手动配置拓扑矩阵,也可以直接调用`mooncake::discoverTopologyMatrix`(位于`topology.h`)来自动生成拓扑矩阵。该函数能够自动探查CPU/CUDA和RDMA网卡之间的拓扑关系。对于更多设备种类的支持正在开发中。目前,拓扑自动发现机制可能无法给出准确的硬件拓扑,我们欢迎您的反馈和改进建议! - ### 端点管理 Transfer Engine 使用一对端点来表示本地RDMA NIC和远程RDMA NIC之间的连接。实际上,每个端点包括一个或多个RDMA QP对象。 Transfer Engine 中的连接是按需建立的;端点在第一次请求之前保持未配对状态。 @@ -113,6 +111,14 @@ Transfer Engine 使用SIEVE算法来管理端点的逐出。如果由于链路 > 提示:高级用户还可通过 `--nic_priority_matrix` 传入网卡优先级矩阵 JSON 文件,详细参考 [Transfer Engine 的开发者手册](#transferengineinstallorgettransport)。 - 在仅支持 TCP 的网络环境中,可使用 `--protocol=tcp` 参数,此时不需要指定 `--device_name` 参数。 + 也可通过拓扑自动发现功能基于操作系统配置自动生成网卡优先级矩阵,此时不需要指定传输过程使用的 RDMA 网卡名称。 + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **启动发起节点。** ```bash # This is 10.0.0.3 @@ -230,60 +236,8 @@ int freeBatchID(BatchID batch_id); - 返回值:若成功,返回 0;否则返回负数值。 ### 多 Transport 管理 -`TransferEngine` 类内部管理多后端的 `Transport` 类,用户可向 `TransferEngine` 中装载或卸载对不同后端进行传输的 `Transport`。 - -#### TransferEngine::installTransport -```cpp -Transport* installTransport(const std::string& proto, void** args); -``` -在 `TransferEngine` 中注册 `Transport`。如果某个协议对应的 `Transport` 已存在,则返回该 `Transport`。 - -- `proto`: `Transport` 使用的传输协议名称,目前支持 `tcp`, `rdma`, `nvmeof`。 -- `args`:以变长数组形式呈现的 `Transport` 初始化需要的其他参数,数组内最后一个成员应当是 `nullptr`。 -- 返回值:若 `proto` 在确定范围内,返回对应 `proto` 的 `Transport`;否则返回空指针。 -**TCP 传输模式:** -对于 TCP 传输模式,注册 `Transport` 期间不需要传入 `args` 对象。 -```cpp -engine->installTransport("tcp", nullptr); -``` - -**RDMA 传输模式:** -对于 RDMA 传输模式,注册 `Transport` 期间需通过 `args` 指定网卡优先级顺序。 -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("rdma", args); -``` -网卡优先级顺序是一个 JSON 字符串,表示使用的存储介质名称及优先使用的网卡列表,样例如下: -```json -{ - "cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]], - "cuda:0": [["mlx1", "mlx0"]], - ... -} -``` -其中每个 `key` 代表一个 CPU socket 或者一个 GPU device 对应的设备名称 -每个 `value` 为一个 (`preferred_nic_list`, `accessable_nic_list`) 的二元组,每一项都是一个 NIC 名称的列表(list)。 -- `preferred_nic_list` 表示优先选择的 NIC,比如对于 CPU 可以是当前直连而非跨 NUMA 的 NIC,对于 GPU 可以是挂在同一个 PCIe Switch 下的 NIC; -- `accessable_nic_list` 表示虽然不优选但是理论上可以连接上的 NIC,用于故障重试场景。 - -**NVMeOF 传输模式:** 对于 NVMeOF 传输模式,注册 `Transport` 期间需通过 `args` 指定文件路径。 -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("nvmeof", args); -``` - -#### TransferEngine::uinstallTransport -```cpp -int uninstallTransport(const std::string& proto); -``` -从 `TransferEngine` 中卸载 `Transport`。 -- `proto`: `Transport` 使用的传输协议名称,目前支持 `rdma`, `nvmeof`。 -- 返回值:若成功,返回 0;否则返回负数值。 +`TransferEngine` 类内部管理多后端的 `Transport` 类,并且会自动探查 CPU/CUDA 和 RDMA 网卡之间的拓扑关系(更多设备种类的支持正在开发中,如无法给出准确的硬件拓扑,欢迎您的反馈和改进建议),以及自动安装合适的 `Transport`。 ### 空间注册 diff --git a/mooncake-integration/vllm/vllm_adaptor.cpp b/mooncake-integration/vllm/vllm_adaptor.cpp index 37a5bf2..85f45e3 100644 --- a/mooncake-integration/vllm/vllm_adaptor.cpp +++ b/mooncake-integration/vllm/vllm_adaptor.cpp @@ -82,7 +82,8 @@ int VLLMAdaptor::initializeExt(const char *local_hostname, conn_string = std::string(metadata_type) + "://" + std::string(metadata_server); - engine_ = std::make_unique(); + // TODO: remove `false` in the feature, it's for keep same API in vllm. + engine_ = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_hostname); int ret = engine_->init(conn_string, local_hostname, hostname_port.first.c_str(), hostname_port.second); @@ -239,4 +240,4 @@ PYBIND11_MODULE(mooncake_vllm_adaptor, m) { .def("readBytesFromBuffer", &VLLMAdaptor::readBytesFromBuffer) .def("expRegisterMemory", &VLLMAdaptor::expRegisterMemory) .def("expUnregisterMemory", &VLLMAdaptor::expUnregisterMemory); -} \ No newline at end of file +} diff --git a/mooncake-integration/vllm/vllm_adaptor.h b/mooncake-integration/vllm/vllm_adaptor.h index 119659d..b20e42a 100644 --- a/mooncake-integration/vllm/vllm_adaptor.h +++ b/mooncake-integration/vllm/vllm_adaptor.h @@ -16,13 +16,13 @@ #include #include #include -#include -#include #include #include #include #include +#include +#include #include "transfer_engine.h" #include "transport/rdma_transport/rdma_transport.h" @@ -46,9 +46,9 @@ class VLLMAdaptor { int initialize(const char *local_hostname, const char *metadata_server, const char *protocol, const char *device_name); - + int initializeExt(const char *local_hostname, const char *metadata_server, - const char *protocol, const char *device_name, + const char *protocol, const char *device_name, const char *metadata_type); uintptr_t allocateManagedBuffer(size_t length); @@ -74,7 +74,8 @@ class VLLMAdaptor { // FOR EXPERIMENT ONLY int expRegisterMemory(uintptr_t buffer_addr, size_t capacity); - int expUnregisterMemory(uintptr_t buffer_addr); // must be called before VLLMAdaptor::~VLLMAdaptor() + // must be called before VLLMAdaptor::~VLLMAdaptor() + int expUnregisterMemory(uintptr_t buffer_addr); private: char *allocateRawBuffer(size_t capacity); diff --git a/mooncake-p2p-store/src/example/p2p-store-example.go b/mooncake-p2p-store/src/example/p2p-store-example.go index eeb5cdb..98804d2 100644 --- a/mooncake-p2p-store/src/example/p2p-store-example.go +++ b/mooncake-p2p-store/src/example/p2p-store-example.go @@ -18,7 +18,6 @@ import ( "context" "flag" "fmt" - "io/ioutil" "os" "syscall" "time" @@ -28,20 +27,16 @@ import ( ) var ( - command string - metadataServer string - localServerName string - deviceName string - nicPriorityMatrixPath string - fileSize int + command string + metadataServer string + localServerName string + fileSize int ) func main() { flag.StringVar(&command, "cmd", "trainer", "Command: trainer|inferencer") flag.StringVar(&metadataServer, "metadata_server", "localhost:2379", "Metadata server address") flag.StringVar(&localServerName, "local_server_name", "", "Local server name") - flag.StringVar(&deviceName, "device_name", "mlx5_2", "RNIC device name") - flag.StringVar(&nicPriorityMatrixPath, "nic_priority_matrix", "", "Path to NIC priority matrix file (Advanced)") flag.IntVar(&fileSize, "file_size_mb", 2048, "File size in MB") flag.Parse() @@ -112,7 +107,7 @@ func trainer() { ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) defer cancel() - store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix()) + store, err := p2pstore.NewP2PStore(metadataServer, localServerName, "") if err != nil { fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err) os.Exit(1) @@ -129,19 +124,6 @@ func trainer() { fmt.Println("ALL DONE") } -func getPriorityMatrix() string { - if len(nicPriorityMatrixPath) != 0 { - data, err := ioutil.ReadFile(nicPriorityMatrixPath) - if err != nil { - fmt.Println("Error reading file:", err) - os.Exit(1) - } - return string(data) - } else { - return "{ \"cpu:0\": [[\"" + deviceName + "\"], []]}" - } -} - func doInferencer(ctx context.Context, store *p2pstore.P2PStore, name string) { addr, err := syscall.Mmap(-1, 0, fileSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) if err != nil { @@ -178,7 +160,7 @@ func inferencer() { ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) defer cancel() - store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix()) + store, err := p2pstore.NewP2PStore(metadataServer, localServerName, "") if err != nil { fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err) os.Exit(1) diff --git a/mooncake-p2p-store/src/p2pstore/core.go b/mooncake-p2p-store/src/p2pstore/core.go index 5a759ed..35bec19 100644 --- a/mooncake-p2p-store/src/p2pstore/core.go +++ b/mooncake-p2p-store/src/p2pstore/core.go @@ -189,9 +189,9 @@ func (store *P2PStore) Unregister(ctx context.Context, name string) error { type PayloadInfo struct { Name string // Full name of checkpoint file - MaxShardSize uint64 // - TotalSize uint64 // - SizeList []uint64 // + MaxShardSize uint64 // + TotalSize uint64 // + SizeList []uint64 // } func (store *P2PStore) List(ctx context.Context, namePrefix string) ([]PayloadInfo, error) { diff --git a/mooncake-p2p-store/src/p2pstore/transfer_engine.go b/mooncake-p2p-store/src/p2pstore/transfer_engine.go index 94ee61e..ed701f2 100644 --- a/mooncake-p2p-store/src/p2pstore/transfer_engine.go +++ b/mooncake-p2p-store/src/p2pstore/transfer_engine.go @@ -22,8 +22,11 @@ package p2pstore * All the C functions used here follow this convention. */ -//#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc -//#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h" +/* +#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc +#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h" +#include +*/ import "C" import ( @@ -53,7 +56,7 @@ func parseServerName(serverName string) (host string, port int) { return host, port } -const ( +var ( rdmaCStr = C.CString("rdma") ) @@ -64,24 +67,27 @@ func NewTransferEngine(metadata_uri string, local_server_name string, nic_priori metadataUri := C.CString(metadata_uri) localServerName := C.CString(local_server_name) connectableName := C.CString(connectable_name) - nicPriorityMatrix := C.CString(nic_priority_matrix) defer C.free(unsafe.Pointer(metadataUri)) defer C.free(unsafe.Pointer(localServerName)) defer C.free(unsafe.Pointer(connectableName)) - defer C.free(unsafe.Pointer(nicPriorityMatrix)) native_engine := C.createTransferEngine(metadataUri, localServerName, connectableName, C.uint64_t(rpc_port)) if native_engine == nil { return nil, ErrTransferEngine } - var args [2]unsafe.Pointer - args[0] = unsafe.Pointer(nicPriorityMatrix) - args[1] = nil - xport := C.installTransport(native_engine, rdmaCStr, &args[0]) - if xport == nil { - C.destroyTransferEngine(native_engine) - return nil, ErrTransferEngine + var xport C.transport_t + if nic_priority_matrix != "" { + nicPriorityMatrix := C.CString(nic_priority_matrix) + defer C.free(unsafe.Pointer(nicPriorityMatrix)) + var args [2]unsafe.Pointer + args[0] = unsafe.Pointer(nicPriorityMatrix) + args[1] = nil + xport = C.installTransport(native_engine, rdmaCStr, &args[0]) + if xport == nil { + C.destroyTransferEngine(native_engine) + return nil, ErrTransferEngine + } } return &TransferEngine{ @@ -91,12 +97,16 @@ func NewTransferEngine(metadata_uri string, local_server_name string, nic_priori } func (engine *TransferEngine) Close() error { - ret := C.uninstallTransport(engine.engine, rdmaCStr) - if ret < 0 { - return ErrTransferEngine + if engine.xport != nil { + ret := C.uninstallTransport(engine.engine, rdmaCStr) + if ret < 0 { + return ErrTransferEngine + } } - C.destroyTransferEngine(engine.engine) + if engine.engine != nil { + C.destroyTransferEngine(engine.engine) + } return nil } diff --git a/mooncake-transfer-engine/example/memory_pool.cpp b/mooncake-transfer-engine/example/memory_pool.cpp index 2a39ea6..43b3485 100644 --- a/mooncake-transfer-engine/example/memory_pool.cpp +++ b/mooncake-transfer-engine/example/memory_pool.cpp @@ -76,7 +76,8 @@ int target() { auto nic_priority_matrix = loadNicPriorityMatrix(); const size_t dram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); void **args = (void **)malloc(2 * sizeof(void *)); args[0] = (void *)nic_priority_matrix.c_str(); diff --git a/mooncake-transfer-engine/example/transfer_engine_bench.cpp b/mooncake-transfer-engine/example/transfer_engine_bench.cpp index e633c64..fb7c8bc 100644 --- a/mooncake-transfer-engine/example/transfer_engine_bench.cpp +++ b/mooncake-transfer-engine/example/transfer_engine_bench.cpp @@ -41,7 +41,8 @@ static void checkCudaError(cudaError_t result, const char *message) { #endif -#define NR_SOCKETS (2) +const static int NR_SOCKETS = + numa_available() ? numa_num_configured_nodes() : 1; static std::string getHostname(); @@ -66,6 +67,7 @@ DEFINE_int32(batch_size, 128, "Batch size"); DEFINE_uint64(block_size, 4096, "Block size for each transfer request"); DEFINE_int32(duration, 10, "Test duration in seconds"); DEFINE_int32(threads, 4, "Task submission threads"); +DEFINE_bool(auto_discovery, false, "Enable auto discovery"); #ifdef USE_CUDA DEFINE_bool(use_vram, true, "Allocate memory from GPU VRAM"); @@ -227,27 +229,29 @@ std::string loadNicPriorityMatrix() { } int initiator() { - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(FLAGS_auto_discovery); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), hostname_port.second); - Transport *xport = nullptr; - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - xport = engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "tcp") { - xport = engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; + if (!FLAGS_auto_discovery) { + Transport *xport = nullptr; + if (FLAGS_protocol == "rdma") { + auto nic_priority_matrix = loadNicPriorityMatrix(); + void **args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + xport = engine->installTransport("rdma", args); + } else if (FLAGS_protocol == "tcp") { + xport = engine->installTransport("tcp", nullptr); + } else { + LOG(ERROR) << "Unsupported protocol"; + } + LOG_ASSERT(xport); } - LOG_ASSERT(xport); - void *addr[NR_SOCKETS] = {nullptr}; int buffer_num = NR_SOCKETS; @@ -306,22 +310,25 @@ int initiator() { } int target() { - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(FLAGS_auto_discovery); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), hostname_port.second); - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "tcp") { - engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; + if (!FLAGS_auto_discovery) { + if (FLAGS_protocol == "rdma") { + auto nic_priority_matrix = loadNicPriorityMatrix(); + void **args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + engine->installTransport("rdma", args); + } else if (FLAGS_protocol == "tcp") { + engine->installTransport("tcp", nullptr); + } else { + LOG(ERROR) << "Unsupported protocol"; + } } void *addr[NR_SOCKETS] = {nullptr}; @@ -364,4 +371,4 @@ int main(int argc, char **argv) { LOG(ERROR) << "Unsupported mode: must be 'initiator' or 'target'"; exit(EXIT_FAILURE); -} \ No newline at end of file +} diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index aef3b69..cdbd37f 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -44,7 +44,8 @@ class MultiTransport { int getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status); - Transport *installTransport(const std::string &proto, void **args); + Transport *installTransport(const std::string &proto, + std::shared_ptr topo); Transport *getTransport(const std::string &proto); diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 4abfdd9..8f3c224 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -43,17 +43,20 @@ using BufferEntry = Transport::BufferEntry; class TransferEngine { public: - TransferEngine() : metadata_(nullptr) {} + TransferEngine(bool auto_discover = true) + : metadata_(nullptr), + local_topology_(std::make_shared()), + auto_discover_(auto_discover) {} ~TransferEngine() { freeEngine(); } int init(const std::string &metadata_conn_string, const std::string &local_server_name, - const std::string &ip_or_host_name, - uint64_t rpc_port = 12345); + const std::string &ip_or_host_name, uint64_t rpc_port = 12345); int freeEngine(); + // Only for testing. Transport *installTransport(const std::string &proto, void **args); int uninstallTransport(const std::string &proto); @@ -110,6 +113,10 @@ class TransferEngine { std::string local_server_name_; std::shared_ptr multi_transports_; std::vector local_memory_regions_; + std::shared_ptr local_topology_; + // Discover topology and install transports automatically when it's true. + // Set it to false only for testing. + bool auto_discover_; }; } // namespace mooncake diff --git a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h index 4cc6f09..efc1a57 100644 --- a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h +++ b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h @@ -56,10 +56,11 @@ class CxlTransport : public Transport { private: int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; - int registerLocalMemory(void *addr, size_t length, const std::string &location, - bool remote_accessible, + int registerLocalMemory(void *addr, size_t length, + const std::string &location, bool remote_accessible, bool update_metadata) override; int unregisterLocalMemory(void *addr, diff --git a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h index bcaeab7..1db3deb 100644 --- a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h +++ b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h @@ -64,10 +64,11 @@ class NVMeoFTransport : public Transport { }; int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; - int registerLocalMemory(void *addr, size_t length, const std::string &location, - bool remote_accessible, + int registerLocalMemory(void *addr, size_t length, + const std::string &location, bool remote_accessible, bool update_metadata) override; int unregisterLocalMemory(void *addr, diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h index 8b798cc..6bc30ee 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h @@ -54,7 +54,8 @@ class RdmaTransport : public Transport { ~RdmaTransport(); int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; const char *getName() const override { return "rdma"; } @@ -113,7 +114,7 @@ class RdmaTransport : public Transport { private: std::vector> context_list_; std::atomic next_segment_id_; - Topology local_topology_; + std::shared_ptr local_topology_; }; using TransferRequest = Transport::TransferRequest; diff --git a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h index 62aba16..a302e7c 100644 --- a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h +++ b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h @@ -49,15 +49,17 @@ class TcpTransport : public Transport { int submitTransfer(BatchID batch_id, const std::vector &entries) override; - int submitTransferTask(const std::vector &request_list, - const std::vector &task_list) override; + int submitTransferTask( + const std::vector &request_list, + const std::vector &task_list) override; int getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status) override; private: int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo); int allocateLocalSegmentID(); diff --git a/mooncake-transfer-engine/include/transport/transport.h b/mooncake-transfer-engine/include/transport/transport.h index c59ee40..c9bdcce 100644 --- a/mooncake-transfer-engine/include/transport/transport.h +++ b/mooncake-transfer-engine/include/transport/transport.h @@ -186,7 +186,8 @@ class Transport { protected: virtual int install(std::string &local_server_name, - std::shared_ptr meta, void **args); + std::shared_ptr meta, + std::shared_ptr topo); std::string local_server_name_; std::shared_ptr metadata_; diff --git a/mooncake-transfer-engine/rust/src/main.rs b/mooncake-transfer-engine/rust/src/main.rs index a32fff2..cc3a729 100644 --- a/mooncake-transfer-engine/rust/src/main.rs +++ b/mooncake-transfer-engine/rust/src/main.rs @@ -40,9 +40,6 @@ pub struct Args { help = "Operation type: read or write")] pub operation: String, - #[clap(long, default_value_t = String::from("{\"cpu:0\": [[\"mlx5_0\"], []]}"))] - pub nic_priority_matrix: String, - #[clap(long, default_value_t = String::from("127.0.0.1"), help = "Segment ID to access data")] pub segment_id: String, @@ -161,7 +158,6 @@ fn initiator(args: Args) -> Result<()> { let engine = Arc::new(TransferEngine::new( &args.metadata_server, &get_host_ip()?, - &args.nic_priority_matrix, 12345, )?); @@ -216,7 +212,6 @@ fn target(args: Args) -> Result<()> { let engine = TransferEngine::new( &args.metadata_server, &get_host_ip()?, - &args.nic_priority_matrix, 12345, )?; diff --git a/mooncake-transfer-engine/rust/src/transfer_engine.rs b/mooncake-transfer-engine/rust/src/transfer_engine.rs index c9466fb..1621a1d 100644 --- a/mooncake-transfer-engine/rust/src/transfer_engine.rs +++ b/mooncake-transfer-engine/rust/src/transfer_engine.rs @@ -62,66 +62,35 @@ pub struct BufferEntry { pub struct TransferEngine { engine: bindings::transfer_engine_t, - xport: bindings::transport_t, } impl TransferEngine { pub fn new( metadata_uri: &str, local_server_name: &str, - nic_priority_matrix: &str, rpc_port: u64, ) -> Result { let metadata_uri_c = CString::new(metadata_uri).map_err(|_| anyhow!("CString::new failed"))?; let local_server_name_c = CString::new(local_server_name).map_err(|_| anyhow!("CString::new failed"))?; - let nic_priority_matrix_c = - CString::new(nic_priority_matrix).map_err(|_| anyhow!("CString::new failed"))?; - let proto_c = CString::new("rdma").map_err(|_| anyhow!("CString::new failed"))?; - let engine = unsafe { bindings::createTransferEngine(metadata_uri_c.as_ptr()) }; - if engine.is_null() { - bail!("Failed to create TransferEngine") - } - - let ret = unsafe { - bindings::initTransferEngine( - engine, + let engine = unsafe { + bindings::createTransferEngine( + metadata_uri_c.as_ptr(), local_server_name_c.as_ptr(), local_server_name_c.as_ptr(), rpc_port, ) }; - if ret < 0 { - unsafe { - bindings::destroyTransferEngine(engine); - } + if engine.is_null() { bail!("Failed to create TransferEngine") } - let mut args = vec![ - nic_priority_matrix_c.into_raw() as *mut c_void, - std::ptr::null_mut(), - ]; - let xport = - unsafe { bindings::installTransport(engine, proto_c.as_ptr(), args.as_mut_ptr()) }; - - if xport.is_null() { - unsafe { - bindings::destroyTransferEngine(engine); - } - bail!("Failed to install or get Transport") - } - Ok(Self { engine, xport }) + Ok(Self { engine }) } pub fn close(&mut self) -> Result<()> { - let proto_c = CString::new("rdma").map_err(|_| anyhow!("CString::new failed"))?; - let ret = unsafe { bindings::uninstallTransport(self.engine, proto_c.as_ptr()) }; - if ret < 0 { - bail!("Failed to uninstall Transport"); - } unsafe { bindings::destroyTransferEngine(self.engine); } diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index f2c1cc3..5d489c9 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -124,7 +124,7 @@ int MultiTransport::getTransferStatus(BatchID batch_id, size_t task_id, } Transport *MultiTransport::installTransport(const std::string &proto, - void **args) { + std::shared_ptr topo) { Transport *transport = nullptr; if (std::string(proto) == "rdma") { transport = new RdmaTransport(); @@ -143,7 +143,7 @@ Transport *MultiTransport::installTransport(const std::string &proto, return nullptr; } - if (transport->install(local_server_name_, metadata_, args)) { + if (transport->install(local_server_name_, metadata_, topo)) { return nullptr; } diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 41f5627..34cf065 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -49,7 +49,7 @@ static std::vector listInfiniBandDevices() { std::vector devices; if (dir == NULL) { - LOG(WARNING) << "Failed to list /sys/class/infiniband"; + PLOG(WARNING) << "Failed to open /sys/class/infiniband"; return {}; } while ((entry = readdir(dir))) { @@ -91,7 +91,7 @@ static std::vector discoverCpuTopology( std::vector topology; if (dir == NULL) { - LOG(WARNING) << "Failed to list /sys/devices/system/node"; + PLOG(WARNING) << "Failed to open /sys/devices/system/node"; return {}; } while ((entry = readdir(dir))) { diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 6b3768b..69046f1 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -26,10 +26,27 @@ int TransferEngine::init(const std::string &metadata_conn_string, metadata_ = std::make_shared(metadata_conn_string); multi_transports_ = std::make_shared(metadata_, local_server_name_); + TransferMetadata::RpcMetaDesc desc; desc.ip_or_host_name = ip_or_host_name; desc.rpc_port = rpc_port; - return metadata_->addRpcMetaEntry(local_server_name_, desc); + int ret = metadata_->addRpcMetaEntry(local_server_name_, desc); + if (ret) return ret; + + if (auto_discover_) { + // discover topology automatically + local_topology_->discover(); + + if (local_topology_->getHcaList().size() > 0) { + // only install RDMA transport when there is at least one HCA + multi_transports_->installTransport("rdma", local_topology_); + } else { + multi_transports_->installTransport("tcp", nullptr); + } + // TODO: install other transports automatically + } + + return 0; } int TransferEngine::freeEngine() { @@ -40,6 +57,7 @@ int TransferEngine::freeEngine() { return 0; } +// Only for testing Transport *TransferEngine::installTransport(const std::string &proto, void **args) { Transport *transport = multi_transports_->getTransport(proto); @@ -47,7 +65,17 @@ Transport *TransferEngine::installTransport(const std::string &proto, LOG(INFO) << "Transport " << proto << " already installed"; return transport; } - transport = multi_transports_->installTransport(proto, args); + + if (args != nullptr && args[0] != nullptr) { + const std::string nic_priority_matrix = static_cast(args[0]); + int ret = local_topology_->parse(nic_priority_matrix); + if (ret) { + LOG(ERROR) << "Failed to parse NIC priority matrix"; + return nullptr; + } + } + + transport = multi_transports_->installTransport(proto, local_topology_); if (!transport) return nullptr; for (auto &entry : local_memory_regions_) { int ret = transport->registerLocalMemory( diff --git a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp index 284f88e..862e723 100644 --- a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp +++ b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp @@ -54,7 +54,8 @@ int CxlTransport::submitTransfer(BatchID batch_id, int CxlTransport::freeBatchID(BatchID batch_id) { return 0; } int CxlTransport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { return 0; } diff --git a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp index 6a6e5e5..852273c 100644 --- a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp +++ b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp @@ -86,7 +86,8 @@ int NVMeoFTransport::getTransferStatus(BatchID batch_id, size_t task_id, for (size_t i = slice_id; i < slice_id + slice_num; ++i) { // LOG(INFO) << "task " << task_id << " i " << i << " upper bound " << // slice_num; - auto event = desc_pool_->getTransferStatus(nvmeof_desc.desc_idx_, slice_id); + auto event = + desc_pool_->getTransferStatus(nvmeof_desc.desc_idx_, slice_id); transfer_status.s = from_cufile_transfer_status(event.status); // TODO(FIXME): what to do if multi slices have different status? if (transfer_status.s == COMPLETED) { @@ -113,7 +114,8 @@ int NVMeoFTransport::submitTransfer( size_t task_id = batch_desc.task_list.size(); size_t slice_id = desc_pool_->getSliceNum(nvmeof_desc.desc_idx_); batch_desc.task_list.resize(task_id + entries.size()); - std::unordered_map> segment_desc_map; + std::unordered_map> + segment_desc_map; // segment_desc_map[LOCAL_SEGMENT_ID] = // getSegmentDescByID(LOCAL_SEGMENT_ID); for (auto &request : entries) { @@ -202,8 +204,8 @@ int NVMeoFTransport::freeBatchID(BatchID batch_id) { int NVMeoFTransport::install(std::string &local_server_name, std::shared_ptr meta, - void **args) { - return Transport::install(local_server_name, meta, args); + std::shared_ptr topo) { + return Transport::install(local_server_name, meta, topo); } int NVMeoFTransport::registerLocalMemory(void *addr, size_t length, @@ -244,7 +246,8 @@ void NVMeoFTransport::addSliceToCUFileBatch( uint64_t desc_id, TransferRequest::OpCode op, CUfileHandle_t fh) { CUfileIOParams_t params; params.mode = CUFILE_BATCH; - params.opcode = op == Transport::TransferRequest::READ ? CUFILE_READ : CUFILE_WRITE; + params.opcode = + op == Transport::TransferRequest::READ ? CUFILE_READ : CUFILE_WRITE; params.cookie = (void *)0; params.u.batch.devPtr_base = source_addr; params.u.batch.devPtr_offset = 0; diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index 8277c0a..bedd9a9 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -44,23 +44,17 @@ RdmaTransport::~RdmaTransport() { int RdmaTransport::install(std::string &local_server_name, std::shared_ptr meta, - void **args) { - const std::string nic_priority_matrix = static_cast(args[0]); - bool dry_run = args[1] ? *static_cast(args[1]) : false; - - if (dry_run) return 0; + std::shared_ptr topo) { + if (topo == nullptr) { + LOG(ERROR) << "RdmaTransport: missing topology"; + return ERR_INVALID_ARGUMENT; + } metadata_ = meta; local_server_name_ = local_server_name; + local_topology_ = topo; - int ret = local_topology_.parse(nic_priority_matrix); - if (ret) { - LOG(ERROR) << "RdmaTransport: incorrect NIC priority matrix: " - << nic_priority_matrix; - return ret; - } - - ret = initializeRdmaResources(); + auto ret = initializeRdmaResources(); if (ret) { LOG(ERROR) << "RdmaTransport: cannot initialize RDMA resources"; return ret; @@ -134,7 +128,7 @@ int RdmaTransport::allocateLocalSegmentID() { device_desc.gid = entry->gid(); desc->devices.push_back(device_desc); } - desc->topology = local_topology_; + desc->topology = *(local_topology_.get()); metadata_->addLocalSegment(LOCAL_SEGMENT_ID, local_server_name_, std::move(desc)); return 0; @@ -353,7 +347,7 @@ int RdmaTransport::onSetupRdmaConnections(const HandShakeDesc &peer_desc, std::shared_ptr context; int index = 0; - for (auto &entry : local_topology_.getHcaList()) { + for (auto &entry : local_topology_->getHcaList()) { if (entry == local_nic_name) { context = context_list_[index]; break; @@ -372,13 +366,13 @@ int RdmaTransport::onSetupRdmaConnections(const HandShakeDesc &peer_desc, } int RdmaTransport::initializeRdmaResources() { - if (local_topology_.empty()) { + if (local_topology_->empty()) { LOG(ERROR) << "RdmaTransport: No available RNIC"; return ERR_DEVICE_NOT_FOUND; } std::vector device_speed_list; - for (auto &device_name : local_topology_.getHcaList()) { + for (auto &device_name : local_topology_->getHcaList()) { auto context = std::make_shared(*this, device_name); if (!context) return ERR_MEMORY; diff --git a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp index bce4e1d..ef74d9f 100644 --- a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp +++ b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp @@ -202,7 +202,8 @@ TcpTransport::~TcpTransport() { } int TcpTransport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { metadata_ = meta; local_server_name_ = local_server_name; diff --git a/mooncake-transfer-engine/src/transport/transport.cpp b/mooncake-transfer-engine/src/transport/transport.cpp index 3350eee..12df7eb 100644 --- a/mooncake-transfer-engine/src/transport/transport.cpp +++ b/mooncake-transfer-engine/src/transport/transport.cpp @@ -51,7 +51,8 @@ int Transport::freeBatchID(BatchID batch_id) { } int Transport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { local_server_name_ = local_server_name; metadata_ = meta; return 0; diff --git a/mooncake-transfer-engine/tests/rdma_transport_test.cpp b/mooncake-transfer-engine/tests/rdma_transport_test.cpp index a2c7a29..7af0662 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test.cpp @@ -240,7 +240,8 @@ std::string loadNicPriorityMatrix() { int initiator() { const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), @@ -285,7 +286,8 @@ int initiator() { int target() { const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), diff --git a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp index 617fc52..2758d67 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp @@ -121,7 +121,8 @@ class RDMATransportTest : public ::testing::Test { LOG(INFO) << "HERE \n"; google::InitGoogleLogging("RDMATransportTest"); FLAGS_logtostderr = 1; - engine = std::make_unique(); + // disable topology auto discovery for testing. + engine = std::make_unique(false); hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), diff --git a/mooncake-transfer-engine/tests/tcp_transport_test.cpp b/mooncake-transfer-engine/tests/tcp_transport_test.cpp index 862d88c..d64a3bf 100644 --- a/mooncake-transfer-engine/tests/tcp_transport_test.cpp +++ b/mooncake-transfer-engine/tests/tcp_transport_test.cpp @@ -87,7 +87,8 @@ static void *allocateMemoryPool(size_t size, int socket_id, } TEST_F(TCPTransportTest, GetTcpTest) { - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); auto rc = engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -101,7 +102,8 @@ TEST_F(TCPTransportTest, Writetest) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); auto rc = engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -145,7 +147,8 @@ TEST_F(TCPTransportTest, WriteAndReadtest) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -216,7 +219,8 @@ TEST_F(TCPTransportTest, WriteAndRead2test) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second);