Skip to content

Commit

Permalink
updating vertex and edges operations return leader info when part's l…
Browse files Browse the repository at this point in the history
…eader chagne (#1353)
  • Loading branch information
zlcook authored and dangleptr committed Nov 29, 2019
1 parent 705a88d commit 0a29cd0
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 24 deletions.
12 changes: 12 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ class BaseProcessor {
}
}

void handleLeaderChanged(GraphSpaceID spaceId, PartitionID partId) {
auto addrRet = kvstore_->partLeader(spaceId, partId);
if (ok(addrRet)) {
auto leader = value(std::move(addrRet));
this->pushResultCode(cpp2::ErrorCode::E_LEADER_CHANGED, partId, leader);
} else {
LOG(ERROR) << "Fail to get part leader, spaceId: " << spaceId
<< ", partId: " << partId << ", ResultCode: " << error(addrRet);
this->pushResultCode(to(error(addrRet)), partId);
}
}

nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) {
nebula::cpp2::HostAddr tHost;
tHost.set_ip(host.first);
Expand Down
20 changes: 6 additions & 14 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,15 @@ void BaseProcessor<RESP>::handleAsync(GraphSpaceID spaceId,
kvstore::ResultCode code) {
VLOG(3) << "partId:" << partId << ", code:" << static_cast<int32_t>(code);

cpp2::ResultCode thriftResult;
thriftResult.set_code(to(code));
thriftResult.set_part_id(partId);
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
nebula::cpp2::HostAddr leader;
auto addrRet = kvstore_->partLeader(spaceId, partId);
CHECK(ok(addrRet));
auto addr = value(std::move(addrRet));
leader.set_ip(addr.first);
leader.set_port(addr.second);
thriftResult.set_leader(std::move(leader));
}
bool finished = false;
{
std::lock_guard<std::mutex> lg(this->lock_);
if (thriftResult.code != cpp2::ErrorCode::SUCCEEDED) {
this->codes_.emplace_back(std::move(thriftResult));
if (code != kvstore::ResultCode::SUCCEEDED) {
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
handleLeaderChanged(spaceId, partId);
} else {
pushResultCode(to(code), partId);
}
}
this->callingNum_--;
if (this->callingNum_ == 0) {
Expand Down
17 changes: 12 additions & 5 deletions src/storage/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,23 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
return std::string("");
},
[this, partId, edgeKey, req] (kvstore::ResultCode code) {
this->pushResultCode(this->to(code), partId);
if (code == kvstore::ResultCode::SUCCEEDED) {
onProcessFinished(req.get_return_columns().size());
} else {
LOG(ERROR) << "Failure update edge, spaceId: " << this->spaceId_
while (true) {
if (code == kvstore::ResultCode::SUCCEEDED) {
onProcessFinished(req.get_return_columns().size());
break;
}
LOG(ERROR) << "Fail to update edge, spaceId: " << this->spaceId_
<< ", partId: " << partId
<< ", src: " << edgeKey.get_src()
<< ", edge_type: " << edgeKey.get_edge_type()
<< ", dst: " << edgeKey.get_dst()
<< ", ranking: " << edgeKey.get_ranking();
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
handleLeaderChanged(this->spaceId_, partId);
break;
}
this->pushResultCode(to(code), partId);
break;
}
this->onFinished();
});
Expand Down
17 changes: 12 additions & 5 deletions src/storage/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,19 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
return std::string("");
},
[this, partId, vId, req] (kvstore::ResultCode code) {
this->pushResultCode(this->to(code), partId);
if (code == kvstore::ResultCode::SUCCEEDED) {
onProcessFinished(req.get_return_columns().size());
} else {
LOG(ERROR) << "Failure update vertex, spaceId: " << this->spaceId_
while (true) {
if (code == kvstore::ResultCode::SUCCEEDED) {
onProcessFinished(req.get_return_columns().size());
break;
}
LOG(ERROR) << "Fail to update vertex, spaceId: " << this->spaceId_
<< ", partId: " << partId << ", vId: " << vId;
if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
handleLeaderChanged(this->spaceId_, partId);
break;
}
this->pushResultCode(to(code), partId);
break;
}
this->onFinished();
});
Expand Down

0 comments on commit 0a29cd0

Please sign in to comment.