diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index e1e70d5c5c1fc..4adbc0dddc554 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -488,40 +488,6 @@ class NodeInfoAccessor { const std::shared_ptr &data_ptr, const StatusCallback &callback) = 0; - /// Report resource usage of a node to GCS asynchronously. - /// - /// \param data_ptr The data that will be reported to GCS. - /// \param callback Callback that will be called after report finishes. - /// \return Status - virtual Status AsyncReportResourceUsage( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Resend resource usage when GCS restarts from a failure. - virtual void AsyncReReportResourceUsage() = 0; - - /// Return resources in last report. Used by light heartbeat. - std::shared_ptr &GetLastResourceUsage() { - return last_resource_usage_; - } - - /// Get newest resource usage of all nodes from GCS asynchronously. - /// - /// \param callback Callback that will be called after lookup finishes. - /// \return Status - virtual Status AsyncGetAllResourceUsage( - const ItemCallback &callback) = 0; - - /// Subscribe batched state of all nodes from GCS. - /// - /// \param subscribe Callback that will be called each time when batch resource usage is - /// updated. - /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribeBatchedResourceUsage( - const ItemCallback &subscribe, - const StatusCallback &done) = 0; - /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. /// PubSub server restart will cause GCS server restart. In this case, we need to @@ -549,12 +515,6 @@ class NodeInfoAccessor { protected: NodeInfoAccessor() = default; - - private: - /// Cache which stores resource usage in last report used to check if they are changed. - /// Used by light resource usage report. 
- std::shared_ptr last_resource_usage_ = - std::make_shared(); }; /// \class NodeResourceInfoAccessor @@ -619,8 +579,48 @@ class NodeResourceInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + /// Report resource usage of a node to GCS asynchronously. + /// + /// \param data_ptr The data that will be reported to GCS. + /// \param callback Callback that will be called after report finishes. + /// \return Status + virtual Status AsyncReportResourceUsage( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Resend resource usage when GCS restarts from a failure. + virtual void AsyncReReportResourceUsage() = 0; + + /// Return resources in last report. Used by light heartbeat. + std::shared_ptr &GetLastResourceUsage() { + return last_resource_usage_; + } + + /// Get newest resource usage of all nodes from GCS asynchronously. + /// + /// \param callback Callback that will be called after lookup finishes. + /// \return Status + virtual Status AsyncGetAllResourceUsage( + const ItemCallback &callback) = 0; + + /// Subscribe batched state of all nodes from GCS. + /// + /// \param subscribe Callback that will be called each time when batch resource usage is + /// updated. + /// \param done Callback that will be called when subscription is complete. + /// \return Status + virtual Status AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, + const StatusCallback &done) = 0; + protected: NodeResourceInfoAccessor() = default; + + private: + /// Cache which stores resource usage in last report used to check if they are changed. + /// Used by light resource usage report. 
+ std::shared_ptr last_resource_usage_ = + std::make_shared(); }; /// \class ErrorInfoAccessor diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 5791515bc0d97..947c16b969735 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -178,7 +178,7 @@ std::string GlobalStateAccessor::GetInternalConfig() { std::unique_ptr GlobalStateAccessor::GetAllResourceUsage() { std::unique_ptr resource_batch_data; std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncGetAllResourceUsage( TransformForItemCallback(resource_batch_data, promise))); promise.get_future().get(); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 0e610a68e505d..8a3fd8424942d 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -529,92 +529,6 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::AsyncReportResourceUsage( - const std::shared_ptr &data_ptr, const StatusCallback &callback) { - absl::MutexLock lock(&mutex_); - cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr); - client_impl_->GetGcsRpcClient().ReportResourceUsage( - cached_resource_usage_, - [callback](const Status &status, const rpc::ReportResourceUsageReply &reply) { - if (callback) { - callback(status); - } - }); - return Status::OK(); -} - -void ServiceBasedNodeInfoAccessor::AsyncReReportResourceUsage() { - absl::MutexLock lock(&mutex_); - if (cached_resource_usage_.has_resources()) { - RAY_LOG(INFO) << "Rereport resource usage."; - FillResourceUsageRequest(cached_resource_usage_); - client_impl_->GetGcsRpcClient().ReportResourceUsage( - cached_resource_usage_, - [](const Status &status, const rpc::ReportResourceUsageReply 
&reply) {}); - } -} - -void ServiceBasedNodeInfoAccessor::FillResourceUsageRequest( - rpc::ReportResourceUsageRequest &resources) { - if (RayConfig::instance().light_report_resource_usage_enabled()) { - SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); - - auto resources_data = resources.mutable_resources(); - resources_data->clear_resources_total(); - for (const auto &resource_pair : - cached_resources.GetTotalResources().GetResourceMap()) { - (*resources_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - - resources_data->clear_resources_available(); - resources_data->set_resources_available_changed(true); - for (const auto &resource_pair : - cached_resources.GetAvailableResources().GetResourceMap()) { - (*resources_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - - resources_data->clear_resource_load(); - resources_data->set_resource_load_changed(true); - for (const auto &resource_pair : - cached_resources.GetLoadResources().GetResourceMap()) { - (*resources_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - } -} - -Status ServiceBasedNodeInfoAccessor::AsyncGetAllResourceUsage( - const ItemCallback &callback) { - rpc::GetAllResourceUsageRequest request; - client_impl_->GetGcsRpcClient().GetAllResourceUsage( - request, - [callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) { - callback(reply.resource_usage_data()); - RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = " - << status; - }); - return Status::OK(); -} - -Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage( - const ItemCallback &subscribe, - const StatusCallback &done) { - RAY_CHECK(subscribe != nullptr); - subscribe_batch_resource_usage_operation_ = [this, - subscribe](const StatusCallback &done) { - auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { - 
rpc::ResourceUsageBatchData resources_batch_data; - resources_batch_data.ParseFromString(data); - subscribe(resources_batch_data); - }; - return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "", - on_subscribe, done); - }; - return subscribe_batch_resource_usage_operation_(done); -} - void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { NodeID node_id = NodeID::FromBinary(node_info.node_id()); bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE); @@ -680,9 +594,6 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar fetch_node_data_operation_(fetch_all_done); })); } - if (subscribe_batch_resource_usage_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr)); - } } else { if (fetch_node_data_operation_ != nullptr) { fetch_node_data_operation_(fetch_all_done); @@ -793,6 +704,79 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources( return Status::OK(); } +Status ServiceBasedNodeResourceInfoAccessor::AsyncReportResourceUsage( + const std::shared_ptr &data_ptr, const StatusCallback &callback) { + absl::MutexLock lock(&mutex_); + cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr); + client_impl_->GetGcsRpcClient().ReportResourceUsage( + cached_resource_usage_, + [callback](const Status &status, const rpc::ReportResourceUsageReply &reply) { + if (callback) { + callback(status); + } + }); + return Status::OK(); +} + +void ServiceBasedNodeResourceInfoAccessor::AsyncReReportResourceUsage() { + absl::MutexLock lock(&mutex_); + if (cached_resource_usage_.has_resources()) { + RAY_LOG(INFO) << "Rereport resource usage."; + FillResourceUsageRequest(cached_resource_usage_); + client_impl_->GetGcsRpcClient().ReportResourceUsage( + cached_resource_usage_, + [](const Status &status, const rpc::ReportResourceUsageReply &reply) {}); + } +} + +void ServiceBasedNodeResourceInfoAccessor::FillResourceUsageRequest( + 
rpc::ReportResourceUsageRequest &resources) { + if (RayConfig::instance().light_report_resource_usage_enabled()) { + SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); + + auto resources_data = resources.mutable_resources(); + resources_data->clear_resources_total(); + for (const auto &resource_pair : + cached_resources.GetTotalResources().GetResourceMap()) { + (*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + + resources_data->clear_resources_available(); + resources_data->set_resources_available_changed(true); + for (const auto &resource_pair : + cached_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + + resources_data->clear_resource_load(); + resources_data->set_resource_load_changed(true); + for (const auto &resource_pair : + cached_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + } +} + +Status ServiceBasedNodeResourceInfoAccessor::AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, + const StatusCallback &done) { + RAY_CHECK(subscribe != nullptr); + subscribe_batch_resource_usage_operation_ = [this, + subscribe](const StatusCallback &done) { + auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { + rpc::ResourceUsageBatchData resources_batch_data; + resources_batch_data.ParseFromString(data); + subscribe(resources_batch_data); + }; + return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "", + on_subscribe, done); + }; + return subscribe_batch_resource_usage_operation_(done); +} + Status ServiceBasedNodeResourceInfoAccessor::AsyncDeleteResources( const NodeID &node_id, const std::vector &resource_names, const StatusCallback &callback) { @@ -841,11 +825,29 @@ void 
ServiceBasedNodeResourceInfoAccessor::AsyncResubscribe( RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info."; // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub // server. - if (is_pubsub_server_restarted && subscribe_resource_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_resource_operation_(nullptr)); + if (is_pubsub_server_restarted) { + if (subscribe_resource_operation_ != nullptr) { + RAY_CHECK_OK(subscribe_resource_operation_(nullptr)); + } + if (subscribe_batch_resource_usage_operation_ != nullptr) { + RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr)); + } } } +Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage( + const ItemCallback &callback) { + rpc::GetAllResourceUsageRequest request; + client_impl_->GetGcsRpcClient().GetAllResourceUsage( + request, + [callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) { + callback(reply.resource_usage_data()); + RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = " + << status; + }); + return Status::OK(); +} + ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 167814bb25045..47c763c665328 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -165,21 +165,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; - Status AsyncReportResourceUsage(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - void AsyncReReportResourceUsage() override; - - /// Fill resource fields with cached resources. Used by light resource usage report. 
- void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); - - Status AsyncGetAllResourceUsage( - const ItemCallback &callback) override; - - Status AsyncSubscribeBatchedResourceUsage( - const ItemCallback &subscribe, - const StatusCallback &done) override; - void AsyncResubscribe(bool is_pubsub_server_restarted) override; Status AsyncSetInternalConfig( @@ -193,7 +178,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { /// Save the subscribe operation in this function, so we can call it again when PubSub /// server restarts from a failure. SubscribeOperation subscribe_node_operation_; - SubscribeOperation subscribe_batch_resource_usage_operation_; /// Save the fetch data operation in this function, so we can call it again when GCS /// server restarts from a failure. @@ -250,12 +234,35 @@ class ServiceBasedNodeResourceInfoAccessor : public NodeResourceInfoAccessor { Status AsyncSubscribeToResources(const ItemCallback &subscribe, const StatusCallback &done) override; + Status AsyncReportResourceUsage(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + void AsyncReReportResourceUsage() override; + + /// Fill resource fields with cached resources. Used by light resource usage report. + void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); + + Status AsyncGetAllResourceUsage( + const ItemCallback &callback) override; + + Status AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, + const StatusCallback &done) override; + void AsyncResubscribe(bool is_pubsub_server_restarted) override; private: + // Mutex to protect the cached_resource_usage_ field. + absl::Mutex mutex_; + + /// Save the resource usage data, so we can resend it again when GCS server restarts + /// from a failure. 
+ rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_); + /// Save the subscribe operation in this function, so we can call it again when PubSub /// server restarts from a failure. SubscribeOperation subscribe_resource_operation_; + SubscribeOperation subscribe_batch_resource_usage_operation_; ServiceBasedGcsClient *client_impl_; diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index 900d3e50d30c6..cf9bdd9e4d4ed 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -182,7 +182,7 @@ void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType // following flag is always false. resubscribe_func_(false); // Resend resource usage after reconnected, needed by resource view in GCS. - node_accessor_->AsyncReReportResourceUsage(); + node_resource_accessor_->AsyncReReportResourceUsage(); break; default: RAY_LOG(FATAL) << "Unsupported failure type: " << type; diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 0df0c7763ad03..7af602808fc71 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -204,7 +204,7 @@ TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) { std::promise promise1; auto resources1 = std::make_shared(); resources1->set_node_id(node_table_data->node_id()); - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( resources1, [&promise1](Status status) { promise1.set_value(status.ok()); })); WaitReady(promise1.get_future(), timeout_ms_); @@ -220,7 +220,7 @@ TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) { (*heartbeat2->mutable_resources_total())["GPU"] = 10; heartbeat2->set_resources_available_changed(true); 
(*heartbeat2->mutable_resources_available())["GPU"] = 5; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); })); WaitReady(promise2.get_future(), timeout_ms_); @@ -241,7 +241,7 @@ TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) { heartbeat3->set_node_id(node_table_data->node_id()); (*heartbeat3->mutable_resources_available())["CPU"] = 1; (*heartbeat3->mutable_resources_available())["GPU"] = 6; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( heartbeat3, [&promise3](Status status) { promise3.set_value(status.ok()); })); WaitReady(promise3.get_future(), timeout_ms_); diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index b470598d0dbc6..577f15a0d6b6d 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -332,7 +332,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test { bool SubscribeBatchResourceUsage( const gcs::ItemCallback &subscribe) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncSubscribeBatchedResourceUsage( subscribe, [&promise](Status status) { promise.set_value(status.ok()); })); return WaitReady(promise.get_future(), timeout_ms_); } @@ -346,7 +346,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test { bool ReportResourceUsage(const std::shared_ptr resources) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( resources, [&promise](Status status) { promise.set_value(status.ok()); })); return WaitReady(promise.get_future(), 
timeout_ms_); } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 322b0349fe5a7..3724bac5e94df 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -23,16 +23,9 @@ namespace ray { namespace gcs { ////////////////////////////////////////////////////////////////////////////////////////// -GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service, - std::shared_ptr gcs_pub_sub, +GcsNodeManager::GcsNodeManager(std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage) - : resource_timer_(main_io_service), - light_report_resource_usage_enabled_( - RayConfig::instance().light_report_resource_usage_enabled()), - gcs_pub_sub_(gcs_pub_sub), - gcs_table_storage_(gcs_table_storage) { - SendBatchedResourceUsage(); -} + : gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) {} void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply, @@ -95,38 +88,6 @@ void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; } -void GcsNodeManager::HandleReportResourceUsage( - const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) { - NodeID node_id = NodeID::FromBinary(request.resources().node_id()); - auto resources_data = std::make_shared(); - resources_data->CopyFrom(request.resources()); - - // We use `node_resource_usages_` to filter out the nodes that report resource - // information for the first time. `UpdateNodeResourceUsage` will modify - // `node_resource_usages_`, so we need to do it before `UpdateNodeResourceUsage`. 
- if (!light_report_resource_usage_enabled_ || - node_resource_usages_.count(node_id) == 0 || - resources_data->resources_available_changed()) { - const auto &resource_changed = MapFromProtobuf(resources_data->resources_available()); - for (auto &listener : node_resource_changed_listeners_) { - listener(node_id, resource_changed); - } - } - - UpdateNodeResourceUsage(node_id, request); - - if (!light_report_resource_usage_enabled_ || resources_data->should_global_gc() || - resources_data->resources_total_size() > 0 || - resources_data->resources_available_changed() || - resources_data->resource_load_changed()) { - resources_buffer_[node_id] = *resources_data; - } - - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; -} - void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request, rpc::SetInternalConfigReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -155,78 +116,6 @@ void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest ++counts_[CountType::GET_INTERNAL_CONFIG_REQUEST]; } -void GcsNodeManager::HandleGetAllResourceUsage( - const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) { - if (!node_resource_usages_.empty()) { - auto batch = std::make_shared(); - absl::flat_hash_map aggregate_load; - for (const auto &usage : node_resource_usages_) { - // Aggregate the load reported by each raylet. 
- auto load = usage.second.resource_load_by_shape(); - for (const auto &demand : load.resource_demands()) { - auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape())); - auto &aggregate_demand = aggregate_load[scheduling_key]; - aggregate_demand.set_num_ready_requests_queued( - aggregate_demand.num_ready_requests_queued() + - demand.num_ready_requests_queued()); - aggregate_demand.set_num_infeasible_requests_queued( - aggregate_demand.num_infeasible_requests_queued() + - demand.num_infeasible_requests_queued()); - if (RayConfig::instance().report_worker_backlog()) { - aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() + - demand.backlog_size()); - } - } - - batch->add_batch()->CopyFrom(usage.second); - } - - for (const auto &demand : aggregate_load) { - auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands(); - demand_proto->CopyFrom(demand.second); - for (const auto &resource_pair : demand.first.GetResourceMap()) { - (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; - } - } - - // Update placement group load to heartbeat batch. - // This is updated only one per second. 
- if (placement_group_load_.has_value()) { - auto placement_group_load = placement_group_load_.value(); - auto placement_group_load_proto = batch->mutable_placement_group_load(); - placement_group_load_proto->CopyFrom(*placement_group_load.get()); - } - reply->mutable_resource_usage_data()->CopyFrom(*batch); - } - - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; -} - -void GcsNodeManager::UpdateNodeResourceUsage( - const NodeID node_id, const rpc::ReportResourceUsageRequest &request) { - auto iter = node_resource_usages_.find(node_id); - if (!light_report_resource_usage_enabled_ || iter == node_resource_usages_.end()) { - auto resources_data = std::make_shared(); - resources_data->CopyFrom(request.resources()); - node_resource_usages_[node_id] = *resources_data; - } else { - if (request.resources().resources_total_size() > 0) { - (*iter->second.mutable_resources_total()) = request.resources().resources_total(); - } - if (request.resources().resources_available_changed()) { - (*iter->second.mutable_resources_available()) = - request.resources().resources_available(); - } - if (request.resources().resource_load_changed()) { - (*iter->second.mutable_resource_load()) = request.resources().resource_load(); - } - (*iter->second.mutable_resource_load_by_shape()) = - request.resources().resource_load_by_shape(); - } -} - absl::optional> GcsNodeManager::GetAliveNode( const ray::NodeID &node_id) const { auto iter = alive_nodes_.find(node_id); @@ -261,8 +150,6 @@ std::shared_ptr GcsNodeManager::RemoveNode( stats::NodeFailureTotal.Record(1); // Remove from alive nodes. alive_nodes_.erase(iter); - resources_buffer_.erase(node_id); - node_resource_usages_.erase(node_id); if (!is_intended) { // Broadcast a warning to all of the drivers indicating that the node // has been marked as dead. 
@@ -317,11 +204,6 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) { const std::pair &right) { return left.second < right.second; }); } -void GcsNodeManager::UpdatePlacementGroupLoad( - const std::shared_ptr placement_group_load) { - placement_group_load_ = absl::make_optional(placement_group_load); -} - void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr node) { if (dead_nodes_.size() >= RayConfig::instance().maximum_gcs_dead_node_cached_count()) { const auto &node_id = sorted_dead_node_list_.begin()->first; @@ -334,34 +216,6 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr node) sorted_dead_node_list_.emplace_back(node_id, node->timestamp()); } -void GcsNodeManager::SendBatchedResourceUsage() { - if (!resources_buffer_.empty()) { - auto batch = std::make_shared(); - for (auto &resources : resources_buffer_) { - batch->add_batch()->Swap(&resources.second); - } - stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0)); - - RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "", - batch->SerializeAsString(), nullptr)); - resources_buffer_.clear(); - } - - auto resources_period = boost::posix_time::milliseconds( - RayConfig::instance().raylet_report_resources_period_milliseconds()); - resource_timer_.expires_from_now(resources_period); - resource_timer_.async_wait([this](const boost::system::error_code &error) { - if (error == boost::asio::error::operation_aborted) { - // `operation_aborted` is set when `resource_timer_` is canceled or destroyed. - // The Monitor lifetime may be short than the object who use it. (e.g. 
gcs_server) - return; - } - RAY_CHECK(!error) << "Sending batched resource usage failed with error: " - << error.message(); - SendBatchedResourceUsage(); - }); -} - std::string GcsNodeManager::DebugString() const { std::ostringstream stream; stream << "GcsNodeManager: {RegisterNode request count: " @@ -370,10 +224,6 @@ std::string GcsNodeManager::DebugString() const { << counts_[CountType::UNREGISTER_NODE_REQUEST] << ", GetAllNodeInfo request count: " << counts_[CountType::GET_ALL_NODE_INFO_REQUEST] - << ", ReportResourceUsage request count: " - << counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST] - << ", GetAllResourceUsage request count: " - << counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST] << ", SetInternalConfig request count: " << counts_[CountType::SET_INTERNAL_CONFIG_REQUEST] << ", GetInternalConfig request count: " diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 8b99eaa13cf46..cc9da799dd3af 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -36,11 +36,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { public: /// Create a GcsNodeManager. /// - /// \param main_io_service The main event loop. /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - explicit GcsNodeManager(boost::asio::io_service &main_io_service, - std::shared_ptr gcs_pub_sub, + explicit GcsNodeManager(std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage); /// Handle register rpc request come from raylet. @@ -58,16 +56,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle report resource usage rpc come from raylet. 
- void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request, - rpc::ReportResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - - /// Handle get all resource usage rpc request. - void HandleGetAllResourceUsage(const rpc::GetAllResourceUsageRequest &request, - rpc::GetAllResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Handle set internal config. void HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request, rpc::SetInternalConfigReply *reply, @@ -78,13 +66,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::GetInternalConfigReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Update resource usage of given node. - /// - /// \param node_id Node id. - /// \param request Request containing resource usage. - void UpdateNodeResourceUsage(const NodeID node_id, - const rpc::ReportResourceUsageRequest &request); - void OnNodeFailure(const NodeID &node_id); /// Add an alive node. @@ -133,29 +114,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler { node_added_listeners_.emplace_back(std::move(listener)); } - /// Add listener to monitor the resource change of nodes. - /// - /// \param listener The handler which process the resource change of nodes. - void AddNodeResourceChangedListener( - std::function &)> - listener) { - RAY_CHECK(listener); - node_resource_changed_listeners_.emplace_back(std::move(listener)); - } - /// Initialize with the gcs tables data synchronously. /// This should be called when GCS server restarts after a failure. /// /// \param gcs_init_data. void Initialize(const GcsInitData &gcs_init_data); - /// Update the placement group load information so that it will be reported through - /// heartbeat. - /// - /// \param placement_group_load placement group load protobuf. 
- void UpdatePlacementGroupLoad( - const std::shared_ptr placement_group_load); - std::string DebugString() const; private: @@ -165,13 +129,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param node The node which is dead. void AddDeadNodeToCache(std::shared_ptr node); - /// Send any buffered resource usage as a single publish. - void SendBatchedResourceUsage(); - - /// A timer that ticks every raylet_report_resources_period_milliseconds. - boost::asio::deadline_timer resource_timer_; - // Only the changed part will be reported if this is true. - const bool light_report_resource_usage_enabled_; /// Alive nodes. absl::flat_hash_map> alive_nodes_; /// Dead nodes. @@ -179,37 +136,25 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// The nodes are sorted according to the timestamp, and the oldest is at the head of /// the list. std::list> sorted_dead_node_list_; - /// Newest resource usage of all nodes. - absl::flat_hash_map node_resource_usages_; - /// A buffer containing resource usage received from node managers in the last tick. - absl::flat_hash_map resources_buffer_; /// Listeners which monitors the addition of nodes. std::vector)>> node_added_listeners_; /// Listeners which monitors the removal of nodes. std::vector)>> node_removed_listeners_; - /// Listeners which monitors the resource change of nodes. - std::vector &)>> - node_resource_changed_listeners_; /// A publisher for publishing gcs messages. std::shared_ptr gcs_pub_sub_; /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; - /// Placement group load information that is used for autoscaler. - absl::optional> placement_group_load_; // Debug info. 
enum CountType { REGISTER_NODE_REQUEST = 0, UNREGISTER_NODE_REQUEST = 1, GET_ALL_NODE_INFO_REQUEST = 2, - REPORT_RESOURCE_USAGE_REQUEST = 3, - GET_ALL_RESOURCE_USAGE_REQUEST = 4, - SET_INTERNAL_CONFIG_REQUEST = 5, - GET_INTERNAL_CONFIG_REQUEST = 6, - CountType_MAX = 7, + SET_INTERNAL_CONFIG_REQUEST = 3, + GET_INTERNAL_CONFIG_REQUEST = 4, + CountType_MAX = 5, }; uint64_t counts_[CountType::CountType_MAX] = {0}; }; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 5ac3d0510f2b1..a77a32db2efac 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -107,11 +107,11 @@ GcsPlacementGroupManager::GcsPlacementGroupManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, std::shared_ptr gcs_table_storage, - GcsNodeManager &gcs_node_manager) + GcsResourceManager &gcs_resource_manager) : io_context_(io_context), gcs_placement_group_scheduler_(std::move(scheduler)), gcs_table_storage_(std::move(gcs_table_storage)), - gcs_node_manager_(gcs_node_manager) { + gcs_resource_manager_(gcs_resource_manager) { Tick(); } @@ -537,7 +537,7 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() { break; } } - gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load)); + gcs_resource_manager_.UpdatePlacementGroupLoad(move(placement_group_load)); } void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 17a4f5c1100f9..8bd36941745f2 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -130,12 +130,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// \param io_context The event loop to run the monitor on. 
/// \param scheduler Used to schedule placement group creation tasks. /// \param gcs_table_storage Used to flush placement group data to storage. - /// \param gcs_node_manager Reference of GcsNodeManager. + /// \param gcs_resource_manager Reference of GcsResourceManager. explicit GcsPlacementGroupManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, std::shared_ptr gcs_table_storage, - GcsNodeManager &gcs_node_manager); + GcsResourceManager &gcs_resource_manager); ~GcsPlacementGroupManager() = default; @@ -308,8 +308,8 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// We should probably support concurrenet creation (or batching). PlacementGroupID scheduling_in_progress_id_ = PlacementGroupID::Nil(); - /// Reference of GcsNodeManager. - GcsNodeManager &gcs_node_manager_; + /// Reference of GcsResourceManager. + GcsResourceManager &gcs_resource_manager_; // Debug info. enum CountType { diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index f0b3be06c3c23..483b915a9df44 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -13,14 +13,22 @@ // limitations under the License. 
#include "ray/gcs/gcs_server/gcs_resource_manager.h" +#include "ray/common/ray_config.h" +#include "ray/stats/stats.h" namespace ray { namespace gcs { GcsResourceManager::GcsResourceManager( - std::shared_ptr gcs_pub_sub, + boost::asio::io_service &main_io_service, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage) - : gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) {} + : resource_timer_(main_io_service), + light_report_resource_usage_enabled_( + RayConfig::instance().light_report_resource_usage_enabled()), + gcs_pub_sub_(gcs_pub_sub), + gcs_table_storage_(gcs_table_storage) { + SendBatchedResourceUsage(); +} void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request, rpc::GetResourcesReply *reply, @@ -153,6 +161,108 @@ void GcsResourceManager::HandleGetAllAvailableResources( ++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST]; } +void GcsResourceManager::HandleReportResourceUsage( + const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply, + rpc::SendReplyCallback send_reply_callback) { + NodeID node_id = NodeID::FromBinary(request.resources().node_id()); + auto resources_data = std::make_shared(); + resources_data->CopyFrom(request.resources()); + + // We use `node_resource_usages_` to filter out the nodes that report resource + // information for the first time. `UpdateNodeResourceUsage` will modify + // `node_resource_usages_`, so we need to do it before `UpdateNodeResourceUsage`. 
+ if (!light_report_resource_usage_enabled_ || + node_resource_usages_.count(node_id) == 0 || + resources_data->resources_available_changed()) { + const auto &resource_changed = MapFromProtobuf(resources_data->resources_available()); + SetAvailableResources(node_id, ResourceSet(resource_changed)); + } + + UpdateNodeResourceUsage(node_id, request); + + if (!light_report_resource_usage_enabled_ || resources_data->should_global_gc() || + resources_data->resources_total_size() > 0 || + resources_data->resources_available_changed() || + resources_data->resource_load_changed()) { + resources_buffer_[node_id] = *resources_data; + } + + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; +} + +void GcsResourceManager::HandleGetAllResourceUsage( + const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (!node_resource_usages_.empty()) { + auto batch = std::make_shared(); + absl::flat_hash_map aggregate_load; + for (const auto &usage : node_resource_usages_) { + // Aggregate the load reported by each raylet. 
+ auto load = usage.second.resource_load_by_shape(); + for (const auto &demand : load.resource_demands()) { + auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape())); + auto &aggregate_demand = aggregate_load[scheduling_key]; + aggregate_demand.set_num_ready_requests_queued( + aggregate_demand.num_ready_requests_queued() + + demand.num_ready_requests_queued()); + aggregate_demand.set_num_infeasible_requests_queued( + aggregate_demand.num_infeasible_requests_queued() + + demand.num_infeasible_requests_queued()); + if (RayConfig::instance().report_worker_backlog()) { + aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() + + demand.backlog_size()); + } + } + + batch->add_batch()->CopyFrom(usage.second); + } + + for (const auto &demand : aggregate_load) { + auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands(); + demand_proto->CopyFrom(demand.second); + for (const auto &resource_pair : demand.first.GetResourceMap()) { + (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; + } + } + + // Update placement group load to heartbeat batch. + // This is updated only once per second. 
+ if (placement_group_load_.has_value()) { + auto placement_group_load = placement_group_load_.value(); + auto placement_group_load_proto = batch->mutable_placement_group_load(); + placement_group_load_proto->CopyFrom(*placement_group_load.get()); + } + reply->mutable_resource_usage_data()->CopyFrom(*batch); + } + + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; +} + +void GcsResourceManager::UpdateNodeResourceUsage( + const NodeID node_id, const rpc::ReportResourceUsageRequest &request) { + auto iter = node_resource_usages_.find(node_id); + if (!light_report_resource_usage_enabled_ || iter == node_resource_usages_.end()) { + auto resources_data = std::make_shared(); + resources_data->CopyFrom(request.resources()); + node_resource_usages_[node_id] = *resources_data; + } else { + if (request.resources().resources_total_size() > 0) { + (*iter->second.mutable_resources_total()) = request.resources().resources_total(); + } + if (request.resources().resources_available_changed()) { + (*iter->second.mutable_resources_available()) = + request.resources().resources_available(); + } + if (request.resources().resource_load_changed()) { + (*iter->second.mutable_resource_load()) = request.resources().resource_load(); + } + (*iter->second.mutable_resource_load_by_shape()) = + request.resources().resource_load_by_shape(); + } +} + void GcsResourceManager::Initialize(const GcsInitData &gcs_init_data) { const auto &nodes = gcs_init_data.Nodes(); for (const auto &entry : nodes) { @@ -216,6 +326,8 @@ void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) { } void GcsResourceManager::OnNodeDead(const NodeID &node_id) { + resources_buffer_.erase(node_id); + node_resource_usages_.erase(node_id); cluster_scheduling_resources_.erase(node_id); } @@ -244,6 +356,39 @@ bool GcsResourceManager::ReleaseResources(const NodeID &node_id, return true; } +void GcsResourceManager::SendBatchedResourceUsage() { + if 
(!resources_buffer_.empty()) { + auto batch = std::make_shared(); + for (auto &resources : resources_buffer_) { + batch->add_batch()->Swap(&resources.second); + } + stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0)); + + RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "", + batch->SerializeAsString(), nullptr)); + resources_buffer_.clear(); + } + + auto resources_period = boost::posix_time::milliseconds( + RayConfig::instance().raylet_report_resources_period_milliseconds()); + resource_timer_.expires_from_now(resources_period); + resource_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + // `operation_aborted` is set when `resource_timer_` is canceled or destroyed. + // The Monitor lifetime may be shorter than the object that uses it. (e.g. gcs_server) + return; + } + RAY_CHECK(!error) << "Sending batched resource usage failed with error: " + << error.message(); + SendBatchedResourceUsage(); + }); +} + +void GcsResourceManager::UpdatePlacementGroupLoad( + const std::shared_ptr placement_group_load) { + placement_group_load_ = absl::make_optional(placement_group_load); +} + std::string GcsResourceManager::DebugString() const { std::ostringstream stream; stream << "GcsResourceManager: {GetResources request count: " @@ -253,7 +398,11 @@ std::string GcsResourceManager::DebugString() const { << ", UpdateResources request count: " << counts_[CountType::UPDATE_RESOURCES_REQUEST] << ", DeleteResources request count: " - << counts_[CountType::DELETE_RESOURCES_REQUEST] << "}"; + << counts_[CountType::DELETE_RESOURCES_REQUEST] + << ", ReportResourceUsage request count: " + << counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST] + << ", GetAllResourceUsage request count: " + << counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST] << "}"; return stream.str(); } diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index 
095e0c234dbe4..f606c9823a6d5 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -37,9 +37,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { public: /// Create a GcsResourceManager. /// + /// \param main_io_service The main event loop. /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - explicit GcsResourceManager(std::shared_ptr gcs_pub_sub, + explicit GcsResourceManager(boost::asio::io_service &main_io_service, + std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage); virtual ~GcsResourceManager() {} @@ -65,6 +67,16 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { rpc::GetAllAvailableResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle report resource usage rpc come from raylet. + void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request, + rpc::ReportResourceUsageReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Handle get all resource usage rpc request. + void HandleGetAllResourceUsage(const rpc::GetAllResourceUsageRequest &request, + rpc::GetAllResourceUsageReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Get the resources of all nodes in the cluster. /// /// \return The resources of all nodes in the cluster. @@ -118,6 +130,20 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { const NodeID &node_id, const std::unordered_map &changed_resources); + /// Update resource usage of given node. + /// + /// \param node_id Node id. + /// \param request Request containing resource usage. + void UpdateNodeResourceUsage(const NodeID node_id, + const rpc::ReportResourceUsageRequest &request); + + /// Update the placement group load information so that it will be reported through + /// heartbeat. + /// + /// \param placement_group_load placement group load protobuf. 
+ void UpdatePlacementGroupLoad( + const std::shared_ptr placement_group_load); + private: /// Delete the scheduling resources of the specified node. /// @@ -126,12 +152,26 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { void DeleteResources(const NodeID &node_id, const std::vector &deleted_resources); + /// Send any buffered resource usage as a single publish. + void SendBatchedResourceUsage(); + + /// A timer that ticks every raylet_report_resources_period_milliseconds. + boost::asio::deadline_timer resource_timer_; + // Only the changed part will be reported if this is true. + const bool light_report_resource_usage_enabled_; + /// Newest resource usage of all nodes. + absl::flat_hash_map node_resource_usages_; + /// A buffer containing resource usage received from node managers in the last tick. + absl::flat_hash_map resources_buffer_; + /// A publisher for publishing gcs messages. std::shared_ptr gcs_pub_sub_; /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; /// Map from node id to the scheduling resources of the node. absl::flat_hash_map cluster_scheduling_resources_; + /// Placement group load information that is used for autoscaler. + absl::optional> placement_group_load_; /// Debug info. 
enum CountType { @@ -139,7 +179,9 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { UPDATE_RESOURCES_REQUEST = 1, DELETE_RESOURCES_REQUEST = 2, GET_ALL_AVAILABLE_RESOURCES_REQUEST = 3, - CountType_MAX = 4, + REPORT_RESOURCE_USAGE_REQUEST = 4, + GET_ALL_RESOURCE_USAGE_REQUEST = 5, + CountType_MAX = 6, }; uint64_t counts_[CountType::CountType_MAX] = {0}; }; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 672c593dfafd3..aacbe29261340 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -132,8 +132,7 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(redis_client_ && gcs_table_storage_ && gcs_pub_sub_); - gcs_node_manager_ = - std::make_shared(main_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_node_manager_ = std::make_shared(gcs_pub_sub_, gcs_table_storage_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. @@ -160,8 +159,8 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) { void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); - gcs_resource_manager_ = - std::make_shared(gcs_pub_sub_, gcs_table_storage_); + gcs_resource_manager_ = std::make_shared( + main_service_, gcs_pub_sub_, gcs_table_storage_); // Initialize by gcs tables data. gcs_resource_manager_->Initialize(gcs_init_data); // Register service. @@ -223,7 +222,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { raylet_client_pool_); gcs_placement_group_manager_ = std::make_shared( - main_service_, scheduler, gcs_table_storage_, *gcs_node_manager_); + main_service_, scheduler, gcs_table_storage_, *gcs_resource_manager_); // Initialize by gcs tables data. gcs_placement_group_manager_->Initialize(gcs_init_data); // Register service. 
@@ -306,12 +305,6 @@ void GcsServer::InstallEventListeners() { gcs_actor_manager_->OnNodeDead(node_id); raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id())); }); - gcs_node_manager_->AddNodeResourceChangedListener( - [this](const NodeID &node_id, - const std::unordered_map &resource_changed) { - gcs_resource_manager_->SetAvailableResources(node_id, - ResourceSet(resource_changed)); - }); // Install worker event listener. gcs_worker_manager_->AddWorkerDeadListener( diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 7bb1ca716f440..fffef0db79065 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -26,9 +26,8 @@ class GcsActorSchedulerTest : public ::testing::Test { worker_client_ = std::make_shared(); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(redis_client_); - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); - gcs_node_manager_ = std::make_shared(io_service_, gcs_pub_sub_, - gcs_table_storage_); + gcs_node_manager_ = + std::make_shared(gcs_pub_sub_, gcs_table_storage_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); @@ -55,7 +54,6 @@ class GcsActorSchedulerTest : public ::testing::Test { std::shared_ptr gcs_actor_table_; std::shared_ptr raylet_client_; std::shared_ptr worker_client_; - std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_actor_scheduler_; std::vector> success_actors_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index 25f80733a4798..b85b594e9a464 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -23,65 +23,29 @@ class GcsNodeManagerTest : public ::testing::Test { public: 
GcsNodeManagerTest() { gcs_pub_sub_ = std::make_shared(redis_client_); - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); } protected: std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_resource_manager_; }; TEST_F(GcsNodeManagerTest, TestManagement) { - boost::asio::io_service io_service; - gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); - { - rpc::GetAllResourceUsageRequest request; - rpc::GetAllResourceUsageReply reply; - auto send_reply_callback = [](ray::Status status, std::function f1, - std::function f2) {}; - node_manager.HandleGetAllResourceUsage(request, &reply, send_reply_callback); - ASSERT_EQ(reply.resource_usage_data().batch().size(), 0); - } - node_manager.AddNode(node); ASSERT_EQ(node, node_manager.GetAliveNode(node_id).value()); - rpc::ReportResourceUsageRequest report_request; - (*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2; - (*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2; - node_manager.UpdateNodeResourceUsage(node_id, report_request); - - { - rpc::GetAllResourceUsageRequest request; - rpc::GetAllResourceUsageReply reply; - auto send_reply_callback = [](ray::Status status, std::function f1, - std::function f2) {}; - node_manager.HandleGetAllResourceUsage(request, &reply, send_reply_callback); - ASSERT_EQ(reply.resource_usage_data().batch().size(), 1); - } - node_manager.RemoveNode(node_id); ASSERT_TRUE(!node_manager.GetAliveNode(node_id).has_value()); - - { - rpc::GetAllResourceUsageRequest request; - rpc::GetAllResourceUsageReply reply; - auto send_reply_callback = [](ray::Status status, std::function f1, - std::function f2) {}; - node_manager.HandleGetAllResourceUsage(request, 
&reply, send_reply_callback); - ASSERT_EQ(reply.resource_usage_data().batch().size(), 0); - } } TEST_F(GcsNodeManagerTest, TestListener) { - boost::asio::io_service io_service; - gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index f6842d287c7e9..40b695d22b0e1 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -54,9 +54,8 @@ class GcsObjectManagerTest : public ::testing::Test { public: void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); - gcs_node_manager_ = std::make_shared(io_service_, gcs_pub_sub_, - gcs_table_storage_); + gcs_node_manager_ = + std::make_shared(gcs_pub_sub_, gcs_table_storage_); gcs_object_manager_ = std::make_shared( gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); GenTestData(); @@ -84,7 +83,6 @@ class GcsObjectManagerTest : public ::testing::Test { protected: boost::asio::io_service io_service_; - std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_pub_sub_; std::shared_ptr gcs_object_manager_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 70bfdce31fd3e..fec3f25404011 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -68,12 +68,11 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { : mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = 
std::make_shared(io_service_); - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); - gcs_node_manager_ = std::make_shared(io_service_, gcs_pub_sub_, - gcs_table_storage_); + gcs_resource_manager_ = + std::make_shared(io_service_, nullptr, nullptr); gcs_placement_group_manager_.reset( new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_, - gcs_table_storage_, *gcs_node_manager_)); + gcs_table_storage_, *gcs_resource_manager_)); } void SetUp() override { @@ -105,7 +104,6 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { boost::asio::io_service io_service_; std::shared_ptr gcs_table_storage_; std::shared_ptr gcs_resource_manager_; - std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; }; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 6a0a5839b1921..0703d949776d7 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -39,9 +39,10 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { } gcs_table_storage_ = std::make_shared(io_service_); gcs_pub_sub_ = std::make_shared(redis_client_); - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); - gcs_node_manager_ = std::make_shared(io_service_, gcs_pub_sub_, - gcs_table_storage_); + gcs_resource_manager_ = + std::make_shared(io_service_, nullptr, nullptr); + gcs_node_manager_ = + std::make_shared(gcs_pub_sub_, gcs_table_storage_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); raylet_client_pool_ = std::make_shared( diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc index 9f732e0dd5125..c7f83691b6f9b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc +++ 
b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc @@ -25,9 +25,11 @@ using ::testing::_; class GcsResourceManagerTest : public ::testing::Test { public: GcsResourceManagerTest() { - gcs_resource_manager_ = std::make_shared(nullptr, nullptr); + gcs_resource_manager_ = + std::make_shared(io_service_, nullptr, nullptr); } + boost::asio::io_service io_service_; std::shared_ptr gcs_resource_manager_; }; @@ -55,6 +57,33 @@ TEST_F(GcsResourceManagerTest, TestBasic) { ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, resource_set)); } +TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) { + auto node = Mocker::GenNodeInfo(); + auto node_id = NodeID::FromBinary(node->node_id()); + rpc::GetAllResourceUsageRequest get_all_request; + rpc::GetAllResourceUsageReply get_all_reply; + auto send_reply_callback = [](ray::Status status, std::function f1, + std::function f2) {}; + gcs_resource_manager_->HandleGetAllResourceUsage(get_all_request, &get_all_reply, + send_reply_callback); + ASSERT_EQ(get_all_reply.resource_usage_data().batch().size(), 0); + + rpc::ReportResourceUsageRequest report_request; + (*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2; + (*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2; + gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request); + + gcs_resource_manager_->HandleGetAllResourceUsage(get_all_request, &get_all_reply, + send_reply_callback); + ASSERT_EQ(get_all_reply.resource_usage_data().batch().size(), 1); + + gcs_resource_manager_->OnNodeDead(node_id); + rpc::GetAllResourceUsageReply get_all_reply2; + gcs_resource_manager_->HandleGetAllResourceUsage(get_all_request, &get_all_reply2, + send_reply_callback); + ASSERT_EQ(get_all_reply2.resource_usage_data().batch().size(), 0); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 
093b7c4626e90..ede33b395f37d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -371,12 +371,6 @@ struct GcsServerMocker { return Status::NotImplemented(""); } - Status AsyncSubscribeBatchedResourceUsage( - const gcs::ItemCallback &subscribe, - const gcs::StatusCallback &done) override { - return Status::NotImplemented(""); - } - void AsyncResubscribe(bool is_pubsub_server_restarted) override {} }; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 8bba86e56e05c..612105afd6f28 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -164,22 +164,6 @@ message ReportHeartbeatReply { GcsStatus status = 1; } -message ReportResourceUsageRequest { - ResourcesData resources = 1; -} - -message ReportResourceUsageReply { - GcsStatus status = 1; -} - -message GetAllResourceUsageRequest { -} - -message GetAllResourceUsageReply { - GcsStatus status = 1; - ResourceUsageBatchData resource_usage_data = 2; -} - message SetInternalConfigRequest { StoredConfig config = 1; } @@ -204,10 +188,6 @@ service NodeInfoGcsService { rpc UnregisterNode(UnregisterNodeRequest) returns (UnregisterNodeReply); // Get information of all nodes from GCS Service. rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply); - // Report resource usage of a node to GCS Service. - rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); - // Get resource usage of all nodes from GCS Service. - rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); // Set cluster internal config. rpc SetInternalConfig(SetInternalConfigRequest) returns (SetInternalConfigReply); // Get cluster internal config. 
@@ -249,6 +229,22 @@ message GetAllAvailableResourcesReply { repeated AvailableResources resources_list = 2; } +message ReportResourceUsageRequest { + ResourcesData resources = 1; +} + +message ReportResourceUsageReply { + GcsStatus status = 1; +} + +message GetAllResourceUsageRequest { +} + +message GetAllResourceUsageReply { + GcsStatus status = 1; + ResourceUsageBatchData resource_usage_data = 2; +} + // Service for node resource info access. service NodeResourceInfoGcsService { // Get node's resources from GCS Service. @@ -260,6 +256,10 @@ service NodeResourceInfoGcsService { // Get available resources of all nodes. rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) returns (GetAllAvailableResourcesReply); + // Report resource usage of a node to GCS Service. + rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); + // Get resource usage of all nodes from GCS Service. + rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); } // Service for heartbeat info access. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bcfc4e32f02ec..6fba62e436f4d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -288,7 +288,7 @@ ray::Status NodeManager::RegisterGcs() { [this](const ResourceUsageBatchData &resource_usage_batch) { ResourceUsageBatchAdded(resource_usage_batch); }; - RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage( + RAY_RETURN_NOT_OK(gcs_client_->NodeResources().AsyncSubscribeBatchedResourceUsage( resource_usage_batch_added, /*done*/ nullptr)); // Subscribe to all unexpected failure notifications from the local and @@ -448,7 +448,7 @@ void NodeManager::ReportResourceUsage() { // Update local chche from gcs remote cache, this is needed when gcs restart. // We should always keep the cache view consistent. 
new_resource_scheduler_->UpdateLastReportResourcesFromGcs( - gcs_client_->Nodes().GetLastResourceUsage()); + gcs_client_->NodeResources().GetLastResourceUsage()); new_resource_scheduler_->FillResourceUsage(light_report_resource_usage_enabled_, resources_data); cluster_task_manager_->FillResourceUsage(light_report_resource_usage_enabled_, @@ -460,7 +460,7 @@ void NodeManager::ReportResourceUsage() { // If light resource usage report enabled, we only set filed that represent resources // changed. if (light_report_resource_usage_enabled_) { - auto last_heartbeat_resources = gcs_client_->Nodes().GetLastResourceUsage(); + auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); if (!last_heartbeat_resources->GetTotalResources().IsEqual( local_resources.GetTotalResources())) { for (const auto &resource_pair : @@ -546,8 +546,8 @@ void NodeManager::ReportResourceUsage() { if (resources_data->resources_total_size() > 0 || resources_data->resources_available_changed() || resources_data->resource_load_changed() || resources_data->should_global_gc()) { - RAY_CHECK_OK( - gcs_client_->Nodes().AsyncReportResourceUsage(resources_data, /*done*/ nullptr)); + RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(resources_data, + /*done*/ nullptr)); } // Reset the timer. diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 39641358f102d..fc3acad2439d1 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -161,14 +161,6 @@ class GcsRpcClient { /// Get information of all nodes from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, ) - /// Report resource usage of a node to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportResourceUsage, - node_info_grpc_client_, ) - - /// Get resource usage of all nodes from GCS Service. 
- VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllResourceUsage, - node_info_grpc_client_, ) - /// Set internal config of the cluster in the GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, SetInternalConfig, node_info_grpc_client_, ) @@ -193,6 +185,14 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, GetAllAvailableResources, node_resource_info_grpc_client_, ) + /// Report resource usage of a node to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, ReportResourceUsage, + node_resource_info_grpc_client_, ) + + /// Get resource usage of all nodes from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, GetAllResourceUsage, + node_resource_info_grpc_client_, ) + /// Report heartbeat of a node to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(HeartbeatInfoGcsService, ReportHeartbeat, heartbeat_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index a39323e40ccd0..52c5dc74b2a1c 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -183,14 +183,6 @@ class NodeInfoGcsServiceHandler { GetAllNodeInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleReportResourceUsage(const ReportResourceUsageRequest &request, - ReportResourceUsageReply *reply, - SendReplyCallback send_reply_callback) = 0; - - virtual void HandleGetAllResourceUsage(const GetAllResourceUsageRequest &request, - GetAllResourceUsageReply *reply, - SendReplyCallback send_reply_callback) = 0; - virtual void HandleSetInternalConfig(const SetInternalConfigRequest &request, SetInternalConfigReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -219,8 +211,6 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode); NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode); NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo); - 
NODE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); - NODE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); NODE_INFO_SERVICE_RPC_HANDLER(SetInternalConfig); NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig); } @@ -252,6 +242,14 @@ class NodeResourceInfoGcsServiceHandler { const rpc::GetAllAvailableResourcesRequest &request, rpc::GetAllAvailableResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) = 0; + + virtual void HandleReportResourceUsage(const ReportResourceUsageRequest &request, + ReportResourceUsageReply *reply, + SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetAllResourceUsage(const GetAllResourceUsageRequest &request, + GetAllResourceUsageReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeResourceInfoGcsService`. @@ -274,6 +272,8 @@ class NodeResourceInfoGrpcService : public GrpcService { NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(UpdateResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(DeleteResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources); + NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); + NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); } private: