Skip to content

Commit

Permalink
[Core] Remove cuda support in plasma store (ray-project#13070)
Browse files Browse the repository at this point in the history
* remove cuda support in plasma store
  • Loading branch information
suquark authored Dec 24, 2020
1 parent 2059a20 commit bf7f6a7
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 267 deletions.
125 changes: 3 additions & 122 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@
#include "ray/object_manager/plasma/protocol.h"
#include "ray/object_manager/plasma/shared_memory.h"

#ifdef PLASMA_CUDA
#include "arrow/gpu/cuda_api.h"

using arrow::cuda::CudaBuffer;
using arrow::cuda::CudaBufferWriter;
using arrow::cuda::CudaContext;
using arrow::cuda::CudaDeviceManager;
#endif

namespace fb = plasma::flatbuf;

namespace plasma {
Expand All @@ -56,38 +47,6 @@ using fb::PlasmaError;

using arrow::MutableBuffer;

// ----------------------------------------------------------------------
// GPU support

#ifdef PLASMA_CUDA

namespace {

struct GpuProcessHandle {
/// Pointer to CUDA buffer that is backing this GPU object.
std::shared_ptr<CudaBuffer> ptr;
/// Number of client using this GPU object.
int client_count;
};

// This is necessary as IPC handles can only be mapped once per process.
// Thus if multiple clients in the same process get the same gpu object,
// they need to access the same mapped CudaBuffer.
std::unordered_map<ObjectID, GpuProcessHandle *> gpu_object_map;
std::mutex gpu_mutex;

// Return a new CudaBuffer pointing to the same data as the GpuProcessHandle,
// but able to persist after the original IPC-backed buffer is closed
// (ARROW-5924).
std::shared_ptr<Buffer> MakeBufferFromGpuProcessHandle(GpuProcessHandle *handle) {
return std::make_shared<CudaBuffer>(handle->ptr->address(), handle->ptr->size(),
handle->ptr->context());
}

} // namespace

#endif

// ----------------------------------------------------------------------
// PlasmaBuffer

Expand Down Expand Up @@ -245,22 +204,11 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
std::unordered_set<ObjectID> deletion_cache_;
/// A mutex which protects this class.
std::recursive_mutex client_mutex_;

#ifdef PLASMA_CUDA
/// Cuda Device Manager.
arrow::cuda::CudaDeviceManager *manager_;
#endif
};

PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); }

PlasmaClient::Impl::Impl() : store_capacity_(0) {
#ifdef PLASMA_CUDA
auto maybe_manager = CudaDeviceManager::Instance();
DCHECK_OK(maybe_manager.status());
manager_ = *maybe_manager;
#endif
}
PlasmaClient::Impl::Impl() : store_capacity_(0) {}

PlasmaClient::Impl::~Impl() {}

Expand Down Expand Up @@ -363,25 +311,7 @@ Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id,
memcpy((*data)->mutable_data() + object.data_size, metadata, object.metadata_size);
}
} else {
#ifdef PLASMA_CUDA
std::shared_ptr<CudaContext> context;
ARROW_ASSIGN_OR_RAISE(context, manager_->GetContext(device_num - 1));
GpuProcessHandle *handle = new GpuProcessHandle();
handle->client_count = 2;
ARROW_ASSIGN_OR_RAISE(handle->ptr, context->OpenIpcBuffer(*object.ipc_handle));
{
std::lock_guard<std::mutex> lock(gpu_mutex);
gpu_object_map[object_id] = handle;
}
if (metadata != NULL) {
// Copy the metadata to the buffer.
CudaBufferWriter writer(handle->ptr);
RAY_RETURN_NOT_OK(writer.WriteAt(object.data_size, metadata, metadata_size));
}
*data = MakeBufferFromGpuProcessHandle(handle);
#else
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
RAY_LOG(FATAL) << "GPU is not enabled.";
}

// Increment the count of the number of instances of this object that this
Expand Down Expand Up @@ -465,15 +395,7 @@ Status PlasmaClient::Impl::GetBuffers(
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_CUDA
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_ids[i]);
RAY_CHECK(iter != gpu_object_map.end());
iter->second->client_count++;
physical_buf = MakeBufferFromGpuProcessHandle(iter->second);
#else
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
RAY_LOG(FATAL) << "GPU library is not enabled.";
}
physical_buf = wrap_buffer(object_ids[i], physical_buf);
object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
Expand Down Expand Up @@ -530,25 +452,7 @@ Status PlasmaClient::Impl::GetBuffers(
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_CUDA
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_ids[i]);
if (iter == gpu_object_map.end()) {
std::shared_ptr<CudaContext> context;
ARROW_ASSIGN_OR_RAISE(context, manager_->GetContext(object->device_num - 1));
GpuProcessHandle *obj_handle = new GpuProcessHandle();
obj_handle->client_count = 1;
ARROW_ASSIGN_OR_RAISE(obj_handle->ptr,
context->OpenIpcBuffer(*object->ipc_handle));
gpu_object_map[object_ids[i]] = obj_handle;
physical_buf = MakeBufferFromGpuProcessHandle(obj_handle);
} else {
iter->second->client_count++;
physical_buf = MakeBufferFromGpuProcessHandle(iter->second);
}
#else
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
// Finish filling out the return values.
physical_buf = wrap_buffer(object_ids[i], physical_buf);
Expand Down Expand Up @@ -611,18 +515,6 @@ Status PlasmaClient::Impl::Release(const ObjectID &object_id) {
auto object_entry = objects_in_use_.find(object_id);
RAY_CHECK(object_entry != objects_in_use_.end());

#ifdef PLASMA_CUDA
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
RAY_CHECK(iter != gpu_object_map.end());
if (--iter->second->client_count == 0) {
delete iter->second;
gpu_object_map.erase(iter);
}
}
#endif

object_entry->second->count -= 1;
RAY_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
Expand Down Expand Up @@ -706,17 +598,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID &object_id) {
return Status::Invalid("Plasma client cannot have a reference to the buffer.");
}

#ifdef PLASMA_CUDA
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
RAY_CHECK(iter != gpu_object_map.end());
RAY_CHECK(iter->second->client_count == 1);
delete iter->second;
gpu_object_map.erase(iter);
}
#endif

// Send the abort request.
RAY_RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
// Decrease the reference count to zero, then remove the object.
Expand Down
18 changes: 0 additions & 18 deletions src/ray/object_manager/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
#include "ray/object_manager/format/object_manager_generated.h"
#include "ray/object_manager/plasma/compat.h"

#ifdef PLASMA_CUDA
#include "arrow/gpu/cuda_api.h"
#endif

namespace plasma {

using ray::NodeID;
Expand All @@ -51,12 +47,6 @@ enum class ObjectState : int {
PLASMA_EVICTED = 3,
};

namespace internal {

struct CudaIpcPlaceholder {};

} // namespace internal

/// This type is used by the Plasma store. It is here because it is exposed to
/// the eviction policy.
struct ObjectTableEntry {
Expand Down Expand Up @@ -92,16 +82,8 @@ struct ObjectTableEntry {
int64_t create_time;
/// How long creation of this object took.
int64_t construct_duration;

/// The state of the object, e.g., whether it is open or sealed.
ObjectState state;

#ifdef PLASMA_CUDA
/// IPC GPU handle to share with clients.
std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
#else
std::shared_ptr<internal::CudaIpcPlaceholder> ipc_handle;
#endif
};

/// Mapping from ObjectIDs to information about the object.
Expand Down
19 changes: 4 additions & 15 deletions src/ray/object_manager/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,13 @@
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/compat.h"

#ifdef PLASMA_CUDA
using arrow::cuda::CudaIpcMemHandle;
#endif

namespace plasma {

/// Allocation granularity used in plasma for object allocation.
constexpr int64_t kBlockSize = 64;

// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
struct PlasmaObject {
#ifdef PLASMA_CUDA
// IPC handle for Cuda.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
/// The file descriptor of the memory mapped file in the store. It is used as
/// a unique identifier of the file in the client to look up the corresponding
/// file descriptor on the client's side.
Expand All @@ -59,13 +51,10 @@ struct PlasmaObject {
int64_t mmap_size;

bool operator==(const PlasmaObject &other) const {
return (
#ifdef PLASMA_CUDA
(ipc_handle == other.ipc_handle) &&
#endif
(store_fd == other.store_fd) && (data_offset == other.data_offset) &&
(metadata_offset == other.metadata_offset) && (data_size == other.data_size) &&
(metadata_size == other.metadata_size) && (device_num == other.device_num));
return ((store_fd == other.store_fd) && (data_offset == other.data_offset) &&
(metadata_offset == other.metadata_offset) &&
(data_size == other.data_size) && (metadata_size == other.metadata_size) &&
(device_num == other.device_num));
}
};

Expand Down
46 changes: 1 addition & 45 deletions src/ray/object_manager/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@

#include <utility>

#include "arrow/util/ubsan.h"
#include "flatbuffers/flatbuffers.h"

#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/connection.h"
#include "ray/object_manager/plasma/plasma_generated.h"

#ifdef PLASMA_CUDA
#include "arrow/gpu/cuda_api.h"
#endif
#include "arrow/util/ubsan.h"

namespace fb = plasma::flatbuf;

namespace plasma {
Expand Down Expand Up @@ -256,15 +251,6 @@ Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id
object.data_size, object.metadata_offset,
object.metadata_size, object.device_num);
auto object_string = fbb.CreateString(object_id.Binary());
#ifdef PLASMA_CUDA
flatbuffers::Offset<fb::CudaHandle> ipc_handle;
if (object.device_num != 0) {
std::shared_ptr<arrow::Buffer> handle;
ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize());
ipc_handle =
fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()));
}
#endif
fb::PlasmaCreateReplyBuilder crb(fbb);
crb.add_error(static_cast<PlasmaError>(error_code));
crb.add_plasma_object(&plasma_object);
Expand All @@ -273,11 +259,7 @@ Status SendCreateReply(const std::shared_ptr<Client> &client, ObjectID object_id
crb.add_store_fd(FD2INT(object.store_fd));
crb.add_mmap_size(object.mmap_size);
if (object.device_num != 0) {
#ifdef PLASMA_CUDA
crb.add_ipc_handle(ipc_handle);
#else
RAY_LOG(FATAL) << "This should be unreachable.";
#endif
}
auto message = crb.Finish();
return PlasmaSend(client, MessageType::PlasmaCreateReply, &fbb, message);
Expand Down Expand Up @@ -306,13 +288,6 @@ Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id,
*mmap_size = message->mmap_size();

object->device_num = message->plasma_object()->device_num();
#ifdef PLASMA_CUDA
if (object->device_num != 0) {
ARROW_ASSIGN_OR_RAISE(
object->ipc_handle,
CudaIpcMemHandle::FromBuffer(message->ipc_handle()->handle()->data()));
}
#endif
return PlasmaErrorStatus(message->error());
}

Expand Down Expand Up @@ -594,14 +569,6 @@ Status SendGetReply(const std::shared_ptr<Client> &client, ObjectID object_ids[]
objects.push_back(PlasmaObjectSpec(FD2INT(object.store_fd), object.data_offset,
object.data_size, object.metadata_offset,
object.metadata_size, object.device_num));
#ifdef PLASMA_CUDA
if (object.device_num != 0) {
std::shared_ptr<arrow::Buffer> handle;
ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize());
handles.push_back(
fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())));
}
#endif
}
std::vector<int> store_fds_as_int;
for (MEMFD_TYPE store_fd : store_fds) {
Expand All @@ -623,9 +590,6 @@ Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[],
std::vector<int64_t> &mmap_sizes) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetReply>(data);
#ifdef PLASMA_CUDA
int handle_pos = 0;
#endif
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str());
Expand All @@ -638,14 +602,6 @@ Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[],
plasma_objects[i].metadata_offset = object->metadata_offset();
plasma_objects[i].metadata_size = object->metadata_size();
plasma_objects[i].device_num = object->device_num();
#ifdef PLASMA_CUDA
if (object->device_num() != 0) {
const void *ipc_handle = message->handles()->Get(handle_pos)->handle()->data();
ARROW_ASSIGN_OR_RAISE(plasma_objects[i].ipc_handle,
CudaIpcMemHandle::FromBuffer(ipc_handle));
handle_pos++;
}
#endif
}
RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
Expand Down
Loading

0 comments on commit bf7f6a7

Please sign in to comment.