Skip to content

Commit

Permalink
[GCS] Move resource usage info to gcs resource manager (ray-project#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin authored Dec 25, 2020
1 parent cf9952a commit 25f9f0d
Show file tree
Hide file tree
Showing 25 changed files with 462 additions and 492 deletions.
80 changes: 40 additions & 40 deletions src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,40 +488,6 @@ class NodeInfoAccessor {
const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) = 0;

/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) = 0;

/// Resend resource usage when GCS restarts from a failure.
virtual void AsyncReReportResourceUsage() = 0;

/// Return resources in last report. Used by light heartbeat.
std::shared_ptr<SchedulingResources> &GetLastResourceUsage() {
return last_resource_usage_;
}

/// Get newest resource usage of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) = 0;

/// Subscribe batched state of all nodes from GCS.
///
/// \param subscribe Callback that will be called each time when batch resource usage is
/// updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) = 0;

/// Reestablish subscription.
/// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
Expand Down Expand Up @@ -549,12 +515,6 @@ class NodeInfoAccessor {

protected:
NodeInfoAccessor() = default;

private:
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<SchedulingResources> last_resource_usage_ =
std::make_shared<SchedulingResources>();
};

/// \class NodeResourceInfoAccessor
Expand Down Expand Up @@ -619,8 +579,48 @@ class NodeResourceInfoAccessor {
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;

/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) = 0;

/// Resend resource usage when GCS restarts from a failure.
virtual void AsyncReReportResourceUsage() = 0;

/// Return resources in last report. Used by light heartbeat.
std::shared_ptr<SchedulingResources> &GetLastResourceUsage() {
return last_resource_usage_;
}

/// Get newest resource usage of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) = 0;

/// Subscribe batched state of all nodes from GCS.
///
/// \param subscribe Callback that will be called each time when batch resource usage is
/// updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) = 0;

protected:
NodeResourceInfoAccessor() = default;

private:
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<SchedulingResources> last_resource_usage_ =
std::make_shared<SchedulingResources>();
};

/// \class ErrorInfoAccessor
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ std::string GlobalStateAccessor::GetInternalConfig() {
std::unique_ptr<std::string> GlobalStateAccessor::GetAllResourceUsage() {
std::unique_ptr<std::string> resource_batch_data;
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllResourceUsage(
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncGetAllResourceUsage(
TransformForItemCallback<rpc::ResourceUsageBatchData>(resource_batch_data,
promise)));
promise.get_future().get();
Expand Down
184 changes: 93 additions & 91 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,92 +529,6 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat(
return Status::OK();
}

Status ServiceBasedNodeInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[callback](const Status &status, const rpc::ReportResourceUsageReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
}

void ServiceBasedNodeInfoAccessor::AsyncReReportResourceUsage() {
absl::MutexLock lock(&mutex_);
if (cached_resource_usage_.has_resources()) {
RAY_LOG(INFO) << "Rereport resource usage.";
FillResourceUsageRequest(cached_resource_usage_);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[](const Status &status, const rpc::ReportResourceUsageReply &reply) {});
}
}

void ServiceBasedNodeInfoAccessor::FillResourceUsageRequest(
rpc::ReportResourceUsageRequest &resources) {
if (RayConfig::instance().light_report_resource_usage_enabled()) {
SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage());

auto resources_data = resources.mutable_resources();
resources_data->clear_resources_total();
for (const auto &resource_pair :
cached_resources.GetTotalResources().GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resources_available();
resources_data->set_resources_available_changed(true);
for (const auto &resource_pair :
cached_resources.GetAvailableResources().GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resource_load();
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair :
cached_resources.GetLoadResources().GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
}

Status ServiceBasedNodeInfoAccessor::AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) {
rpc::GetAllResourceUsageRequest request;
client_impl_->GetGcsRpcClient().GetAllResourceUsage(
request,
[callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) {
callback(reply.resource_usage_data());
RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = "
<< status;
});
return Status::OK();
}

Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
subscribe_batch_resource_usage_operation_ = [this,
subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::ResourceUsageBatchData resources_batch_data;
resources_batch_data.ParseFromString(data);
subscribe(resources_batch_data);
};
return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "",
on_subscribe, done);
};
return subscribe_batch_resource_usage_operation_(done);
}

void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
NodeID node_id = NodeID::FromBinary(node_info.node_id());
bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE);
Expand Down Expand Up @@ -680,9 +594,6 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar
fetch_node_data_operation_(fetch_all_done);
}));
}
if (subscribe_batch_resource_usage_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr));
}
} else {
if (fetch_node_data_operation_ != nullptr) {
fetch_node_data_operation_(fetch_all_done);
Expand Down Expand Up @@ -793,6 +704,79 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources(
return Status::OK();
}

Status ServiceBasedNodeResourceInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[callback](const Status &status, const rpc::ReportResourceUsageReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
}

void ServiceBasedNodeResourceInfoAccessor::AsyncReReportResourceUsage() {
absl::MutexLock lock(&mutex_);
if (cached_resource_usage_.has_resources()) {
RAY_LOG(INFO) << "Rereport resource usage.";
FillResourceUsageRequest(cached_resource_usage_);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[](const Status &status, const rpc::ReportResourceUsageReply &reply) {});
}
}

void ServiceBasedNodeResourceInfoAccessor::FillResourceUsageRequest(
rpc::ReportResourceUsageRequest &resources) {
if (RayConfig::instance().light_report_resource_usage_enabled()) {
SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage());

auto resources_data = resources.mutable_resources();
resources_data->clear_resources_total();
for (const auto &resource_pair :
cached_resources.GetTotalResources().GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resources_available();
resources_data->set_resources_available_changed(true);
for (const auto &resource_pair :
cached_resources.GetAvailableResources().GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resource_load();
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair :
cached_resources.GetLoadResources().GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
}

Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
subscribe_batch_resource_usage_operation_ = [this,
subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::ResourceUsageBatchData resources_batch_data;
resources_batch_data.ParseFromString(data);
subscribe(resources_batch_data);
};
return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "",
on_subscribe, done);
};
return subscribe_batch_resource_usage_operation_(done);
}

Status ServiceBasedNodeResourceInfoAccessor::AsyncDeleteResources(
const NodeID &node_id, const std::vector<std::string> &resource_names,
const StatusCallback &callback) {
Expand Down Expand Up @@ -841,11 +825,29 @@ void ServiceBasedNodeResourceInfoAccessor::AsyncResubscribe(
RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info.";
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server.
if (is_pubsub_server_restarted && subscribe_resource_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_resource_operation_(nullptr));
if (is_pubsub_server_restarted) {
if (subscribe_resource_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_resource_operation_(nullptr));
}
if (subscribe_batch_resource_usage_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr));
}
}
}

Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) {
rpc::GetAllResourceUsageRequest request;
client_impl_->GetGcsRpcClient().GetAllResourceUsage(
request,
[callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) {
callback(reply.resource_usage_data());
RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = "
<< status;
});
return Status::OK();
}

ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
Expand Down
39 changes: 23 additions & 16 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
Status AsyncReportHeartbeat(const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;

Status AsyncReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) override;

void AsyncReReportResourceUsage() override;

/// Fill resource fields with cached resources. Used by light resource usage report.
void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage);

Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) override;

Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) override;

void AsyncResubscribe(bool is_pubsub_server_restarted) override;

Status AsyncSetInternalConfig(
Expand All @@ -193,7 +178,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
/// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure.
SubscribeOperation subscribe_node_operation_;
SubscribeOperation subscribe_batch_resource_usage_operation_;

/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
Expand Down Expand Up @@ -250,12 +234,35 @@ class ServiceBasedNodeResourceInfoAccessor : public NodeResourceInfoAccessor {
Status AsyncSubscribeToResources(const ItemCallback<rpc::NodeResourceChange> &subscribe,
const StatusCallback &done) override;

Status AsyncReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) override;

void AsyncReReportResourceUsage() override;

/// Fill resource fields with cached resources. Used by light resource usage report.
void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage);

Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) override;

Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) override;

void AsyncResubscribe(bool is_pubsub_server_restarted) override;

private:
// Mutex to protect the cached_resource_usage_ field.
absl::Mutex mutex_;

/// Save the resource usage data, so we can resend it again when GCS server restarts
/// from a failure.
rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_);

/// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure.
SubscribeOperation subscribe_resource_operation_;
SubscribeOperation subscribe_batch_resource_usage_operation_;

ServiceBasedGcsClient *client_impl_;

Expand Down
Loading

0 comments on commit 25f9f0d

Please sign in to comment.