Skip to content

Commit

Permalink
[feat][mds] Add rpc for client metric push
Browse files Browse the repository at this point in the history
  • Loading branch information
meilirensheng2020 authored and ketor committed Jan 6, 2025
1 parent cb8ba79 commit 784caf6
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 1 deletion.
27 changes: 26 additions & 1 deletion dingofs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ FSStatusCode FsManager::CreateFs(const pb::mds::CreateFsRequest* request,
child_status = fsStorage_->Delete(fs_name);
if (child_status != FSStatusCode::OK) {
LOG(ERROR) << "CreateFs fail, " << error_map[failure_stage]
<< ", then delete fs fail" << ", fsName = " << fs_name
<< ", then delete fs fail"
<< ", fsName = " << fs_name
<< ", ret = " << FSStatusCode_Name(child_status);
return child_status;
}
Expand Down Expand Up @@ -985,6 +986,30 @@ void FsManager::CommitTx(const pb::mds::CommitTxRequest* request,
dlock_->UnLock(fs_name, uuid);
}

// set fs cluster statistics
void FsManager::SetFsStats(const pb::mds::SetFsStatsRequest* request,
pb::mds::SetFsStatsResponse* response) {
FsMetric::GetInstance().SetFsStats(request->fsname(), request->fsstatsdata());
response->set_statuscode(FSStatusCode::OK);
}

// get fs cluster statistics
void FsManager::GetFsStats(const pb::mds::GetFsStatsRequest* request,
pb::mds::GetFsStatsResponse* response) {
FSStatusCode ret = FsMetric::GetInstance().GetFsStats(
request->fsname(), response->mutable_fsstatsdata());
response->set_statuscode(ret);
}

// get fs cluster persecond statistics
void FsManager::GetFsPerSecondStats(
const pb::mds::GetFsPerSecondStatsRequest* request,
pb::mds::GetFsPerSecondStatsResponse* response) {
FSStatusCode ret = FsMetric::GetInstance().GetFsPerSecondStats(
request->fsname(), response->mutable_fsstatsdata());
response->set_statuscode(ret);
}

// after mds restart need rebuild mountpoint ttl recorder
void FsManager::RebuildTimeRecorder() {
std::vector<FsInfoWrapper> fs_infos;
Expand Down
9 changes: 9 additions & 0 deletions dingofs/src/mds/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ class FsManager {
void CommitTx(const pb::mds::CommitTxRequest* request,
pb::mds::CommitTxResponse* response);

void SetFsStats(const pb::mds::SetFsStatsRequest* request,
pb::mds::SetFsStatsResponse* response);

void GetFsStats(const pb::mds::GetFsStatsRequest* request,
pb::mds::GetFsStatsResponse* response);

void GetFsPerSecondStats(const pb::mds::GetFsPerSecondStatsRequest* request,
pb::mds::GetFsPerSecondStatsResponse* response);

// periodically check if the mount point is alive
void BackEndCheckMountPoint();
void CheckMountPoint();
Expand Down
36 changes: 36 additions & 0 deletions dingofs/src/mds/mds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include <vector>

#include "dingofs/src/mds/metric/metric.h"

namespace dingofs {
namespace mds {

Expand Down Expand Up @@ -286,5 +288,39 @@ void MdsServiceImpl::CommitTx(::google::protobuf::RpcController* controller,
VLOG(3) << "CommitTx [response]: " << response->DebugString();
}

void MdsServiceImpl::SetFsStats(::google::protobuf::RpcController* controller,
const pb::mds::SetFsStatsRequest* request,
pb::mds::SetFsStatsResponse* response,
::google::protobuf::Closure* done) {
(void)controller;
brpc::ClosureGuard guard(done);
VLOG(9) << "SetFsStats [request]: " << request->DebugString();
fsManager_->SetFsStats(request, response);
VLOG(9) << "SetFsStats [response]: " << response->DebugString();
}

void MdsServiceImpl::GetFsStats(::google::protobuf::RpcController* controller,
const pb::mds::GetFsStatsRequest* request,
pb::mds::GetFsStatsResponse* response,
::google::protobuf::Closure* done) {
(void)controller;
brpc::ClosureGuard guard(done);
VLOG(9) << "GetFsStats [request]: " << request->DebugString();
fsManager_->GetFsStats(request, response);
VLOG(9) << "GetFsStats [response]: " << response->DebugString();
}

void MdsServiceImpl::GetFsPerSecondStats(
::google::protobuf::RpcController* controller,
const pb::mds::GetFsPerSecondStatsRequest* request,
pb::mds::GetFsPerSecondStatsResponse* response,
::google::protobuf::Closure* done) {
(void)controller;
brpc::ClosureGuard guard(done);
VLOG(9) << "GetFsStats [request]: " << request->DebugString();
fsManager_->GetFsPerSecondStats(request, response);
VLOG(9) << "GetFsStats [response]: " << response->DebugString();
}

} // namespace mds
} // namespace dingofs
15 changes: 15 additions & 0 deletions dingofs/src/mds/mds_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ class MdsServiceImpl : public pb::mds::MdsService {
pb::mds::CommitTxResponse* response,
::google::protobuf::Closure* done) override;

void SetFsStats(::google::protobuf::RpcController* controller,
const pb::mds::SetFsStatsRequest* request,
pb::mds::SetFsStatsResponse* response,
::google::protobuf::Closure* done);

void GetFsStats(::google::protobuf::RpcController* controller,
const pb::mds::GetFsStatsRequest* request,
pb::mds::GetFsStatsResponse* response,
::google::protobuf::Closure* done);

void GetFsPerSecondStats(::google::protobuf::RpcController* controller,
const pb::mds::GetFsPerSecondStatsRequest* request,
pb::mds::GetFsPerSecondStatsResponse* response,
::google::protobuf::Closure* done);

private:
std::shared_ptr<FsManager> fsManager_;
std::shared_ptr<ChunkIdAllocator> chunkIdAllocator_;
Expand Down
45 changes: 45 additions & 0 deletions dingofs/src/mds/metric/fs_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@

#include "dingofs/src/mds/metric/fs_metric.h"

#include "dingofs/proto/mds.pb.h"
#include "dingofs/src/mds/metric/metric.h"

namespace dingofs {
namespace mds {

using dingofs::pb::mds::FsStatsData;
using dingofs::pb::mds::FSStatusCode;

void FsMetric::OnMount(const std::string& fsname, const Mountpoint& mp) {
std::lock_guard<Mutex> lock(mtx_);

Expand All @@ -48,5 +54,44 @@ void FsMetric::OnUnMount(const std::string& fsname, const Mountpoint& mp) {
iter->second->OnUnMount(mp);
}

void FsMetric::SetFsStats(const std::string& fsname,
const FsStatsData& fs_stats_data) {
std::lock_guard<Mutex> lock(mtx_);

auto iter = fsStatsMetrics_.find(fsname);
if (iter == fsStatsMetrics_.end()) {
auto r = fsStatsMetrics_.emplace(fsname, new FSStatsMetric(fsname));
iter = r.first;
}

iter->second->SetFsStats(fs_stats_data);
}

FSStatusCode FsMetric::GetFsStats(const std::string& fsname,
FsStatsData* fs_stats_data) {
std::lock_guard<Mutex> lock(mtx_);

auto iter = fsStatsMetrics_.find(fsname);
if (iter == fsStatsMetrics_.end()) {
return FSStatusCode::UNKNOWN_ERROR;
}

iter->second->GetFsStats(fs_stats_data);
return FSStatusCode::OK;
}

FSStatusCode FsMetric::GetFsPerSecondStats(const std::string& fsname,
FsStatsData* fs_stats_data) {
std::lock_guard<Mutex> lock(mtx_);

auto iter = fsStatsMetrics_.find(fsname);
if (iter == fsStatsMetrics_.end()) {
return FSStatusCode::UNKNOWN_ERROR;
}

iter->second->GetFsPerSecondStats(fs_stats_data);
return FSStatusCode::OK;
}

} // namespace mds
} // namespace dingofs
9 changes: 9 additions & 0 deletions dingofs/src/mds/metric/fs_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
namespace dingofs {
namespace mds {

using dingofs::pb::mds::FSStatusCode;

class FsMetric {
public:
static FsMetric& GetInstance() {
Expand All @@ -43,6 +45,11 @@ class FsMetric {

void OnMount(const std::string& fsname, const Mountpoint& mp);
void OnUnMount(const std::string& fsname, const Mountpoint& mp);
void SetFsStats(const std::string& fsname, const FsStatsData& fs_stats_data);
FSStatusCode GetFsStats(const std::string& fsname,
FsStatsData* fs_stats_data);
FSStatusCode GetFsPerSecondStats(const std::string& fsname,
FsStatsData* fs_stats_data);

private:
FsMetric() = default;
Expand All @@ -54,6 +61,8 @@ class FsMetric {
private:
Mutex mtx_;
std::unordered_map<std::string, std::unique_ptr<FsMountMetric>> metrics_;
std::unordered_map<std::string, std::unique_ptr<FSStatsMetric>>
fsStatsMetrics_;
};

} // namespace mds
Expand Down
39 changes: 39 additions & 0 deletions dingofs/src/mds/metric/metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,44 @@ std::string FsMountMetric::Key(const Mountpoint& mp) {
std::to_string(mp.port()) + "_" + mp.path();
}

// set fs cluster statistics, fsStatsData contains delta statistics since last
// read
void FSStatsMetric::SetFsStats(const FsStatsData& fs_stats_data) {
readBytes_.count << fs_stats_data.readbytes();
readQps_.count << fs_stats_data.readqps();
writeBytes_.count << fs_stats_data.writebytes();
writeQps_.count << fs_stats_data.writeqps();
s3ReadBytes_.count << fs_stats_data.s3readbytes();
s3ReadQps_.count << fs_stats_data.s3readqps();
s3WriteBytes_.count << fs_stats_data.s3writebytes();
s3WriteQps_.count << fs_stats_data.s3writeqps();
}

// get fs cluster statistics, fsStatsData contains the sum of all the client
// total amount statistics
void FSStatsMetric::GetFsStats(FsStatsData* fs_stats_data) const {
fs_stats_data->set_readbytes(readBytes_.count.get_value());
fs_stats_data->set_readqps(readQps_.count.get_value());
fs_stats_data->set_writebytes(writeBytes_.count.get_value());
fs_stats_data->set_writeqps(writeQps_.count.get_value());
fs_stats_data->set_s3readbytes(s3ReadBytes_.count.get_value());
fs_stats_data->set_s3readqps(s3ReadQps_.count.get_value());
fs_stats_data->set_s3writebytes(s3WriteBytes_.count.get_value());
fs_stats_data->set_s3writeqps(s3WriteQps_.count.get_value());
}

// get fs cluster per second statistics, fsStatsData contains recently
// statistics for per second
void FSStatsMetric::GetFsPerSecondStats(FsStatsData* fs_stats_data) const {
fs_stats_data->set_readbytes(readBytes_.value.get_value());
fs_stats_data->set_readqps(readQps_.value.get_value());
fs_stats_data->set_writebytes(writeBytes_.value.get_value());
fs_stats_data->set_writeqps(writeQps_.value.get_value());
fs_stats_data->set_s3readbytes(s3ReadBytes_.value.get_value());
fs_stats_data->set_s3readqps(s3ReadQps_.value.get_value());
fs_stats_data->set_s3writebytes(s3WriteBytes_.value.get_value());
fs_stats_data->set_s3writeqps(s3WriteQps_.value.get_value());
}

} // namespace mds
} // namespace dingofs
40 changes: 40 additions & 0 deletions dingofs/src/mds/metric/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace dingofs {
namespace mds {

using dingofs::pb::mds::FsStatsData;
using dingofs::pb::mds::Mountpoint;

// Metric for a filesystem
Expand Down Expand Up @@ -70,6 +71,45 @@ class FsMountMetric {
MountPointMetric mps_;
};

// metric stats per second
struct PerSecondMetric {
bvar::Adder<uint64_t> count; // total count
bvar::PerSecond<bvar::Adder<uint64_t>> value; // average count persecond
PerSecondMetric(const std::string& prefix, const std::string& name)
: count(prefix, name + "_total_count"), value(prefix, name, &count, 1) {}
};

class FSStatsMetric {
public:
explicit FSStatsMetric(const std::string& fsname)
: fsname_(fsname),
readBytes_("fs_", fsname + "_read_bytes"),
readQps_("fs_", fsname + "_read_qps"),
writeBytes_("fs_", fsname + "_write_bytes"),
writeQps_("fs_", fsname + "_write_qps"),
s3ReadBytes_("fs_", fsname + "_s3_read_bytes"),
s3ReadQps_("fs_", fsname + "_s3_read_qps"),
s3WriteBytes_("fs_", fsname + "_s3_write_bytes"),
s3WriteQps_("fs_", fsname + "_s3_write_qps") {}

void SetFsStats(const FsStatsData& fs_stats_data);

void GetFsStats(FsStatsData* fs_stats_data) const;

void GetFsPerSecondStats(FsStatsData* fs_stats_data) const;

private:
std::string fsname_;
PerSecondMetric readBytes_;
PerSecondMetric readQps_;
PerSecondMetric writeBytes_;
PerSecondMetric writeQps_;
PerSecondMetric s3ReadBytes_;
PerSecondMetric s3ReadQps_;
PerSecondMetric s3WriteBytes_;
PerSecondMetric s3WriteQps_;
};

} // namespace mds
} // namespace dingofs

Expand Down
35 changes: 35 additions & 0 deletions dingofs/test/mds/fs_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,5 +358,40 @@ TEST_F(FSManagerTest, GetLatestTxId_ParamFsId) {
}
}

TEST_F(FSManagerTest, SetFsStats) {
{
SetFsStatsRequest request;
SetFsStatsResponse response;

FsStatsData fsstatsdata;
fsstatsdata.set_readbytes(8192);
fsstatsdata.set_writebytes(16384);
fsstatsdata.set_readqps(10);
fsstatsdata.set_writeqps(20);
fsstatsdata.set_s3readbytes(8192);
fsstatsdata.set_s3writebytes(16384);
fsstatsdata.set_s3readqps(30);
fsstatsdata.set_s3writeqps(40);

request.set_fsname("dingofs");
request.mutable_fsstatsdata()->CopyFrom(fsstatsdata);

fsManager_->SetFsStats(&request, &response);
ASSERT_EQ(response.statuscode(), FSStatusCode::OK);
}
}

TEST_F(FSManagerTest, GetFsStats) {
{
GetFsStatsRequest request;
GetFsStatsResponse response;

request.set_fsname("dingofs");

fsManager_->GetFsStats(&request, &response);
ASSERT_EQ(response.statuscode(), FSStatusCode::OK);
}
}

} // namespace mds
} // namespace dingofs

0 comments on commit 784caf6

Please sign in to comment.