Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc #18

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

misc #18

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
_exec_env(exec_env),
_master_info(master_info),
_topic_subscriber(new TopicSubscriber()) {
// TODO(yingchun): move these code into a init() function
for (auto& path : exec_env->store_paths()) {
try {
string dpp_download_path_str = path.path + DPP_PREFIX;
Expand Down Expand Up @@ -99,22 +100,23 @@ AgentServer::~AgentServer() { }

// TODO(lingbin): each task in the batch may have it own status or FE must check and
// resend request when something is wrong(BE may need some logic to guarantee idempotence.
// TODO(yingchun): means extractly once?
void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
Status ret_st;

// TODO check master_info here if it is the same with that of heartbeat rpc
if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
if (_master_info.network_address.hostname.empty() || _master_info.network_address.port == 0) {
Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
ret_st.to_thrift(&agent_result.status);
return;
}

for (auto task : tasks) {
for (const auto& task : tasks) {
VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
TTaskType::type task_type = task.task_type;
int64_t signature = task.signature;

#define HANDLE_TYPE(t_task_type, work_pool, req_member) \
#define HANDLE_TYPE(t_task_type, work_pool, req_member) \
case t_task_type: \
if (task.__isset.req_member) { \
work_pool->submit_task(task); \
Expand Down Expand Up @@ -231,7 +233,7 @@ void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::stri
Status ret_st;
OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path);
if (err_code != OLAP_SUCCESS) {
LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path
LOG(WARNING) << "failed to release_snapshot. snapshot_path: " << snapshot_path
<< ", err_code: " << err_code;
ret_st = Status::RuntimeError(strings::Substitute(
"fail to release_snapshot. err_code=$0", err_code));
Expand Down Expand Up @@ -266,6 +268,7 @@ void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
if (!status.ok()) {
LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
return;
}

VLOG_RPC << "success to get job state. [id=" << request.mini_load_id << ", status="
Expand All @@ -289,4 +292,4 @@ void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}

} // namesapce doris
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/agent/cgroups_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class CgroupsMgr {
std::mutex _update_cgroups_mtx;

// A static mapping from fe's resource type to cgroups file
static std::map<TResourceType::type, std::string> _s_resource_cgroups;
static std::map<TResourceType::type, std::string> _s_resource_cgroups;
};
}
#endif
Loading