add commit #190

Open
wants to merge 1 commit into
base: master
3 changes: 3 additions & 0 deletions include/ps/base.h
@@ -7,6 +7,9 @@
#include "ps/internal/utils.h"
namespace ps {

/**
* ps-lite only supports integer keys.
*/
#if USE_KEY32
/*! \brief Use unsigned 32-bit int as the key type */
using Key = uint32_t;
45 changes: 41 additions & 4 deletions include/ps/internal/customer.h
@@ -22,7 +22,28 @@ namespace ps {
*
* It has its own receiving thread which is able to process any message received
* from a remote node with `msg.meta.customer_id` equal to this customer's id
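* Each SimpleApp object owns a Customer member, and every Customer must be registered with the Postoffice. The class is mainly responsible for:
* 1. tracking the replies to messages sent out by the SimpleApp;
* 2. maintaining a message queue for the node and receiving messages on its behalf.
* One app instance may correspond to multiple Customers.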
*/
/**
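* A Customer plays two roles:
* - as a sender, it tracks the responses to every request sent by the SimpleApp;
* - as a receiver, it has its own receiving thread and receive queue, so it effectively acts as the message-processing engine (or part of it).
* Details:
* - Each SimpleApp object owns a Customer member, and the Customer must be registered with the Postoffice.
* - A Customer has to process messages but does not own the network, so responses and messages must be handed to it by an external caller; its responsibilities are therefore somewhat split.
* - Each connection corresponds to one Customer instance. Every Customer is bound to a node id and represents traffic from the current node to that node; the id of the peer equals the id of the Customer instance.
* - Creating a request returns a timestamp that serves as the request id and increases by 1 with every new request. The response count recorded for a request likewise increases by 1 with every reply, and later calls such as Wait identify the request by this id.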
*/
/**
* Van::ProcessDataMsg ---> Customer::Accept ---> Customer::recv_queue_ ---> Customer::recv_thread_ ---> Customer::recv_handle_
*/
/**
* Why are app_id and customer_id equal on the server side? A guess: in the original ps code a server could also have multiple customers, but ps-lite removed that feature for simplicity, so in ps-lite app_id equals customer_id.
*/
class Customer {
public:
/**
@@ -99,13 +120,29 @@ class Customer {

int customer_id_;

RecvHandle recv_handle_;
ThreadsafePQueue recv_queue_;
std::unique_ptr<std::thread> recv_thread_;
/**
* @brief
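* Binds the function that handles a request once the Customer has received it (SimpleApp::Process).
* The Customer starts a dedicated thread that, for the lifetime of the customer, uses recv_handle_ to process incoming requests. Messages pass through a thread-safe queue: Accept() pushes them into the queue,
* and they originate from the Van's receiving thread, i.e. each node's Van routes every received message to the matching customer object.
* For a worker such as KVWorker, recv_handle_ stores the data carried by the pulled message.
* For a server, set_request_handle must be used to install the handler, e.g. KVServerDefaultHandle.
*/
RecvHandle recv_handle_; // message handler of the worker or server
ThreadsafePQueue recv_queue_; // thread-safe message queue
std::unique_ptr<std::thread> recv_thread_; // keeps reading messages from recv_queue_ and calls recv_handle_ on each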

std::mutex tracker_mu_;
std::condition_variable tracker_cond_;
std::vector<std::pair<int, int>> tracker_;
/**
* @brief
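* tracker_ is the vector a Customer uses to record request/response state: for every request (identified by its request id) it stores how many nodes the request was sent to and how many responses have come back.
* The index into tracker_ is the request's timestamp, i.e. the request number.
* tracker_[i].first is the number of nodes the request was sent to, i.e. the number of responses this node expects.
* tracker_[i].second is the number of responses actually received so far.
*/
std::vector<std::pair<int, int>> tracker_; // synchronization state for requests and responses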


DISALLOW_COPY_AND_ASSIGN(Customer);
};
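Reviewer note: the tracker_ comment above describes a per-request pair of (expected, received) response counts indexed by timestamp. The following is a minimal sketch of that bookkeeping, not the ps-lite implementation. The class name RequestTracker and its method names are chosen for illustration only, and error handling (for example an out-of-range timestamp) is omitted.

```cpp
#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for the tracking half of Customer (illustration only).
class RequestTracker {
 public:
  // Registers a request sent to `num_receivers` nodes and returns its
  // timestamp, which is simply the index of the new tracker entry.
  int NewRequest(int num_receivers) {
    std::lock_guard<std::mutex> lk(mu_);
    tracker_.emplace_back(num_receivers, 0);
    return static_cast<int>(tracker_.size()) - 1;
  }

  // Records `num` more responses for the request with this timestamp.
  void AddResponse(int timestamp, int num = 1) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      tracker_[timestamp].second += num;
    }
    cond_.notify_all();
  }

  // Blocks until the received count equals the expected count.
  void WaitRequest(int timestamp) {
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this, timestamp] {
      return tracker_[timestamp].first == tracker_[timestamp].second;
    });
  }

  // Number of responses received so far for this timestamp.
  int NumResponse(int timestamp) {
    std::lock_guard<std::mutex> lk(mu_);
    return tracker_[timestamp].second;
  }

 private:
  std::mutex mu_;
  std::condition_variable cond_;
  std::vector<std::pair<int, int>> tracker_;  // (expected, received)
};
```

Because the timestamp doubles as the vector index, looking up a request is constant time, at the cost of never shrinking the vector in this sketch.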
1 change: 1 addition & 0 deletions include/ps/internal/env.h
@@ -11,6 +11,7 @@ namespace ps {

/**
* \brief Environment configurations
* A singleton class for environment variables. It keeps every variable name and value in a std::unordered_map<std::string, std::string> named kvs.
*/
class Environment {
public:
25 changes: 25 additions & 0 deletions include/ps/internal/message.h
@@ -59,6 +59,7 @@ DataType GetDataType() {
}
/**
* \brief information about a node
* Holds the information about a node; every Node can be uniquely identified by hostname + port.
*/
struct Node {
/** \brief the empty value */
@@ -120,6 +121,14 @@ struct Control {
return ss.str();
}
/** \brief all commands */
/**
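* ADD_NODE: workers and servers register themselves with the scheduler;
* BARRIER: blocking synchronization message between nodes;
* HEARTBEAT: heartbeat signal between nodes;
* TERMINATE: signal that a node is exiting;
* ACK: acknowledgement; it only appears when the Resender class is enabled;
* EMPTY: not a control message, i.e. a data message (push or pull);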
*/
enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
/** \brief the command */
Command cmd;
@@ -201,6 +210,22 @@ struct Meta {
/**
* \brief messages communicated among nodes.
*/

/**
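* A Message is the unit of information that gets sent. It consists of:
* - header (meta): the metadata, serialized with Protobuf, including:
*   - control information (Control), which states what the message means (e.g. terminate, ACK, synchronize) and contains:
*     - the command type;
*     - a list (vector) of nodes, each with its role, ip, port, id, and whether it is a recovery node;
*     - the group id the control command applies to;
*     - the method signature;
*   - the sender;
*   - the receiver;
*   - the timestamp;
*   - ...
* - body: the data being sent, held in the custom SArray type, which shares data to reduce copying.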
*/
struct Message {
/** \brief the meta info of this message */
Meta meta;
46 changes: 33 additions & 13 deletions include/ps/internal/postoffice.h
@@ -15,6 +15,18 @@
namespace ps {
/**
* \brief the center of the system
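* A global manager class implemented as a singleton. A node owns exactly one Postoffice for its lifetime and manages itself through the Postoffice's members.
* Characteristics:
* - All three node roles are managed through the Postoffice.
* - Worker, server, and scheduler share one code base in ps-lite, Postoffice included, so the roles are best described separately.
* - On the scheduler, the Postoffice is, as the name suggests, a post office: an address book and control center that records every node in the system (scheduler, servers, and workers together). Specifically it:
*   - owns a Van object, which brings up the network, handles communication, and manages commands such as adding, removing, and recovering nodes;
*   - manages basic cluster information: the number of workers and servers, the addresses of all nodes, the feature distribution across servers, conversion between worker/server rank and node id, node roles, and so on;
*   - provides the barrier.
* - On a server or worker it:
*   - configures information about the current node, e.g. its type (server or worker), its node id, and the worker/server rank to node id conversion;
*   - routes keys, i.e. maintains the mapping from keys to servers;
*   - provides the barrier.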
*/
class Postoffice {
public:
@@ -33,14 +45,14 @@ class Postoffice {
* \param argv0 the program name, used for logging.
* \param do_barrier whether to block until every nodes are started.
*/
void Start(int customer_id, const char* argv0, const bool do_barrier);
void Start(int customer_id, const char* argv0, const bool do_barrier); // initializes the node and sets up communication
/**
* \brief terminate the system
*
* All nodes should call this function before exiting.
* \param do_barrier whether to do block until every node is finalized, default true.
*/
void Finalize(const int customer_id, const bool do_barrier = true);
void Finalize(const int customer_id, const bool do_barrier = true); // blocking shutdown of the node
/**
* \brief add an customer to the system. threadsafe
*/
@@ -71,6 +83,9 @@ class Postoffice {
/**
* \brief return the key ranges of all server nodes
*/
/**
* Splits the integer key range evenly among the servers.
*/
const std::vector<Range>& GetServerKeyRanges();
/**
* \brief the template of a callback
@@ -98,6 +113,10 @@ class Postoffice {
* \brief convert from a worker rank into a node id
* \param rank the worker rank
*/
/**
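* Maps a logical rank to a physical node id. A node id uniquely identifies a physical node and corresponds to exactly one (host, port) pair. Ids 1-7 denote node groups, so the ids of individual nodes start at 8.
* The mapping also guarantees that server ids are even and worker ids are odd.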
*/
static inline int WorkerRankToID(int rank) {
return rank * 2 + 9;
}
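Reviewer note: a small sketch of the rank/id arithmetic described in the comment above. WorkerRankToID is copied from this diff. ServerRankToID, IDtoRank, and IsServerID are not shown in this hunk, so the versions below are assumptions that merely follow the stated rule (single-node ids start at 8, servers even, workers odd).

```cpp
#include <algorithm>

// Taken from the diff: worker rank r maps to the odd ids 9, 11, 13, ...
inline int WorkerRankToID(int rank) { return rank * 2 + 9; }

// Assumption: server rank r maps to the even ids 8, 10, 12, ...
inline int ServerRankToID(int rank) { return rank * 2 + 8; }

// Assumption: inverse of both mappings; ids below 8 (node groups) give rank 0.
inline int IDtoRank(int id) { return std::max((id - 8) / 2, 0); }

// Hypothetical helper: true if `id` is a single-node id that belongs to a server.
inline bool IsServerID(int id) { return id >= 8 && id % 2 == 0; }
```

Under these assumptions worker rank 2 and server rank 2 get ids 13 and 12, and both map back to rank 2 because the integer division discards the parity bit.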
@@ -142,16 +161,17 @@ class Postoffice {
* \brief barrier
* \param node_id the barrier group id
*/
void Barrier(int customer_id, int node_group);
void Barrier(int customer_id, int node_group); // enters the barrier and blocks
/**
* \brief process a control message, called by van
* \param the received message
*/
void Manage(const Message& recv);
void Manage(const Message& recv); // releases the barrier wait
/**
* \brief update the heartbeat record map
* \param node_id the \ref Node id
* \param t the last received heartbeat time
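* The heartbeat record stores liveness information: the key is a node id and the value is the timestamp of the last HEARTBEAT message received from that node.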
*/
void UpdateHeartbeat(int node_id, time_t t) {
std::lock_guard<std::mutex> lk(heartbeat_mu_);
@@ -161,23 +181,23 @@ class Postoffice {
* \brief get node ids that haven't reported heartbeats for over t seconds
* \param t timeout in sec
*/
std::vector<int> GetDeadNodes(int t = 60);
std::vector<int> GetDeadNodes(int t = 60); // uses heartbeats_ to find the nodes that are dead
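Reviewer note: the heartbeat comments above describe a map from node id to the time of the last HEARTBEAT, from which dead nodes are derived. The sketch below shows that idea in isolation. HeartbeatTable, Update, and DeadNodes are hypothetical names; the current time is passed in explicitly so the behaviour is deterministic; and the actual GetDeadNodes may apply additional rules that this diff does not show.

```cpp
#include <algorithm>
#include <ctime>
#include <mutex>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified heartbeat table (illustration only).
class HeartbeatTable {
 public:
  // Remembers `t` as the time of the latest heartbeat from `node_id`.
  void Update(int node_id, std::time_t t) {
    std::lock_guard<std::mutex> lk(mu_);
    heartbeats_[node_id] = t;
  }

  // Returns, in ascending order, the ids whose last heartbeat is more than
  // `timeout_sec` seconds older than `now`.
  std::vector<int> DeadNodes(std::time_t now, int timeout_sec = 60) const {
    std::vector<int> dead;
    {
      std::lock_guard<std::mutex> lk(mu_);
      for (const auto& kv : heartbeats_) {
        if (kv.second + timeout_sec < now) dead.push_back(kv.first);
      }
    }
    std::sort(dead.begin(), dead.end());
    return dead;
  }

 private:
  mutable std::mutex mu_;
  std::unordered_map<int, std::time_t> heartbeats_;
};
```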

private:
Postoffice();
~Postoffice() { delete van_; }

void InitEnvironment();
Van* van_;
void InitEnvironment(); // initializes the environment variables and creates the van object
Van* van_; // underlying communication object
mutable std::mutex mu_;
// app_id -> (customer_id -> customer pointer)
std::unordered_map<int, std::unordered_map<int, Customer*>> customers_;
std::unordered_map<int, std::vector<int>> node_ids_;
std::unordered_map<int, std::unordered_map<int, Customer*>> customers_; // customers currently on this node
std::unordered_map<int, std::vector<int>> node_ids_; // node id mapping table
std::mutex server_key_ranges_mu_;
std::vector<Range> server_key_ranges_;
bool is_worker_, is_server_, is_scheduler_;
int num_servers_, num_workers_;
std::unordered_map<int, std::unordered_map<int, bool> > barrier_done_;
std::vector<Range> server_key_ranges_; // key range of each server
bool is_worker_, is_server_, is_scheduler_; // role of this node
int num_servers_, num_workers_; // number of server and worker nodes
std::unordered_map<int, std::unordered_map<int, bool> > barrier_done_; // barrier synchronization state
int verbose_;
std::mutex barrier_mu_;
std::condition_variable barrier_cond_;
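Reviewer note: the comment on GetServerKeyRanges says the key range is divided evenly by the number of servers. A minimal sketch of such a split follows. KeyRange, SplitKeyRange, and ServerForKey are hypothetical names, the maximum key is passed in as a parameter, and num_servers is assumed to be positive; how ps-lite treats any remainder of the division is not shown in this diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Half-open key interval [begin, end). Hypothetical type for this sketch.
struct KeyRange {
  uint64_t begin;
  uint64_t end;
};

// Splits [0, max_key) into `num_servers` ranges of equal width.
// Keys in the remainder (when max_key is not divisible) are left uncovered.
std::vector<KeyRange> SplitKeyRange(uint64_t max_key, int num_servers) {
  std::vector<KeyRange> ranges;
  const uint64_t width = max_key / static_cast<uint64_t>(num_servers);
  for (int i = 0; i < num_servers; ++i) {
    const uint64_t idx = static_cast<uint64_t>(i);
    ranges.push_back({width * idx, width * (idx + 1)});
  }
  return ranges;
}

// Returns the index of the server whose range contains `key`, or -1 if none.
int ServerForKey(const std::vector<KeyRange>& ranges, uint64_t key) {
  for (std::size_t i = 0; i < ranges.size(); ++i) {
    if (key >= ranges[i].begin && key < ranges[i].end) {
      return static_cast<int>(i);
    }
  }
  return -1;
}
```

A linear scan keeps the lookup easy to read; with equal-width ranges the index could also be computed directly as key / width.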
37 changes: 24 additions & 13 deletions include/ps/internal/van.h
@@ -23,6 +23,8 @@ class PBMeta;
* If environment variable PS_RESEND is set to be 1, then van will resend a
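* message if no ACK message is received within PS_RESEND_TIMEOUT
* milliseconds
* The communication module: it handles network communication with other nodes and the actual sending and receiving of Messages. The Postoffice owns a Van. If the post office holds the address book, the van is the truck that delivers the parcels; Van is the communication module of the whole parameter server.
* Concretely, Van establishes the connections between nodes (e.g. between a worker and the scheduler) and starts the local receiving thread that listens for incoming messages.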
*/
class Van {
public:
@@ -48,7 +50,7 @@ class Van {
* control message, give it to postoffice::manager, otherwise, give it to the
* corresponding app.
*/
virtual void Start(int customer_id);
virtual void Start(int customer_id); // initializes and sets up communication

/**
* \brief send a message, It is thread-safe
@@ -83,6 +85,7 @@ class Van {
protected:
/**
* \brief connect to a node
* Connects to the given node.
*/
virtual void Connect(const Node &node) = 0;

@@ -91,18 +94,21 @@ class Van {
* do multiple retries on binding the port. since it's possible that
* different nodes on the same machine picked the same port
* \return return the port binded, -1 if failed.
* Binds to this node's own address.
*/
virtual int Bind(const Node &node, int max_retry) = 0;

/**
* \brief block until received a message
* \return the number of bytes received. -1 if failed or timeout
* Receives a message; the call blocks.
*/
virtual int RecvMsg(Message *msg) = 0;

/**
* \brief send a message
* \return the number of bytes sent
* Sends a message.
*/
virtual int SendMsg(const Message &msg) = 0;

@@ -121,17 +127,17 @@ class Van {
*/
void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);

Node scheduler_;
Node my_node_;
bool is_scheduler_;
Node scheduler_; // scheduler node info; every node records the scheduler's information
Node my_node_; // this node's info; on the scheduler, my_node_ is the same as scheduler_ above
bool is_scheduler_; // whether this node is the scheduler
std::mutex start_mu_;

private:
/** thread function for receiving */
void Receiving();
void Receiving(); // body of the message-receiving thread

/** thread function for heartbeat */
void Heartbeat();
void Heartbeat(); // body of the heartbeat-sending thread

// node's address string (i.e. ip:port) -> node id
// this map is updated when ip:port is received for the first time
@@ -147,20 +153,21 @@ class Van {
int num_servers_ = 0;
int num_workers_ = 0;
/** the thread for receiving messages */
std::unique_ptr<std::thread> receiver_thread_;
std::unique_ptr<std::thread> receiver_thread_; // message-receiving thread
/** the thread for sending heartbeat */
std::unique_ptr<std::thread> heartbeat_thread_;
std::vector<int> barrier_count_;
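std::unique_ptr<std::thread> heartbeat_thread_; // heartbeat-sending thread
std::vector<int> barrier_count_; // barrier counter: counts the registered nodes. Only once every node has registered is the system ready; the scheduler then sends a ready message to all nodes and the system starts.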
/** msg resender */
Resender *resender_ = nullptr;
Resender *resender_ = nullptr; // message resender
int drop_rate_ = 0;
std::atomic<int> timestamp_{0};
int init_stage = 0;
std::atomic<int> timestamp_{0}; // auto-incrementing message id (atomic)
int init_stage = 0; // how far Start() has progressed with initialization

/**
* \brief processing logic of AddNode message for scheduler
* AddNode message handler on the scheduler.
*/
void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
Meta *recovery_nodes);

/**
@@ -170,21 +177,25 @@ class Van {

/**
* \brief processing logic of AddNode message (run on each node)
* AddNode message handler on workers and servers.
*/
void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);

/**
* \brief processing logic of Barrier message (run on each node)
* Barrier message handler.
*/
void ProcessBarrierCommand(Message *msg);

/**
* \brief processing logic of Heartbeat message (run on each node)
* Heartbeat message handler.
*/
void ProcessHearbeat(Message *msg);

/**
* \brief processing logic of Data message
* Handler for data messages (push & pull).
*/
void ProcessDataMsg(Message *msg);
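Reviewer note: the customer.h comments in this PR describe the data path Van::ProcessDataMsg ---> Customer::Accept ---> recv_queue_ ---> recv_thread_ ---> recv_handle_. The sketch below illustrates that hand-off with a plain FIFO queue and one worker thread. MiniMessage and MiniCustomer are hypothetical stand-ins, the real code uses ThreadsafePQueue rather than std::queue, and stopping the thread with a terminate flag is an assumption of this sketch.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical message type: a payload plus a flag that stops the receiver.
struct MiniMessage {
  bool terminate = false;
  std::string body;
};

// Hypothetical receiver: Accept() enqueues, a dedicated thread dequeues and
// passes each message to the handler supplied by the application.
class MiniCustomer {
 public:
  using RecvHandle = std::function<void(const MiniMessage&)>;

  explicit MiniCustomer(RecvHandle handle)
      : recv_handle_(std::move(handle)),
        recv_thread_(&MiniCustomer::Receiving, this) {}

  // Queues a terminate message behind any pending ones, then joins.
  ~MiniCustomer() {
    MiniMessage stop;
    stop.terminate = true;
    Accept(stop);
    recv_thread_.join();
  }

  // Called by the network layer (the Van in ps-lite) for each data message.
  void Accept(const MiniMessage& msg) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      recv_queue_.push(msg);
    }
    cond_.notify_one();
  }

 private:
  // Thread body: wait for a message, pop it, and run the handler on it.
  void Receiving() {
    while (true) {
      MiniMessage msg;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cond_.wait(lk, [this] { return !recv_queue_.empty(); });
        msg = std::move(recv_queue_.front());
        recv_queue_.pop();
      }
      if (msg.terminate) break;
      recv_handle_(msg);
    }
  }

  RecvHandle recv_handle_;
  std::mutex mu_;
  std::condition_variable cond_;
  std::queue<MiniMessage> recv_queue_;
  std::thread recv_thread_;  // declared last so it starts after the other members exist
};
```

Running the handler outside the lock lets Accept() keep enqueueing while a slow handler is still working on the previous message.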
