Skip to content

Commit

Permalink
Add StatsManager in graphd (#1301)
Browse files Browse the repository at this point in the history
  • Loading branch information
laura-ding authored and dangleptr committed Nov 29, 2019
1 parent 0a29cd0 commit d79ed19
Show file tree
Hide file tree
Showing 60 changed files with 564 additions and 287 deletions.
2 changes: 2 additions & 0 deletions src/common/filter/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ set(FILTER_TEST_LIBS
$<TARGET_OBJECTS:filter_obj>
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
Expand Down
1 change: 1 addition & 0 deletions src/common/stats/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ nebula_add_library(
stats_obj
OBJECT
StatsManager.cpp
Stats.cpp
)

nebula_add_subdirectory(test)
Expand Down
54 changes: 54 additions & 0 deletions src/common/stats/Stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "stats/StatsManager.h"
#include "stats/Stats.h"

DEFINE_int32(histogram_bucketSize, 1000, "The width of each bucket");
DEFINE_uint32(histogram_min, 1, "The smallest value for the bucket range");
DEFINE_uint32(histogram_max, 1000 * 1000, "The largest value for the bucket range");

namespace nebula {
namespace stats {

Stats::Stats(const std::string& serverName, const std::string& moduleName) {
qpsStatId_ = StatsManager::registerStats(serverName + "_" + moduleName + "_qps");
errorQpsStatId_ = StatsManager::registerStats(serverName + "_" + moduleName + "_error_qps");
latencyStatId_ = StatsManager::registerHisto(serverName + "_" + moduleName + "_latency",
FLAGS_histogram_bucketSize, FLAGS_histogram_min, FLAGS_histogram_max);
}

// static
void Stats::addStatsValue(const Stats *stats, bool ok, int64_t latency, uint32_t count) {
if (stats == nullptr) {
return;
}
if (ok && stats->getQpsStatId() != 0) {
StatsManager::addValue(stats->getQpsStatId(), count);
}
if (!ok && stats->getErrorQpsStatId() != 0) {
StatsManager::addValue(stats->getErrorQpsStatId(), count);
}
if (stats->getLatencyStatId() != 0) {
StatsManager::addValue(stats->getLatencyStatId(), latency);
}
}

int32_t Stats::getQpsStatId() const {
return qpsStatId_;
}

int32_t Stats::getErrorQpsStatId() const {
return errorQpsStatId_;
}

int32_t Stats::getLatencyStatId() const {
return latencyStatId_;
}

} // namespace stats
} // namespace nebula

39 changes: 39 additions & 0 deletions src/common/stats/Stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_STATS_STATS_H_
#define COMMON_STATS_STATS_H_

#include "stats/StatsManager.h"

namespace nebula {
namespace stats {

class Stats final {
public:
Stats() = default;
~Stats() = default;

explicit Stats(const std::string& serverName, const std::string& moduleName);

public:
static void addStatsValue(const Stats *stats, bool ok, int64_t latency = 0, uint32_t count = 1);

int32_t getQpsStatId() const;

int32_t getErrorQpsStatId() const;

int32_t getLatencyStatId() const;

private:
int32_t qpsStatId_{0};
int32_t errorQpsStatId_{0};
int32_t latencyStatId_{0};
};
} // namespace stats
} // namespace nebula

#endif // COMMON_STATS_STATS_H_
2 changes: 2 additions & 0 deletions src/common/stats/StatsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ int32_t StatsManager::registerHisto(folly::StringPiece counterName,
StatsManager::VT bucketSize,
StatsManager::VT min,
StatsManager::VT max) {
LOG(INFO) << "registerHisto, bucketSize: " << bucketSize
<< ", min: " << min << ", max: " << max;
using std::chrono::seconds;

auto& sm = get();
Expand Down
6 changes: 6 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ nebula_add_executable(
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
Expand Down Expand Up @@ -59,6 +61,8 @@ nebula_add_executable(
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
Expand Down Expand Up @@ -98,6 +102,8 @@ nebula_add_executable(
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
Expand Down
27 changes: 13 additions & 14 deletions src/graph/DeleteVertexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ void DeleteVertexExecutor::execute() {
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(Status::Error("Internal Error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
}
auto rpcResp = std::move(resp).value();
Expand All @@ -63,8 +63,8 @@ void DeleteVertexExecutor::execute() {

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
Expand All @@ -76,8 +76,8 @@ void DeleteVertexExecutor::deleteEdges(std::vector<storage::cpp2::EdgeKey>* edge
auto cb = [this] (auto &&resp) {
auto completeness = resp.completeness();
if (completeness != 100) {
DCHECK(onError_);
onError_(Status::Error("Internal Error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
}
deleteVertex();
Expand All @@ -86,8 +86,8 @@ void DeleteVertexExecutor::deleteEdges(std::vector<storage::cpp2::EdgeKey>* edge

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
Expand All @@ -98,19 +98,18 @@ void DeleteVertexExecutor::deleteVertex() {
auto *runner = ectx()->rctx()->runner();
auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(Status::Error("Internal Error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
}
DCHECK(onFinish_);
onFinish_(Executor::ProcessControl::kNext);
doFinish(Executor::ProcessControl::kNext, ectx()->getGraphStats()->getDeleteVertexStats());
return;
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
doError(Status::Error("Internal Error"),
ectx()->getGraphStats()->getDeleteVertexStats());
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
Expand Down
10 changes: 9 additions & 1 deletion src/graph/ExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/Base.h"
#include "cpp/helpers.h"
#include "graph/RequestContext.h"
#include "graph/GraphStats.h"
#include "parser/SequentialSentences.h"
#include "meta/SchemaManager.h"
#include "meta/ClientBasedGflagsManager.h"
Expand All @@ -33,13 +34,15 @@ class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable {
meta::SchemaManager *sm,
meta::ClientBasedGflagsManager *gflagsManager,
storage::StorageClient *storage,
meta::MetaClient *metaClient) {
meta::MetaClient *metaClient,
GraphStats* stats) {
rctx_ = std::move(rctx);
sm_ = sm;
gflagsManager_ = gflagsManager;
storageClient_ = storage;
metaClient_ = metaClient;
variableHolder_ = std::make_unique<VariableHolder>();
stats_ = stats;
}

~ExecutionContext();
Expand Down Expand Up @@ -68,13 +71,18 @@ class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable {
return metaClient_;
}

GraphStats* getGraphStats() const {
return stats_;
}

private:
RequestContextPtr rctx_;
meta::SchemaManager *sm_{nullptr};
meta::ClientBasedGflagsManager *gflagsManager_{nullptr};
storage::StorageClient *storageClient_{nullptr};
meta::MetaClient *metaClient_{nullptr};
std::unique_ptr<VariableHolder> variableHolder_;
GraphStats *stats_{nullptr};
};

} // namespace graph
Expand Down
17 changes: 14 additions & 3 deletions src/graph/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExec
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));

stats_ = std::make_unique<GraphStats>();

metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor,
std::move(addrs.value()),
HostAddr(0, 0),
0,
false,
stats_->getMetaClientStats());
// load data try 3 time
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
Expand All @@ -41,7 +49,9 @@ Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExec

gflagsManager_ = std::make_unique<meta::ClientBasedGflagsManager>(metaClient_.get());

storage_ = std::make_unique<storage::StorageClient>(ioExecutor, metaClient_.get());
storage_ = std::make_unique<storage::StorageClient>(ioExecutor,
metaClient_.get(),
stats_->getStorageClientStats());
return Status::OK();
}

Expand All @@ -50,7 +60,8 @@ void ExecutionEngine::execute(RequestContextPtr rctx) {
schemaManager_.get(),
gflagsManager_.get(),
storage_.get(),
metaClient_.get());
metaClient_.get(),
stats_.get());
// TODO(dutor) add support to plan cache
auto plan = new ExecutionPlan(std::move(ectx));

Expand Down
2 changes: 2 additions & 0 deletions src/graph/ExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "meta/ClientBasedGflagsManager.h"
#include "meta/client/MetaClient.h"
#include "network/NetworkUtils.h"
#include "graph/GraphStats.h"
#include <folly/executors/IOThreadPoolExecutor.h>

/**
Expand Down Expand Up @@ -45,6 +46,7 @@ class ExecutionEngine final : public cpp::NonCopyable, public cpp::NonMovable {
std::unique_ptr<meta::ClientBasedGflagsManager> gflagsManager_;
std::unique_ptr<storage::StorageClient> storage_;
std::unique_ptr<meta::MetaClient> metaClient_;
std::unique_ptr<GraphStats> stats_;
};

} // namespace graph
Expand Down
4 changes: 4 additions & 0 deletions src/graph/ExecutionPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "base/Base.h"
#include "graph/ExecutionPlan.h"
#include "stats/StatsManager.h"

namespace nebula {
namespace graph {
Expand All @@ -20,6 +21,7 @@ void ExecutionPlan::execute() {
if (!result.ok()) {
status = std::move(result).status();
LOG(ERROR) << status;
stats::Stats::addStatsValue(ectx()->getGraphStats()->getParseErrorStats(), false);
break;
}

Expand Down Expand Up @@ -56,6 +58,7 @@ void ExecutionPlan::onFinish() {
auto *rctx = ectx()->rctx();
executor_->setupResponse(rctx->resp());
auto latency = rctx->duration().elapsedInUSec();
stats::Stats::addStatsValue(ectx()->getGraphStats()->getGraphAllStats(), true, latency);
rctx->resp().set_latency_in_us(latency);
auto &spaceName = rctx->session()->spaceName();
rctx->resp().set_space_name(spaceName);
Expand All @@ -80,6 +83,7 @@ void ExecutionPlan::onError(Status status) {
}
rctx->resp().set_error_msg(status.toString());
auto latency = rctx->duration().elapsedInUSec();
stats::Stats::addStatsValue(ectx()->getGraphStats()->getGraphAllStats(), false, latency);
rctx->resp().set_latency_in_us(latency);
rctx->finish();
delete this;
Expand Down
11 changes: 11 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,5 +378,16 @@ StatusOr<VariantType> Executor::transformDefaultValue(nebula::cpp2::SupportedTyp
return Status::OK();
}

void Executor::doError(Status status, const stats::Stats* stats, uint32_t count) const {
stats::Stats::addStatsValue(stats, false, duration().elapsedInUSec(), count);
DCHECK(onError_);
onError_(std::move(status));
}

void Executor::doFinish(ProcessControl pro, const stats::Stats* stats, uint32_t count) const {
stats::Stats::addStatsValue(stats, true, duration().elapsedInUSec(), count);
DCHECK(onFinish_);
onFinish_(pro);
}
} // namespace graph
} // namespace nebula
Loading

0 comments on commit d79ed19

Please sign in to comment.