From 7b19bf9d3cebba677b21b135a34e1b8cc2e3fff7 Mon Sep 17 00:00:00 2001 From: ubuntu Date: Thu, 5 Jan 2023 16:41:32 +0800 Subject: [PATCH] add commit --- include/ps/base.h | 3 + include/ps/internal/customer.h | 45 +++++++++- include/ps/internal/env.h | 1 + include/ps/internal/message.h | 25 ++++++ include/ps/internal/postoffice.h | 46 +++++++--- include/ps/internal/van.h | 37 +++++--- include/ps/kv_app.h | 19 ++-- include/ps/sarray.h | 7 ++ include/ps/simple_app.h | 3 +- src/customer.cc | 27 +++++- src/postoffice.cc | 20 +++-- src/resender.h | 5 ++ src/van.cc | 149 ++++++++++++++++++++++++------- tests/local.sh | 5 ++ tests/test_kv_app.cc | 12 +-- 15 files changed, 320 insertions(+), 84 deletions(-) diff --git a/include/ps/base.h b/include/ps/base.h index a18b62ce..9c039afc 100644 --- a/include/ps/base.h +++ b/include/ps/base.h @@ -7,6 +7,9 @@ #include "ps/internal/utils.h" namespace ps { +/** + * ps-lite的key只支持int类型 + */ #if USE_KEY32 /*! \brief Use unsigned 32-bit int as the key type */ using Key = uint32_t; diff --git a/include/ps/internal/customer.h b/include/ps/internal/customer.h index e227b532..21c78097 100644 --- a/include/ps/internal/customer.h +++ b/include/ps/internal/customer.h @@ -22,7 +22,28 @@ namespace ps { * * It has its own receiving thread which is able to process any message received * from a remote node with `msg.meta.customer_id` equal to this customer's id + * 每个SimpleApp对象持有一个Customer类的成员,且Customer需要在PostOffice进行注册,该类主要负责: + * 1.跟踪由SimpleApp发送出去的消息的回复情况; + * 2.维护一个Node的消息队列,为Node接收消息; + * 一个 app 实例可以对应多个 Customer; + * Customer 需要注册到 Postoffice 之中; */ +/** + * Customer 其实有两个功能: + * 作为一个发送方,用于追踪SimpleApp发送出去每个Request对应的Response情况; + * 作为接收方,因为有自己的接受线程和接受消息队列,所以Customer实际上是作为一个接受消息处理引擎(或者说是引擎的一部分)存在; + * 具体特点如下: + * 每个SimpleApp对象持有一个Customer类的成员,且Customer需要在PostOffice进行注册。 + * 因为 Customer 同时又要处理Message 但是其本身并没有接管网络,因此实际的Response和Message需要外部调用者告诉它,所以功能和职责上有点分裂。 + * 每一个连接对应一个Customer实例,每个Customer都与某个node id相绑定,代表当前节点发送到对应node id节点。连接对方的id和Customer实例的id相同。 + * 新建一次request,会返回一个timestamp,这个timestamp会作为这次request的id,每次请求会自增1,相应的res也会自增1,调用wait时会保证 后续比如做Wait以此为ID识别。 +*/ +/** + * Van::ProcessDataMsg ---> Customer::Accept ---> Customer::recv_queue_ ---> Customer::recv_thread_ ---> Customer::recv_handle_ +*/ +/** + * 为什么 Server 端,app_id 与 customer_id 相等?猜测是在 ps 代码中,Server 端也是有多个 cusomer,但是出于精简目的,在 ps-lite 之中删除了这部分功能,因此在 ps-lite 之中,app_id 与 customer_id 相等。 +*/ class Customer { public: /** @@ -99,13 +120,29 @@ class Customer { int customer_id_; - RecvHandle recv_handle_; - ThreadsafePQueue recv_queue_; - std::unique_ptr recv_thread_; +/** + * @brief + * 绑定Customer接收到request后的处理函数(SimpleApp::Process); + * Customer会新拉起一个线程,用于在customer生命周期内,使用recv_handle_来处理接受的请求,这里是使用了一个线程安全队列,Accept()用于往队列中一直发送消息, + * 接受到的消息来自于Van的receiving thread,即每个节点的Van对象收到message后,根据message的不同,推送到不同的customer对象中。 + * 对于Worker,比如KVWorker,recv_handle_保存拉取的msg中的数据, + * 对于Server,需要使用set_request_handle来设置对应的处理函数,如KVServerDefaultHandle, + */ + RecvHandle recv_handle_; //worker 或者 server 的消息处理函数。 + ThreadsafePQueue recv_queue_; //线程安全的消息队列; + std::unique_ptr recv_thread_; //不断从 recv_queue 读取message并调用 recv_handle_; std::mutex tracker_mu_; std::condition_variable tracker_cond_; - std::vector> tracker_; + /** + * @brief + * tracker_是Customer内用来记录request和response的状态的map。记录了每个 request(使用request id)可能发送了多少节点 以及 从多少个节点返回的 response的次数, + * tracker_下标为每个request 的timestamp,即Request编号。 + * tracker_[i] . first 表示该请求发送给了多少节点,即本节点应收到的Response数量。 + * tracker_[i] . second 表示目前为止实际收到的Response数量。 + */ + std::vector> tracker_; //request & response 的同步变量。 + DISALLOW_COPY_AND_ASSIGN(Customer); }; diff --git a/include/ps/internal/env.h b/include/ps/internal/env.h index 30ec51e9..d6acf0f5 100644 --- a/include/ps/internal/env.h +++ b/include/ps/internal/env.h @@ -11,6 +11,7 @@ namespace ps { /** * \brief Environment configurations + * 一个单例模式的环境变量类,它通过一个 std::unordered_map kvs 维护了一组 kvs 借以保存所有环境变量名以及值; */ class Environment { public: diff --git a/include/ps/internal/message.h b/include/ps/internal/message.h index 11501063..93a66997 100644 --- a/include/ps/internal/message.h +++ b/include/ps/internal/message.h @@ -59,6 +59,7 @@ DataType GetDataType() { } /** * \brief information about a node + * 信息类,存储了本节点的对应信息,每个 Node 可以使用 hostname + port 来唯一标识。 */ struct Node { /** \brief the empty value */ @@ -120,6 +121,14 @@ struct Control { return ss.str(); } /** \brief all commands */ + /** + * ADD_NODE:worker和server向shceduler进行节点注册; + * BARRIER:节点间的同步阻塞消息; + * HEARTBEAT:节点间的心跳信号; + * TERMINATE:节点退出信号; + * ACK:确认消息,ACK 类型只有启用了 Resender 类才会出现。 + * EMPTY:push or pull; + */ enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT }; /** \brief the command */ Command cmd; @@ -201,6 +210,22 @@ struct Meta { /** * \brief messages that communicated amaong nodes. */ + +/** + * Message 是要发送的信息,具体如下: + * 消息头 meta:就是元数据(使用了Protobuf 进行数据压缩),包括: + * 控制信息(Control)表示这个消息表示的意义(例如终止,确认ACK,同步等),具体包括: + * 命令类型; + * 节点列表(vector),节点包括: + * 节点的角色、ip, port、id、是否是恢复节点 + * group id表示这个控制命令对谁执行; + * 方法签名; + * 发送者; + * 接受者; + * 时间戳; + * ... + * 消息体 body:就是发送的数据,使用了自定义的 SArray 共享数据,减少数据拷贝; +*/ struct Message { /** \brief the meta info of this message */ Meta meta; diff --git a/include/ps/internal/postoffice.h b/include/ps/internal/postoffice.h index 883c5071..09bfbf04 100644 --- a/include/ps/internal/postoffice.h +++ b/include/ps/internal/postoffice.h @@ -15,6 +15,18 @@ namespace ps { /** * \brief the center of the system + * 一个单例模式的全局管理类,一个 node 在生命期内具有一个PostOffice,依赖它的类成员对Node进行管理; + * 具有如下特点: + * 三种Node角色都依赖 Postoffice 进行管理,每一个 node 在生命期内具有一个单例 PostOffice。 + * 如我们之前所说,ps-lite的特点是 worker, server, scheduler 都使用同一套代码,Postoffice也是如此,所以我们最好分开描述。 + * 在 Scheduler侧,顾名思义,Postoffice 是邮局,可以认为是一个地址簿,一个调控中心,其记录了系统(由scheduler,server, worker 集体构成的这个系统)中所有节点的信息。具体功能如下: + * 维护了一个Van对象,负责整个网络的拉起、通信、命令管理如增加节点、移除节点、恢复节点等等; + * 负责整个集群基本信息的管理,比如worker、server数的获取,管理所有节点的地址,server 端 feature分布的获取,worker/server Rank与node id的互转,节点角色身份等等; + * 负责 Barrier 功能; + * 在 Server / Worker 端,负责: + * 配置当前node的一些信息,例如当前node是哪种类型(server,worker),nodeid是啥,以及worker/server 的rank 到 node id的转换。 + * 路由功能:负责 key 与 server 的对应关系。 + * Barrier 功能; */ class Postoffice { public: @@ -33,14 +45,14 @@ class Postoffice { * \param argv0 the program name, used for logging. * \param do_barrier whether to block until every nodes are started. */ - void Start(int customer_id, const char* argv0, const bool do_barrier); + void Start(int customer_id, const char* argv0, const bool do_barrier); //建立通信初始化 /** * \brief terminate the system * * All nodes should call this function before existing. * \param do_barrier whether to do block until every node is finalized, default true. */ - void Finalize(const int customer_id, const bool do_barrier = true); + void Finalize(const int customer_id, const bool do_barrier = true); //节点阻塞退出 /** * \brief add an customer to the system. threadsafe */ @@ -71,6 +83,9 @@ class Postoffice { /** * \brief return the key ranges of all server nodes */ + /** + * 将int范围按照server个数均分 + */ const std::vector& GetServerKeyRanges(); /** * \brief the template of a callback @@ -98,6 +113,10 @@ class Postoffice { * \brief convert from a worker rank into a node id * \param rank the worker rank */ + /** + * 逻辑rankid映射到物理node id,Node id 是物理节点的唯一标识,可以和一个 host + port 的二元组唯一对应,1-7 的id表示的是node group,单个节点的id 就从 8 开始 + * 而且这个算法保证server id为偶数,worker id为奇数 + */ static inline int WorkerRankToID(int rank) { return rank * 2 + 9; } @@ -142,16 +161,17 @@ class Postoffice { * \brief barrier * \param node_id the barrier group id */ - void Barrier(int customer_id, int node_group); + void Barrier(int customer_id, int node_group); //进入 barrier 阻塞状态 /** * \brief process a control message, called by van * \param the received message */ - void Manage(const Message& recv); + void Manage(const Message& recv); //退出 barrier 阻塞状态 /** * \brief update the heartbeat record map * \param node_id the \ref Node id * \param t the last received heartbeat time + * 存储了心跳关联的节点的活跃信息。键为节点编号,值为上次收到其 HEARTBEAT 消息的时间戳。 */ void UpdateHeartbeat(int node_id, time_t t) { std::lock_guard lk(heartbeat_mu_); @@ -161,23 +181,23 @@ class Postoffice { * \brief get node ids that haven't reported heartbeats for over t seconds * \param t timeout in sec */ - std::vector GetDeadNodes(int t = 60); + std::vector GetDeadNodes(int t = 60); //根据 heartbeats_ 获取已经 dead 的节点; private: Postoffice(); ~Postoffice() { delete van_; } - void InitEnvironment(); - Van* van_; + void InitEnvironment(); //初始化环境变量,创建 van 对象; + Van* van_; //底层通信对象 mutable std::mutex mu_; // app_id -> (customer_id -> customer pointer) - std::unordered_map> customers_; - std::unordered_map> node_ids_; + std::unordered_map> customers_; //本节点目前有哪些customer + std::unordered_map> node_ids_; //node id映射表 std::mutex server_key_ranges_mu_; - std::vector server_key_ranges_; - bool is_worker_, is_server_, is_scheduler_; - int num_servers_, num_workers_; - std::unordered_map > barrier_done_; + std::vector server_key_ranges_; //Server key 区间范围对象 + bool is_worker_, is_server_, is_scheduler_; //标注了本节点类型 + int num_servers_, num_workers_; //节点心跳对象 + std::unordered_map > barrier_done_; //Barrier 同步变量 int verbose_; std::mutex barrier_mu_; std::condition_variable barrier_cond_; diff --git a/include/ps/internal/van.h b/include/ps/internal/van.h index 45c3ffe9..2e2b9eeb 100644 --- a/include/ps/internal/van.h +++ b/include/ps/internal/van.h @@ -23,6 +23,8 @@ class PBMeta; * If environment variable PS_RESEND is set to be 1, then van will resend a * message if it no ACK messsage is received within PS_RESEND_TIMEOUT * millisecond + * 通信模块,负责与其他节点的网络通信和Message的实际收发工作。PostOffice持有一个Van成员;相当于邮局里有了地址簿,就需要有货车来负责拉送物件,Van 就是整个Parameter Server的通信模块 + * Van 负责具体的节点间通信。具体来说就是负责建立起节点之间的互相连接(例如Worker与Scheduler之间的连接),并且开启本地的receiving thread用来监听收到的message。 */ class Van { public: @@ -48,7 +50,7 @@ class Van { * control message, give it to postoffice::manager, otherwise, give it to the * corresponding app. */ - virtual void Start(int customer_id); + virtual void Start(int customer_id); //建立通信初始化; /** * \brief send a message, It is thread-safe @@ -83,6 +85,7 @@ class Van { protected: /** * \brief connect to a node + * 连接节点 */ virtual void Connect(const Node &node) = 0; @@ -91,18 +94,21 @@ class Van { * do multiple retries on binding the port. since it's possible that * different nodes on the same machine picked the same port * \return return the port binded, -1 if failed. + * 绑定到自己节点之上 */ virtual int Bind(const Node &node, int max_retry) = 0; /** * \brief block until received a message * \return the number of bytes received. -1 if failed or timeout + * 接收消息,用阻塞方式 */ virtual int RecvMsg(Message *msg) = 0; /** * \brief send a mesage * \return the number of bytes sent + * 发送消息 */ virtual int SendMsg(const Message &msg) = 0; @@ -121,17 +127,17 @@ class Van { */ void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta); - Node scheduler_; - Node my_node_; - bool is_scheduler_; + Node scheduler_; //Scheduler 节点参数,每一个node都会记录Scheduler 节点的信息; + Node my_node_; //本节点参数。如果本节点是Scheduler,则 my_node_ 会指向上面的 scheduler_ ; + bool is_scheduler_; //本节点是否是 scheduler; std::mutex start_mu_; private: /** thread function for receving */ - void Receiving(); + void Receiving(); //接收消息线程的处理函数; /** thread function for heartbeat */ - void Heartbeat(); + void Heartbeat(); //发送心跳线程的处理函数; // node's address string (i.e. ip:port) -> node id // this map is updated when ip:port is received for the first time @@ -147,20 +153,21 @@ class Van { int num_servers_ = 0; int num_workers_ = 0; /** the thread for receiving messages */ - std::unique_ptr receiver_thread_; + std::unique_ptr receiver_thread_; //接收消息线程指针; /** the thread for sending heartbeat */ - std::unique_ptr heartbeat_thread_; - std::vector barrier_count_; + std::unique_ptr heartbeat_thread_; //发送心跳线程指针; + std::vector barrier_count_; //barrier 计数,用来记录登记节点数目,只有所有节点都登记之后,系统才到了 ready 状态,scheduler 才会给所有节点发送 ready 消息,系统才正式启动。 /** msg resender */ - Resender *resender_ = nullptr; + Resender *resender_ = nullptr; //重新发送消息指针; int drop_rate_ = 0; - std::atomic timestamp_{0}; - int init_stage = 0; + std::atomic timestamp_{0}; //message 自增 id,原子变量; + int init_stage = 0; //记录了目前连接到哪些 nodes; /** * \brief processing logic of AddNode message for scheduler + * scheduler 的 AddNode 消息处理函数; */ - void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes, + void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes, Meta *recovery_nodes); /** @@ -170,21 +177,25 @@ class Van { /** * \brief processing logic of AddNode message (run on each node) + * worker 和 server 的 AddNode 消息处理函数; */ void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes); /** * \brief processing logic of Barrier message (run on each node) + * Barrier 消息处理函数; */ void ProcessBarrierCommand(Message *msg); /** * \brief processing logic of AddNode message (run on each node) + * 心跳包处理函数; */ void ProcessHearbeat(Message *msg); /** * \brief processing logic of Data message + * 数据消息(push & pull)处理函数; */ void ProcessDataMsg(Message *msg); diff --git a/include/ps/kv_app.h b/include/ps/kv_app.h index 50bdde6b..2d897e11 100644 --- a/include/ps/kv_app.h +++ b/include/ps/kv_app.h @@ -436,24 +436,29 @@ class KVServer : public SimpleApp { /** * \brief an example handle adding pushed kv into store + * functor,用与处理server收到的来自worker的请求 */ template struct KVServerDefaultHandle { + // req_meta 是存储该请求的一些元信息,比如请求来自于哪个节点,发送给哪个节点等等 + // req_data 是发送过来的数据 + // server 是指向当前server对象的指针 void operator()( const KVMeta& req_meta, const KVPairs& req_data, KVServer* server) { size_t n = req_data.keys.size(); KVPairs res; - if (!req_meta.pull) { + if (!req_meta.pull) { //收到的是pull请求 CHECK_EQ(n, req_data.vals.size()); - } else { - res.keys = req_data.keys; res.vals.resize(n); + } else { //收到的是push请求 + res.keys = req_data.keys; + res.vals.resize(n); } for (size_t i = 0; i < n; ++i) { Key key = req_data.keys[i]; - if (req_meta.push) { - store[key] += req_data.vals[i]; + if (req_meta.push) { //push请求 + store[key] += req_data.vals[i]; //将相同key的value相加 } - if (req_meta.pull) { + if (req_meta.pull) { //pull请求 res.vals[i] = store[key]; } } @@ -667,7 +672,7 @@ int KVWorker::AddPullCB( // do check size_t total_key = 0, total_val = 0; - for (const auto& s : kvs) { + for (const auto& s : kvs) { // 进行有效性验证 Range range = FindRange(keys, s.keys.front(), s.keys.back()+1); CHECK_EQ(range.size(), s.keys.size()) << "unmatched keys size from one server"; diff --git a/include/ps/sarray.h b/include/ps/sarray.h index 4ad17dd9..29823ae6 100644 --- a/include/ps/sarray.h +++ b/include/ps/sarray.h @@ -37,6 +37,13 @@ namespace ps { * * \tparam V the value type */ +/** + * SArray 有如下特点: + * SArray 是共享数据的智能数组,提供类似 std::vector 的功能。 + * SArray 可以从 std::vector 构建出来。 + * SArray 可以像 C 指针一样拷贝赋值,当对某个SArray的引用为0时,就自动回收该SArray的内存。 + * 可以理解为一个零拷贝的vector,能兼容vector的数据结构。 +*/ template class SArray { public: diff --git a/include/ps/simple_app.h b/include/ps/simple_app.h index 9dd84464..edcdcfcc 100644 --- a/include/ps/simple_app.h +++ b/include/ps/simple_app.h @@ -10,6 +10,7 @@ namespace ps { /** * \brief the format for a received request or reponse for \ref SimpleApp + * KVServer和KVWorker的父类,它提供了简单的Request, Wait, Response,Process功能;KVServer和KVWorker分别根据自己的使命重写了这些功能; */ struct SimpleData { /** \brief the int head */ @@ -172,7 +173,7 @@ inline void SimpleApp::Process(const Message& msg) { recv.body = msg.meta.body; recv.timestamp = msg.meta.timestamp; recv.customer_id = msg.meta.customer_id; - if (msg.meta.request) { + if (msg.meta.request) { // 判断是request还是response,调用相应handle处理 CHECK(request_handle_); request_handle_(recv, this); } else { diff --git a/src/customer.cc b/src/customer.cc index 03d12065..69436008 100644 --- a/src/customer.cc +++ b/src/customer.cc @@ -8,6 +8,14 @@ namespace ps { const int Node::kEmpty = std::numeric_limits::max(); const int Meta::kEmpty = std::numeric_limits::max(); +/** + * @brief Construct a new Customer:: + * Customer object分别用传入构造函数的参数初始化 app_id_, custom_id_ , recv_handle成员 + * 调用PostOffice::AddCustomer将当前Customer注册到PostOffice; + * PostOffice的customers_成员: 在对应的app_id的元素上添加custom_id; + * PostOffice的barrier_done_成员将该custom_id的同步状态设为false + * 新起一个Receiving线程recv_thread_; + */ Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle) : app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) { Postoffice::Get()->AddCustomer(this); @@ -24,11 +32,12 @@ Customer::~Customer() { int Customer::NewRequest(int recver) { std::lock_guard lk(tracker_mu_); - int num = Postoffice::Get()->GetNodeIDs(recver).size(); + int num = Postoffice::Get()->GetNodeIDs(recver).size(); // recver 可能会代表一个group tracker_.push_back(std::make_pair(num, 0)); - return tracker_.size() - 1; + return tracker_.size() - 1; // 代表此次请求的时间戳timestamp,后续customer使用这个值代表这个request } +//Worker pull 是异步操作,如果需要等待 pull 完成,则可以调用Wait来保证customer里面的request和response两者相等,即保证Pull完成后再做其他操作; void Customer::WaitRequest(int timestamp) { std::unique_lock lk(tracker_mu_); tracker_cond_.wait(lk, [this, timestamp]{ @@ -45,7 +54,19 @@ void Customer::AddResponse(int timestamp, int num) { std::lock_guard lk(tracker_mu_); tracker_[timestamp].second += num; } - +/** + * @brief + * worker节点 或者 server节点 在程序的最开始会执行Postoffice::start()。 + * Postoffice::start()会初始化节点信息,并且调用Van::start()。 + * Van::start() 启动一个本地线程,使用Van::Receiving()来持续监听收到的message。 + * Van::Receiving()接收后消息之后,根据不同命令执行不同动作。针对数据消息,如果需要下一步处理,会调用 ProcessDataMsg: + * 依据消息中的app id找到 Customer。 + * 将消息传递给Customer::Accept函数。 + * Customer::Accept() 函数将消息添加到一个队列recv_queue_; + * Customer 对象本身也会启动一个接受线程 recv_thread_,使用 Customer::Receiving() + * 从recv_queue_队列取消息。 + * 调用注册的recv_handle_函数对消息进行处理。 + */ void Customer::Receiving() { while (true) { Message recv; diff --git a/src/postoffice.cc b/src/postoffice.cc index e9f4386d..fb346e9f 100644 --- a/src/postoffice.cc +++ b/src/postoffice.cc @@ -41,6 +41,7 @@ void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier } // init node info. + // 对于所有的worker,进行node设置 for (int i = 0; i < num_workers_; ++i) { int id = WorkerRankToID(i); for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup, @@ -49,7 +50,7 @@ void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier node_ids_[g].push_back(id); } } - + // 对于所有的server,进行node设置 for (int i = 0; i < num_servers_; ++i) { int id = ServerRankToID(i); for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup, @@ -58,7 +59,7 @@ void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier node_ids_[g].push_back(id); } } - + // 设置scheduler的node for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup, kScheduler + kWorkerGroup, kScheduler + kServerGroup}) { node_ids_[g].push_back(kScheduler); @@ -139,6 +140,11 @@ Customer* Postoffice::GetCustomer(int app_id, int customer_id, int timeout) cons return obj; } +/** + * @brief + * 每个节点在自己指定的命令运行完后会向schedular节点发送一个Control::BARRIER命令的请求并自己阻塞直到收到schedular对应的返回后才解除阻塞; + * schedular节点收到请求后则会在本地计数,看收到的请求数是否和barrier_group的数量是否相等,相等则表示每个机器都运行完指定的命令了,此时schedular节点会向barrier_group的每个机器发送一个返回的信息,并解除其阻塞。 + */ void Postoffice::Barrier(int customer_id, int node_group) { if (GetNodeIDs(node_group).size() <= 1) return; auto role = van_->my_node().role; @@ -160,12 +166,15 @@ void Postoffice::Barrier(int customer_id, int node_group) { req.meta.customer_id = customer_id; req.meta.control.barrier_group = node_group; req.meta.timestamp = van_->GetTimestamp(); - van_->Send(req); - barrier_cond_.wait(ulk, [this, customer_id] { + van_->Send(req); //给scheduler发给BARRIER + barrier_cond_.wait(ulk, [this, customer_id] { //然后等待 return barrier_done_[0][customer_id]; }); } +/** + * 将int范围根据server个数均分 +*/ const std::vector& Postoffice::GetServerKeyRanges() { server_key_ranges_mu_.lock(); if (server_key_ranges_.empty()) { @@ -189,10 +198,11 @@ void Postoffice::Manage(const Message& recv) { barrier_done_[recv.meta.app_id][customer_id] = true; } barrier_mu_.unlock(); - barrier_cond_.notify_all(); + barrier_cond_.notify_all(); //这里解除了barrier } } +//Scheduler 在处理 ADD_NODE 消息时候,会看看是否已经有死亡节点,具体判通过当前时间戳与心跳包接收时间戳之差判断是否alive。 std::vector Postoffice::GetDeadNodes(int t) { std::vector dead_nodes; if (!van_->IsReady() || t == 0) return dead_nodes; diff --git a/src/resender.h b/src/resender.h index 16614b26..108b13e8 100644 --- a/src/resender.h +++ b/src/resender.h @@ -11,6 +11,11 @@ namespace ps { /** * \brief resend a messsage if no ack is received within a given time + * 在分布式系统中,通信也是不可靠的,丢包、延时都是必须考虑的场景。PS Lite 设计了 Resender类来提高通信的可靠性,它引入了 ACK 机制。即: + * 每一个节点,对于收到的非 ACK/TERMINATE 消息,必须响应一个 ACK 消息。 + * 每一个节点,对于发送的每一个非 ACK/TERMINATE 消息,必须在本地缓存下来。存储的数据结构是一个 MAP,根据消息的内容生产唯一的键。 + * 每一个节点,对于收到的 ACK 消息,必须根据反馈的键从本地缓存中移除对应的消息。 + * 每一个节点运行一个监控线程,每隔 PS_RESEND_TIMEOUT 毫秒检查一下本地缓存。根据每个消息的发送时间戳和当前时间,找出超时的消息进行重发,并累加其重试次数。 */ class Resender { public: diff --git a/src/van.cc b/src/van.cc index a9348238..68f8781c 100644 --- a/src/van.cc +++ b/src/van.cc @@ -46,32 +46,51 @@ void Van::ProcessTerminateCommand() { ready_ = false; } +/** + * ProcessAddNodeCommandAtScheduler 是在 Scheduler 之内运行,是对控制类型消息的处理。 + * 对于Scheduler节点来说,scheduler收到所有worker和server的ADD_NODE的消息后进行节点id分配并应答,即,需要设定 最新的所有node的 全局rank 并发送给所有Worker和Server。 + * 当接受到所有 worker & server 的注册消息之后(nodes->control.node.size() == num_nodes): + * 将节点按照 ip + port 组合排序。 + * Scheduler 与所有注册的节点建立连接、更新心跳时间戳,给 scheduler所有连接的节点分配全局 rank。 + * 向所有的worker和server发送ADD_NODE消息(携带scheduler之中的所有node信息)。 + * 会把 ready_ = true; 即 scheduler 是一个 ready 状态了,不管 worker 和 server 是否确认收到ADD_NODE消息。 + * 而在接收端(worker & server)的,每一个本地Node的全局rank等信息是由接收端 receiver_thread_(其他函数)获取,就是得到了 scheduler 返回的这些 nodes 信息。 + * 如果 !recovery_nodes->control.node.empty(),这就表明是处理某些重启节点的注册行为: + * 查出心跳包超时的id,转存到dead_set之中。 + * 与重启节点建立连接(因为接收到了一个ADD_NODE),所以只与这个新重启节点建立连接即可(在代码中有 CHECK_EQ(recovery_nodes->control.node.size(), 1) 来确认重启节点为 1 个)。 + * 更新重启节点的心跳。 + * 因为新加入了重启节点,所以用一个发送达到两个目的: + * 向所有 recovery 的worker和server发送ADD_NODE消息(携带scheduler之中的目前所有node信息)。 + * 向 alive 节点发送 recovery 节点信息。 + * 这样,收到消息的节点会则分别与新节点相互建立连接; + */ void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes, Meta* recovery_nodes) { recovery_nodes->control.cmd = Control::ADD_NODE; time_t t = time(NULL); size_t num_nodes = Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers(); - if (nodes->control.node.size() == num_nodes) { + if (nodes->control.node.size() == num_nodes) { // scheduler收到所有worker和server的ADD_NODE的消息后进行节点id分配并应答 // sort the nodes according their ip and port, std::sort(nodes->control.node.begin(), nodes->control.node.end(), [](const Node& a, const Node& b) { - return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0; + return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0; //根据IP和port给worker,server排个序 }); // assign node rank for (auto& node : nodes->control.node) { + // 建立连接、更新心跳时间戳,给 scheduler所有连接的节点分配全局 rank。 std::string node_host_ip = node.hostname + ":" + std::to_string(node.port); - if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) { - CHECK_EQ(node.id, Node::kEmpty); + if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) { //如果ip:port不存在van_中的话 + CHECK_EQ(node.id, Node::kEmpty); //判断是不是初始化节点 int id = node.role == Node::SERVER ? Postoffice::ServerRankToID(num_servers_) - : Postoffice::WorkerRankToID(num_workers_); + : Postoffice::WorkerRankToID(num_workers_); //如果是sever的话,就id产生一个id号,num_servers_初始化为0 PS_VLOG(1) << "assign rank=" << id << " to node " << node.DebugString(); - node.id = id; - Connect(node); + node.id = id; //将这个新节点的id赋值为id + Connect(node); //连接这个新节点, 即建立一个socket, 然后senders_[id] = sender; 就是将目标id的socket存放起来后面使用 Postoffice::Get()->UpdateHeartbeat(node.id, t); - connected_nodes_[node_host_ip] = id; + connected_nodes_[node_host_ip] = id; //既然 worker, server 已经发message来了,scheduler要把这个节点作为已经链接的节 } else { int id = node.role == Node::SERVER ? Postoffice::ServerRankToID(num_servers_) @@ -86,6 +105,7 @@ void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes, nodes->control.cmd = Control::ADD_NODE; Message back; back.meta = *nodes; + // 向所有已经和schedular建立连接的worker节点/server节点 广播此 "节点的加入信息“,并把 节点 2 请求连接的信息放入meta信息中。 for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) { int recver_id = r; if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) { @@ -96,8 +116,8 @@ void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes, } PS_VLOG(1) << "the scheduler is connected to " << num_workers_ << " workers and " << num_servers_ << " servers"; - ready_ = true; - } else if (!recovery_nodes->control.node.empty()) { + ready_ = true; //scheduler已经准备好了 + } else if (!recovery_nodes->control.node.empty()) { // 节点没有收集完全 auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_); std::unordered_set dead_set(dead_nodes.begin(), dead_nodes.end()); // send back the recovery node @@ -122,18 +142,28 @@ void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes, } } +/** + * @brief + * 此函数作用是更新节点内部的node id 信息,也是分为两种情况,函数逻辑如下: + * 如果msg->meta.sender是Meta::kEmpty,即未设定,则处理此message的一定是Scheduler,会进入 if 分支。 + * 如果目前 nodes 的control.node数目小于 "配置的server数目 + 配置的worker数目",则说明是系统启动阶段,将当前消息的node信息加入到 control.node 之中。 + * 否则说明是系统运行阶段,应该是有些节点死掉重启后再次连接。那么,就从 nodes 的control.node 之中找到一个已经死掉的且节点role 与当前消息一致(同类型)的 node id,把这个 node id 赋给这个重启的节点。并且更新 nodes->control.node 和 recovery_nodes。 + * 下面就是普通节点处理的逻辑: + 即在 scheduler 传回来的所有节点信息中查找,目的是找到与自己的ip,port一致的节点。 + 如果找到,就更新本地节点信息(因为在本节点启动时候,并没有设置 node_id 这个信息,这个需要scheduler统一设置,从注释看,目的是为了使重新注册成为可能)。包括全局 rank 信息。 + */ void Van::UpdateLocalID(Message* msg, std::unordered_set* deadnodes_set, Meta* nodes, Meta* recovery_nodes) { auto& ctrl = msg->meta.control; size_t num_nodes = Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers(); // assign an id - if (msg->meta.sender == Meta::kEmpty) { + if (msg->meta.sender == Meta::kEmpty) { //如果sender未设定,则处理此message的一定是Scheduler CHECK(is_scheduler_); - CHECK_EQ(ctrl.node.size(), 1); + CHECK_EQ(ctrl.node.size(), 1); //msg中的control命令中的节点集合就是worker自己,所以就是1个节点 if (nodes->control.node.size() < num_nodes) { nodes->control.node.push_back(ctrl.node[0]); - } else { + } else { //如果所有work和server到齐了,就进入else // some node dies and restarts CHECK(ready_.load()); for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) { @@ -154,7 +184,8 @@ void Van::UpdateLocalID(Message* msg, std::unordered_set* deadnodes_set, } } - // update my id + // update my id / 对普通的node,更新其rank,scheduler 节点不会起作用(因为找不到) + // schedule发给此work节点的消息,如果发现本地的ip和port和消息中的某个一点重合,那么就把本地节点的ID(初始化时候没有ID,只是等于Empty)改为schedule发过来的 node id。 for (size_t i = 0; i < ctrl.node.size(); ++i) { const auto& node = ctrl.node[i]; if (my_node_.hostname == node.hostname && my_node_.port == node.port) { @@ -190,19 +221,19 @@ void Van::ProcessHearbeat(Message* msg) { void Van::ProcessBarrierCommand(Message* msg) { auto& ctrl = msg->meta.control; - if (msg->meta.request) { + if (msg->meta.request) { // 如果 msg->meta.request 为true,说明是 scheduler 收到消息进行处理,scheduler收到了消息,因为Postoffice::Barrier函数 会在发送时候做设置为true if (barrier_count_.empty()) { barrier_count_.resize(8, 0); } int group = ctrl.barrier_group; - ++barrier_count_[group]; + ++barrier_count_[group]; // Scheduler会对Barrier请求进行计数 PS_VLOG(1) << "Barrier count for " << group << " : " << barrier_count_[group]; if (barrier_count_[group] == - static_cast(Postoffice::Get()->GetNodeIDs(group).size())) { + static_cast(Postoffice::Get()->GetNodeIDs(group).size())) { // 如果相等,说明已经收到了最后一个请求,所以发送解除 barrier 消息 barrier_count_[group] = 0; Message res; - res.meta.request = false; + res.meta.request = false; //回复时候,这里就是false res.meta.app_id = msg->meta.app_id; res.meta.customer_id = msg->meta.customer_id; res.meta.control.cmd = Control::BARRIER; @@ -215,11 +246,19 @@ void Van::ProcessBarrierCommand(Message* msg) { } } } - } else { + } else { //说明这里收到了 barrier respones,可以解除 barrier了。具体见上面的设置为false处。 Postoffice::Get()->Manage(*msg); } } +/** + * @brief + * 在 Van 中,我们可以看到,当处理数据消息时候,会: + * 依据消息中的 app_id 从Postoffice 之中得到 customer_id; + * 依据 customer_id 从 Postoffice 之中得到 Customer; + * 调用 Customer 的 Accept 方法来处理消息; + */ + void Van::ProcessDataMsg(Message* msg) { // data msg CHECK_NE(msg->meta.sender, Meta::kEmpty); @@ -234,11 +273,26 @@ void Van::ProcessDataMsg(Message* msg) { obj->Accept(*msg); } +/** + * @brief + * 查出心跳包超时的id,转存到dead_set之中。 + * 拿到收到消息里面的control信息。 + * 调用 UpdateLocalID,在其中: + * 如果是新node,Scheduler记录这个新的node。 + * 如果这个node是重启产生的,则将旧node的信息更新。 + * 如果是 scheduler,则: + * 调用 ProcessAddNodeCommandAtScheduler 收到所有worker和server的ADD_NODE 的消息后进行节点id分配并应答,即 设定最新的所有node的rank并发送给所有Worker和Server。 + * 如果不是 scheduler,说明 work & server 收到了 scheduler 回答的 ADD_NODE 消息,则: + * 如果自身是现有节点,则在 connected_nodes_ 之中不会找到这个新节点,则先有节点会调用 Connect 与新节点建立连接。 + * 如果自身是新节点,则会连接所有现有节点(非同类型)。 + * 在 connected_nodes_ 之中更新 全局节点信息,包括 global rank(本地Node的全局rank等信息是由receiver_thread_在这里获取); + * 最后设置 ready_ = true,即本节点也可以运行了,因为主线程会阻塞在其上。 + */ void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes, Meta* recovery_nodes) { - auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_); - std::unordered_set dead_set(dead_nodes.begin(), dead_nodes.end()); - auto& ctrl = msg->meta.control; + auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_); //查出心跳包超时的id + std::unordered_set dead_set(dead_nodes.begin(), dead_nodes.end()); //转存到dead_set之中 + auto& ctrl = msg->meta.control; //拿到收到消息里面的control信息 UpdateLocalID(msg, &dead_set, nodes, recovery_nodes); @@ -247,9 +301,10 @@ void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes, } else { for (const auto& node : ctrl.node) { std::string addr_str = node.hostname + ":" + std::to_string(node.port); - if (connected_nodes_.find(addr_str) == connected_nodes_.end()) { - Connect(node); - connected_nodes_[addr_str] = node.id; + if (connected_nodes_.find(addr_str) == connected_nodes_.end()) { // 现有连接中没有这个新节点 + //现有节点会在自己连接之中查找这个新节点,发现现有连接中没有这个新节点 + Connect(node); // 与新节点进行连接 + connected_nodes_[addr_str] = node.id; // 加入已经连接的节点 } if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_; if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_; @@ -259,6 +314,26 @@ void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes, } } +/** + * @brief + * Van对象的初始化函数作用就是依据本地节点类型的不同,做不同设置,从而启动端口,建立到scheduler的连结,启动接收消息线程,心跳线程等,这样就可以进行通信了。具体如下: + * 1.首先从环境变量中得到相关信息,比如scheduler 的 "ip,port"(这两个是预先设置的),本节点的角色(Worker/Server/Scheduler)等等,然后 初始化scheduler_这个成员变量; + * 2.如果本节点是 scheduler,则把 scheduler_ 赋值给 my_node_; + * 3.如果本节点不是 scheduler,则: + * (1)从系统中获取本节点的ip信息; + * (2)使用 GetAvailablePort 获取一个port; + * 4.使用 Bind 绑定一个端口; + * 5.调用 Connect 建立到 Scheduler 的连接(scheduler也连接到自己的那个预先设置的固定端口); + * 6.启动本地Node的接收消息线程receiver_thread_,执行Van::Receiving ; + * 7.如果本节点不是 scheduler,给 Scheduler 发送一个 ADD_NODE 消息,这样可以将本地Node的信息告知Scheduler,即注册到 scheduler; + * 8.然后进入等待状态,等待Scheduler通知 Ready(scheduler 会等待所有节点都完成注册后,统一发送 ready); 注意,这里 scheduler 节点也会等,但是不影响 scheduler 节点 的 recevie 线程接受处理消息; + * 9.Ready后启动心跳线程,建立到Scheduler的Heartbeat 连接; + * 关于7,8两点的进一步说明就是: + * 当worker和server节点绑定ip和port后,便向scheduler节点发送ADD_NODE message。 + * 当 scheduler收到所有worker和server的ADD_NODE message后,则依次应答ADD_NODE message, + * 各个节点在此过程中通过原子变量ready_等待上述过程完成。 + * + */ void Van::Start(int customer_id) { // get scheduler info start_mu_.lock(); @@ -270,6 +345,7 @@ void Van::Start(int customer_id) { atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT"))); scheduler_.role = Node::SCHEDULER; scheduler_.id = kScheduler; + // 确认本节点是scheduler节点 is_scheduler_ = Postoffice::Get()->is_scheduler(); // get my node info @@ -306,11 +382,14 @@ void Van::Start(int customer_id) { } // bind. + // 绑定接口,把本节点绑定到ip:port这个socket上,理论来说这个函数就是初始化了receiver_ my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40); PS_VLOG(1) << "Bind to " << my_node_.DebugString(); CHECK_NE(my_node_.port, -1) << "bind failed"; // connect to the scheduler + // 连接上scheduler_,由于本节点就是scheduler_,其实就是初始化senders_,由于发送的节点很多,所以这里是一个map + // 在这里就是senders_[1] = socket_1, socket_1中的body设置一点字符“ps1***”, 注意链接不是sendMsg Connect(scheduler_); // for debug use @@ -318,6 +397,7 @@ void Van::Start(int customer_id) { drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG")); } // start receiver + // 开启一个接收消息的线程,这里就是处理消息 receiver_thread_ = std::unique_ptr(new std::thread(&Van::Receiving, this)); init_stage++; @@ -326,6 +406,7 @@ void Van::Start(int customer_id) { if (!is_scheduler_) { // let the scheduler know myself + // worker和server节点会通过 ADD_NODE 消息把本地节点的信息告诉scheduler,比如角色,ip,port... Message msg; Node customer_specific_node = my_node_; customer_specific_node.customer_id = customer_id; @@ -337,6 +418,7 @@ void Van::Start(int customer_id) { } // wait until ready + // 等待 ready_ 从false变成true,当是scheduler的时候,必须要有等worker和server节点过来,不然一直都是阻塞在这,如果是 worker/server,则是等待 scheduler 发送系统allready消息。 while (!ready_.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -350,11 +432,14 @@ void Van::Start(int customer_id) { if (Environment::Get()->find("PS_RESEND_TIMEOUT")) { timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT")); } + // 如果设置了超时重传,就初始化resender_这个变量 resender_ = new Resender(timeout, 10, this); } + //启动了一个线程,每一个 Worker/Server 节点,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息: if (!is_scheduler_) { // start heartbeat thread + // 初始化心跳线程 heartbeat_thread_ = std::unique_ptr(new std::thread(&Van::Heartbeat, this)); } @@ -397,8 +482,8 @@ int Van::Send(const Message& msg) { } void Van::Receiving() { - Meta nodes; - Meta recovery_nodes; // store recovery nodes + Meta nodes; //只有 scheduler 在处理 ADD_NODE 时候会用到,存储目前 scheduler 内部拥有的所有 nodes; + Meta recovery_nodes; // store recovery nodes,只有 scheduler 在处理 ADD_NODE 时候会用到,存储目前 scheduler 内部拥有的所有 recovery nodes(康复重启的节点); recovery_nodes.control.cmd = Control::ADD_NODE; while (true) { @@ -419,24 +504,24 @@ void Van::Receiving() { PS_VLOG(2) << msg.DebugString(); } // duplicated message - if (resender_ && resender_->AddIncomming(msg)) continue; + if (resender_ && resender_->AddIncomming(msg)) continue; //重传确认机制 - if (!msg.meta.control.empty()) { + if (!msg.meta.control.empty()) { //如果是控制类型的消息 // control msg auto& ctrl = msg.meta.control; if (ctrl.cmd == Control::TERMINATE) { ProcessTerminateCommand(); break; } else if (ctrl.cmd == Control::ADD_NODE) { - ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); + ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); //当执行到这个位置的时候继续跳转 } else if (ctrl.cmd == Control::BARRIER) { ProcessBarrierCommand(&msg); } else if (ctrl.cmd == Control::HEARTBEAT) { - ProcessHearbeat(&msg); + ProcessHearbeat(&msg); // 发回Heartbeat的ACK } else { LOG(WARNING) << "Drop unknown typed message " << msg.DebugString(); } - } else { + } else { //非控制类型的消息处理方式 ProcessDataMsg(&msg); } } diff --git a/tests/local.sh b/tests/local.sh index 089b1747..1301f996 100755 --- a/tests/local.sh +++ b/tests/local.sh @@ -13,6 +13,11 @@ bin=$1 shift arg="$@" +# 首先启动Scheduler节点。这是要固定好Server和Worker数量,Scheduler节点管理所有节点的地址。 +# 启动Worker或Server节点。每个节点要知道Scheduler节点的IP、port。启动时连接Scheduler节点,绑定本地端口,并向Scheduler节点注册自己信息(报告自己的IP,port)。 +# Scheduler等待所有Worker节点都注册后,给其分配id,并把节点信息传送出去(例如Worker节点要知道Server节点IP和端口,Server节点要知道Worker节点的IP和端口)。此时Scheduler节点已经准备好。 +# Worker或Server接收到Scheduler传送的信息后,建立对应节点的连接。此时Worker或Server已经准备好,会正式启动。 + # start the scheduler export DMLC_PS_ROOT_URI='127.0.0.1' export DMLC_PS_ROOT_PORT=8000 diff --git a/tests/test_kv_app.cc b/tests/test_kv_app.cc index e1d3e891..96a1d029 100644 --- a/tests/test_kv_app.cc +++ b/tests/test_kv_app.cc @@ -8,7 +8,7 @@ void StartServer() { return; } auto server = new KVServer(0); - server->set_request_handle(KVServerDefaultHandle()); + server->set_request_handle(KVServerDefaultHandle()); //注册functor RegisterExitCallback([server](){ delete server; }); } @@ -32,7 +32,7 @@ void RunWorker() { int repeat = 50; std::vector ts; for (int i = 0; i < repeat; ++i) { - ts.push_back(kv.Push(keys, vals)); + ts.push_back(kv.Push(keys, vals)); //kv.Push()返回的是该请求的timestamp // to avoid too frequency push, which leads huge memory usage if (i > 10) kv.Wait(ts[ts.size()-10]); @@ -63,12 +63,12 @@ void RunWorker() { int main(int argc, char *argv[]) { // start system - Start(0); + Start(0); //Postoffice::start(),每个node都会调用到这里,但是在 Start 函数之中,会依据本次设定的角色来不同处理,只有角色为 scheduler 才会启动 Scheduler。 // setup server nodes - StartServer(); + StartServer(); //Server会在其中做有效执行,其他节点不会有效执行。 // run worker nodes - RunWorker(); + RunWorker(); //Worker会在其中做有效执行,其他节点不会有效执行。 // stop system - Finalize(0, true); + Finalize(0, true); //结束。每个节点都需要执行这个函数。 return 0; }