Skip to content

Commit

Permalink
[Core] Remote outdated external store (ray-project#13080)
Browse files Browse the repository at this point in the history
* remove outdated external store
  • Loading branch information
suquark authored Dec 25, 2020
1 parent bf7f6a7 commit cf9952a
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 291 deletions.
3 changes: 0 additions & 3 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ cc_library(
"src/ray/object_manager/plasma/client.h",
"src/ray/object_manager/plasma/common.h",
"src/ray/object_manager/plasma/compat.h",
"src/ray/object_manager/plasma/external_store.h",
"src/ray/object_manager/plasma/connection.h",
"src/ray/object_manager/plasma/malloc.h",
"src/ray/object_manager/plasma/plasma.h",
Expand Down Expand Up @@ -287,7 +286,6 @@ cc_library(
"src/ray/object_manager/plasma/create_request_queue.cc",
"src/ray/object_manager/plasma/dlmalloc.cc",
"src/ray/object_manager/plasma/eviction_policy.cc",
"src/ray/object_manager/plasma/external_store.cc",
"src/ray/object_manager/plasma/plasma_allocator.cc",
"src/ray/object_manager/plasma/quota_aware_policy.cc",
"src/ray/object_manager/plasma/store.cc",
Expand All @@ -297,7 +295,6 @@ cc_library(
"src/ray/object_manager/common.h",
"src/ray/object_manager/plasma/create_request_queue.h",
"src/ray/object_manager/plasma/eviction_policy.h",
"src/ray/object_manager/plasma/external_store.h",
"src/ray/object_manager/plasma/plasma_allocator.h",
"src/ray/object_manager/plasma/quota_aware_policy.h",
"src/ray/object_manager/plasma/store.h",
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config,
if (config.object_store_memory > 0) {
plasma::plasma_store_runner.reset(new plasma::PlasmaStoreRunner(
config.store_socket_name, config.object_store_memory, config.huge_pages,
config.plasma_directory, ""));
config.plasma_directory));
// Initialize object store.
store_thread_ =
std::thread(&plasma::PlasmaStoreRunner::Start, plasma::plasma_store_runner.get(),
Expand Down
5 changes: 2 additions & 3 deletions src/ray/object_manager/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ struct ObjectBuffer {
int device_num;
};

// TODO(suquark): Maybe we should not export plasma later?
class RAY_EXPORT PlasmaClient {
class PlasmaClient {
public:
PlasmaClient();
~PlasmaClient();
Expand Down Expand Up @@ -279,7 +278,7 @@ class RAY_EXPORT PlasmaClient {

bool IsInUse(const ObjectID &object_id);

class RAY_NO_EXPORT Impl;
class Impl;
std::shared_ptr<Impl> impl_;
};

Expand Down
63 changes: 0 additions & 63 deletions src/ray/object_manager/plasma/external_store.cc

This file was deleted.

120 changes: 0 additions & 120 deletions src/ray/object_manager/plasma/external_store.h

This file was deleted.

79 changes: 14 additions & 65 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ GetRequest::GetRequest(boost::asio::io_service &io_context,

PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string directory,
bool hugepages_enabled, const std::string &socket_name,
std::shared_ptr<ExternalStore> external_store,
uint32_t delay_on_oom_ms,
ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> object_store_full_callback)
Expand All @@ -121,7 +120,6 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
acceptor_(main_service, ParseUrlEndpoint(socket_name)),
socket_(main_service),
eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
external_store_(external_store),
spill_objects_callback_(spill_objects_callback),
delay_on_oom_ms_(delay_on_oom_ms),
usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() *
Expand Down Expand Up @@ -467,8 +465,6 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
int64_t timeout_ms) {
// Create a get request for this object.
auto get_req = new GetRequest(io_context_, client, object_ids);
std::vector<ObjectID> evicted_ids;
std::vector<ObjectTableEntry *> evicted_entries;
for (auto object_id : object_ids) {
// Check if this object is already present
// locally. If so, record that the object is being used and mark it as accounted for.
Expand All @@ -489,12 +485,12 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
/*evict=*/true, &entry->fd, &entry->map_size,
&entry->offset, client, false, &error);
if (entry->pointer) {
// TODO(suquark): Not sure if this old behavior is still compatible
// with our current object spilling mechanics.
entry->state = ObjectState::PLASMA_CREATED;
entry->create_time = std::time(nullptr);
eviction_policy_.ObjectCreated(object_id, client.get(), false);
AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
evicted_ids.push_back(object_id);
evicted_entries.push_back(entry);
} else {
// We are out of memory and cannot allocate memory for this object.
// Change the state of the object back to PLASMA_EVICTED so some
Expand All @@ -511,31 +507,6 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
}
}

if (!evicted_ids.empty()) {
std::vector<std::shared_ptr<Buffer>> buffers;
for (size_t i = 0; i < evicted_ids.size(); ++i) {
RAY_CHECK(evicted_entries[i]->pointer != nullptr);
buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer,
evicted_entries[i]->data_size));
}
if (external_store_->Get(evicted_ids, buffers).ok()) {
for (size_t i = 0; i < evicted_ids.size(); ++i) {
evicted_entries[i]->state = ObjectState::PLASMA_SEALED;
evicted_entries[i]->construct_duration =
std::time(nullptr) - evicted_entries[i]->create_time;
PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]);
get_req->num_satisfied += 1;
}
} else {
// We tried to get the objects from the external store, but could not get them.
// Set the state of these objects back to PLASMA_EVICTED so some other request
// can try again.
for (size_t i = 0; i < evicted_ids.size(); ++i) {
evicted_entries[i]->state = ObjectState::PLASMA_EVICTED;
}
}
}

// If all of the objects are present already or if the timeout is 0, return to
// the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
Expand Down Expand Up @@ -703,9 +674,6 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID> &object_ids) {
if (object_ids.size() == 0) {
return;
}

std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
std::vector<ObjectTableEntry *> evicted_entries;
for (const auto &object_id : object_ids) {
RAY_LOG(DEBUG) << "evicting object " << object_id.Hex();
auto entry = GetObjectTableEntry(&store_info_, object_id);
Expand All @@ -718,37 +686,18 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID> &object_ids) {
RAY_CHECK(entry->ref_count == 0)
<< "To evict an object, there must be no clients currently using it.";

// If there is a backing external store, then mark object for eviction to
// external store, free the object data pointer and keep a placeholder
// entry in ObjectTable
if (external_store_) {
evicted_object_data.push_back(std::make_shared<arrow::Buffer>(
entry->pointer, entry->data_size + entry->metadata_size));
evicted_entries.push_back(entry);
} else {
// Prepare the notification before deleting the object.
ObjectInfoT notification;
notification.object_id = object_id.Binary();
notification.owner_raylet_id = entry->owner_raylet_id.Binary();
notification.owner_ip_address = entry->owner_ip_address;
notification.owner_port = entry->owner_port;
notification.owner_worker_id = entry->owner_worker_id.Binary();
notification.is_deletion = true;
// If there is no backing external store, just erase the object entry
// and send a deletion notification.
EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted.
PushNotification(&notification);
}
}

if (external_store_ && !object_ids.empty()) {
RAY_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
for (auto entry : evicted_entries) {
PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size);
entry->pointer = nullptr;
entry->state = ObjectState::PLASMA_EVICTED;
}
// Prepare the notification before deleting the object.
ObjectInfoT notification;
notification.object_id = object_id.Binary();
notification.owner_raylet_id = entry->owner_raylet_id.Binary();
notification.owner_ip_address = entry->owner_ip_address;
notification.owner_port = entry->owner_port;
notification.owner_worker_id = entry->owner_worker_id.Binary();
notification.is_deletion = true;
// Erase the object entry and send a deletion notification.
EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted.
PushNotification(&notification);
}
}

Expand Down
8 changes: 1 addition & 7 deletions src/ray/object_manager/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/connection.h"
#include "ray/object_manager/plasma/create_request_queue.h"
#include "ray/object_manager/plasma/external_store.h"
#include "ray/object_manager/plasma/plasma.h"
#include "ray/object_manager/plasma/protocol.h"
#include "ray/object_manager/plasma/quota_aware_policy.h"
Expand All @@ -55,8 +54,7 @@ class PlasmaStore {
// TODO: PascalCase PlasmaStore methods.
PlasmaStore(boost::asio::io_service &main_service, std::string directory,
bool hugepages_enabled, const std::string &socket_name,
std::shared_ptr<ExternalStore> external_store, uint32_t delay_on_oom_ms,
ray::SpillObjectsCallback spill_objects_callback,
uint32_t delay_on_oom_ms, ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> object_store_full_callback);

~PlasmaStore();
Expand Down Expand Up @@ -275,10 +273,6 @@ class PlasmaStore {

std::unordered_set<ObjectID> deletion_cache_;

/// Manages worker threads for handling asynchronous/multi-threaded requests
/// for reading/writing data to/from external store.
std::shared_ptr<ExternalStore> external_store_;

std::shared_ptr<ray::ObjectStoreNotificationManager> notification_listener_;
/// A callback to asynchronously spill objects when space is needed. The
/// callback returns the amount of space still needed after the spilling is
Expand Down
Loading

0 comments on commit cf9952a

Please sign in to comment.