Skip to content

Commit

Permalink
refactor and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed May 15, 2020
1 parent 273aad6 commit 6c9d046
Show file tree
Hide file tree
Showing 137 changed files with 29,814 additions and 1,211 deletions.
13 changes: 8 additions & 5 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
_exec_env(exec_env),
_master_info(master_info),
_topic_subscriber(new TopicSubscriber()) {
// TODO(yingchun): move these code into a init() function
for (auto& path : exec_env->store_paths()) {
try {
string dpp_download_path_str = path.path + DPP_PREFIX;
Expand Down Expand Up @@ -100,22 +101,23 @@ AgentServer::~AgentServer() { }

// TODO(lingbin): each task in the batch may have it own status or FE must check and
// resend request when something is wrong(BE may need some logic to guarantee idempotence.
// TODO(yingchun): means extractly once?
void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
Status ret_st;

// TODO check master_info here if it is the same with that of heartbeat rpc
if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
if (_master_info.network_address.hostname.empty() || _master_info.network_address.port == 0) {
Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
ret_st.to_thrift(&agent_result.status);
return;
}

for (auto task : tasks) {
for (const auto& task : tasks) {
VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;

#define HANDLE_TYPE(t_task_type, work_pool, req_member) \
#define HANDLE_TYPE(t_task_type, work_pool, req_member) \
case t_task_type: \
if (task.__isset.req_member) { \
work_pool->submit_task(task); \
Expand Down Expand Up @@ -232,7 +234,7 @@ void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::stri
Status ret_st;
OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path);
if (err_code != OLAP_SUCCESS) {
LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path
LOG(WARNING) << "failed to release_snapshot. snapshot_path: " << snapshot_path
<< ", err_code: " << err_code;
ret_st = Status::RuntimeError(strings::Substitute(
"fail to release_snapshot. err_code=$0", err_code));
Expand Down Expand Up @@ -267,6 +269,7 @@ void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
if (!status.ok()) {
LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
return;
}

VLOG_RPC << "success to get job state. [id=" << request.mini_load_id << ", status="
Expand All @@ -290,4 +293,4 @@ void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}

} // namesapce doris
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/agent/cgroups_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class CgroupsMgr {
std::mutex _update_cgroups_mtx;

// A static mapping from fe's resource type to cgroups file
static std::map<TResourceType::type, std::string> _s_resource_cgroups;
static std::map<TResourceType::type, std::string> _s_resource_cgroups;
};
}
#endif
Loading

0 comments on commit 6c9d046

Please sign in to comment.