Skip to content

Commit

Permalink
Consolidation plan REST support - part1: handler (#4534)
Browse files Browse the repository at this point in the history
This adds support for serializing the consolidation plan via REST, so that
it can be used with TileDB Cloud.
This first part adds the C API handler that Cloud needs to call from Go
to implement this feature. Part 2, which integrates with the REST client,
will follow in a separate PR.

Design doc:
https://www.notion.so/Consolidation-Plan-Request-Handler-d25fb00674444bcdaed74a8e46761b97

---
TYPE: IMPROVEMENT
DESC: Consolidation plan REST support - part1: handler
  • Loading branch information
ypatia authored Nov 30, 2023
1 parent aea92ab commit e1929ed
Show file tree
Hide file tree
Showing 12 changed files with 1,088 additions and 22 deletions.
4 changes: 2 additions & 2 deletions test/src/test-capi-consolidation-plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ TEST_CASE_METHOD(
ctx_.ptr().get(), consolidation_plan, 0, &num_fragments));
CHECK(num_fragments == 11);
check_last_error(
"Error: ConsolidationPlan: Trying to access a node that doesn't exists");
"Error: ConsolidationPlan: Trying to access a node that doesn't exist.");

const char* frag_uri = nullptr;
CHECK(
TILEDB_ERR == tiledb_consolidation_plan_get_fragment_uri(
ctx_.ptr().get(), consolidation_plan, 0, 0, &frag_uri));
CHECK(frag_uri == nullptr);
check_last_error(
"Error: ConsolidationPlan: Trying to access a node that doesn't exists");
"Error: ConsolidationPlan: Trying to access a node that doesn't exist.");

tiledb_consolidation_plan_free(&consolidation_plan);
}
Expand Down
76 changes: 63 additions & 13 deletions test/src/test-cppapi-consolidation-plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@
*/

#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
#include "tiledb/api/c_api/config/config_api_internal.h"
#include "tiledb/sm/c_api/tiledb_serialization.h"
#include "tiledb/sm/cpp_api/tiledb"
#include "tiledb/sm/cpp_api/tiledb_experimental"
#include "tiledb/sm/enums/serialization_type.h"
#include "tiledb/sm/serialization/consolidation.h"

#include <test/support/tdb_catch.h>

Expand All @@ -46,6 +51,7 @@ struct CppConsolidationPlanFx {
// TileDB context.
Context ctx_;
VFS vfs_;
Config cfg_;

std::string key_ = "0123456789abcdeF0123456789abcdeF";
const tiledb_encryption_type_t enc_type_ = TILEDB_AES_256_GCM;
Expand All @@ -67,15 +73,20 @@ struct CppConsolidationPlanFx {
bool is_array(const std::string& array_name);
void check_last_error(std::string expected);
void validate_plan(
uint64_t fragment_size,
const Array& array,
ConsolidationPlan& plan,
std::vector<std::vector<std::string>> expected_plan);
tiledb::sm::ConsolidationPlan call_handler(
uint64_t fragment_size,
const Array& array,
tiledb::sm::SerializationType stype);
};

CppConsolidationPlanFx::CppConsolidationPlanFx()
: vfs_(ctx_) {
Config config;
config.set("sm.consolidation.buffer_size", "1000");
ctx_ = Context(config);
cfg_.set("sm.consolidation.buffer_size", "1000");
ctx_ = Context(cfg_);
vfs_ = VFS(ctx_);

remove_sparse_array();
Expand Down Expand Up @@ -188,9 +199,44 @@ void CppConsolidationPlanFx::check_last_error(std::string expected) {
CHECK(msg == expected);
}

/**
 * Round-trips a consolidation plan request through the C API handler.
 *
 * Serializes a request carrying `fragment_size` and the fixture config, hands
 * it to `tiledb_handle_consolidation_plan_request`, then deserializes the
 * response and rebuilds a `tiledb::sm::ConsolidationPlan` from it.
 *
 * @param fragment_size Desired fragment size placed in the request.
 * @param array Array the handler is invoked on.
 * @param stype Serialization format used for both request and response.
 * @return The plan reconstructed from the deserialized response.
 */
tiledb::sm::ConsolidationPlan CppConsolidationPlanFx::call_handler(
    uint64_t fragment_size,
    const Array& array,
    tiledb::sm::SerializationType stype) {
  namespace ser = tiledb::sm::serialization;

  // NOTE(review): neither handle is released in this function — confirm
  // whether that is intentional for the test.
  auto request = tiledb_buffer_handle_t::make_handle();
  auto response = tiledb_buffer_handle_t::make_handle();

  // Build the serialized request from the fragment size and fixture config.
  ser::serialize_consolidation_plan_request(
      fragment_size, cfg_.ptr()->config(), stype, request->buffer());

  // Run the handler; the test cannot continue if it fails.
  const auto capi_stype = static_cast<tiledb_serialization_type_t>(stype);
  auto status = tiledb_handle_consolidation_plan_request(
      ctx_.ptr().get(), array.ptr().get(), capi_stype, request, response);
  REQUIRE(status == TILEDB_OK);

  // Rebuild a plan object from the per-node fragment lists in the response.
  auto uris_per_node =
      ser::deserialize_consolidation_plan_response(stype, response->buffer());
  return tiledb::sm::ConsolidationPlan(fragment_size, uris_per_node);
}

void CppConsolidationPlanFx::validate_plan(
[[maybe_unused]] uint64_t fragment_size,
[[maybe_unused]] const Array& array,
ConsolidationPlan& plan,
std::vector<std::vector<std::string>> expected_plan) {
#ifdef TILEDB_SERIALIZATION
auto stype = GENERATE(
tiledb::sm::SerializationType::JSON,
tiledb::sm::SerializationType::CAPNP);
auto deserialized_plan = call_handler(fragment_size, array, stype);
// Now the two plans should be exactly the same.
REQUIRE(plan.dump() == deserialized_plan.dump());
#endif

// Take all the nodes in the plan, make a string out of them, the string will
// be the sorted fragment URIs.
std::vector<std::string> string_plan(plan.num_nodes());
Expand Down Expand Up @@ -247,11 +293,11 @@ TEST_CASE_METHOD(

CHECK_THROWS_WITH(
consolidation_plan.num_fragments(0),
"Error: ConsolidationPlan: Trying to access a node that doesn't exists");
"Error: ConsolidationPlan: Trying to access a node that doesn't exist.");

CHECK_THROWS_WITH(
consolidation_plan.fragment_uri(0, 0),
"Error: ConsolidationPlan: Trying to access a node that doesn't exists");
"Error: ConsolidationPlan: Trying to access a node that doesn't exist.");
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -286,7 +332,7 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 1);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1, uri2}});
validate_plan(1, array, consolidation_plan, {{uri1, uri2}});
}

TEST_CASE_METHOD(
Expand All @@ -312,7 +358,7 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 1);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1, uri2}, {uri3, uri4}});
validate_plan(1, array, consolidation_plan, {{uri1, uri2}, {uri3, uri4}});
}

TEST_CASE_METHOD(
Expand All @@ -338,7 +384,7 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 1);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1, uri2, uri3}});
validate_plan(1, array, consolidation_plan, {{uri1, uri2, uri3}});
}

TEST_CASE_METHOD(
Expand All @@ -361,7 +407,7 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 10 * 1024);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1}});
validate_plan(10 * 1024, array, consolidation_plan, {{uri1}});
}

TEST_CASE_METHOD(
Expand All @@ -380,7 +426,7 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 100 * 1024);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1, uri2}});
validate_plan(100 * 1024, array, consolidation_plan, {{uri1, uri2}});
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -409,7 +455,7 @@ TEST_CASE_METHOD(

// Validate the plan, we should only have a node for the large fragment to be
// split.
validate_plan(consolidation_plan, {{uri2}});
validate_plan(100 * 1024, array, consolidation_plan, {{uri2}});
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -438,7 +484,7 @@ TEST_CASE_METHOD(

// Validate the plan, we should only have a node for the large fragment to be
// split.
validate_plan(consolidation_plan, {{uri1, uri3}, {uri2}});
validate_plan(100 * 1024, array, consolidation_plan, {{uri1, uri3}, {uri2}});
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -486,5 +532,9 @@ TEST_CASE_METHOD(
ConsolidationPlan consolidation_plan(ctx_, array, 100 * 1024);

// Validate the plan.
validate_plan(consolidation_plan, {{uri1, uri2, uri3}, {uri6, uri7}, {uri5}});
validate_plan(
100 * 1024,
array,
consolidation_plan,
{{uri1, uri2, uri3}, {uri6, uri7}, {uri5}});
}
62 changes: 62 additions & 0 deletions test/src/unit-request-handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ struct HandleQueryPlanRequestFx : RequestHandlerFx {
std::string call_handler(SerializationType stype, Query& query);
};

/**
 * Fixture for the consolidation plan request handler tests.
 *
 * Supplies a sparse array schema with a single INT32 dimension "dim1" whose
 * domain is [0, 1000].
 */
struct HandleConsolidationPlanRequestFx : RequestHandlerFx {
  HandleConsolidationPlanRequestFx()
      : RequestHandlerFx("consolidation_plan_handler") {
  }

  /** Builds the schema used by the array under test. */
  shared_ptr<ArraySchema> create_schema() override {
    auto array_schema = make_shared<ArraySchema>(HERE(), ArrayType::SPARSE);

    // Single INT32 dimension spanning [0, 1000].
    auto dimension = make_shared<Dimension>(HERE(), "dim1", Datatype::INT32);
    int bounds[2] = {0, 1000};
    throw_if_not_ok(dimension->set_domain(bounds));

    auto domain = make_shared<Domain>(HERE());
    throw_if_not_ok(domain->add_dimension(dimension));

    throw_if_not_ok(array_schema->set_domain(domain));
    return array_schema;
  }
};

/* ********************************* */
/* Testing Array Schema Loading */
/* ********************************* */
Expand Down Expand Up @@ -266,6 +283,51 @@ TEST_CASE_METHOD(
REQUIRE(rval != TILEDB_OK);
}

// Each required pointer argument of the handler is replaced by nullptr in
// turn; every such call must report an error.
TEST_CASE_METHOD(
    HandleConsolidationPlanRequestFx,
    "tiledb_handle_consolidation_plan_request - error checks",
    "[request_handler][consolidation-plan][errors]") {
  create_array();

  auto ctx = tiledb::Context();
  auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
  auto request = tiledb_buffer_handle_t::make_handle();
  auto response = tiledb_buffer_handle_t::make_handle();

  const auto capi_stype =
      static_cast<tiledb_serialization_type_t>(TILEDB_CAPNP);
  auto* ctx_c = ctx.ptr().get();
  auto* array_c = array.ptr().get();

  // Thin wrapper so each case below only spells out the argument under test.
  const auto handle = [capi_stype](
                          tiledb_ctx_t* c,
                          tiledb_array_t* a,
                          const tiledb_buffer_t* req,
                          tiledb_buffer_t* resp) {
    return tiledb_handle_consolidation_plan_request(
        c, a, capi_stype, req, resp);
  };

  // Null context.
  REQUIRE(handle(nullptr, array_c, request, response) != TILEDB_OK);

  // Null array.
  REQUIRE(handle(ctx_c, nullptr, request, response) != TILEDB_OK);

  // Null request buffer.
  REQUIRE(handle(ctx_c, array_c, nullptr, response) != TILEDB_OK);

  // Null response buffer.
  REQUIRE(handle(ctx_c, array_c, request, nullptr) != TILEDB_OK);
}

/* ********************************* */
/* Testing Support Code */
/* ********************************* */
Expand Down
48 changes: 45 additions & 3 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "tiledb/sm/serialization/array_schema.h"
#include "tiledb/sm/serialization/array_schema_evolution.h"
#include "tiledb/sm/serialization/config.h"
#include "tiledb/sm/serialization/consolidation.h"
#include "tiledb/sm/serialization/enumeration.h"
#include "tiledb/sm/serialization/fragment_info.h"
#include "tiledb/sm/serialization/fragments.h"
Expand Down Expand Up @@ -4367,6 +4368,39 @@ capi_return_t tiledb_handle_query_plan_request(
return TILEDB_OK;
}

/**
 * Handles a serialized consolidation plan request.
 *
 * Deserializes the desired fragment size from `request`, builds a
 * consolidation plan for `array` with it, and serializes that plan into
 * `response`. The same `serialization_type` is used in both directions.
 *
 * @param ctx The TileDB context.
 * @param array The array to build the plan for; it must already be open.
 * @param serialization_type Serialization format of request and response.
 * @param request Buffer holding the serialized request.
 * @param response Buffer that receives the serialized response.
 * @return TILEDB_OK on success.
 * @throws std::invalid_argument if `sanity_check` rejects the context/array.
 * @throws std::logic_error if the array is not open.
 */
capi_return_t tiledb_handle_consolidation_plan_request(
    tiledb_ctx_t* ctx,
    tiledb_array_t* array,
    tiledb_serialization_type_t serialization_type,
    const tiledb_buffer_t* request,
    tiledb_buffer_t* response) {
  if (sanity_check(ctx, array) == TILEDB_ERR) {
    throw std::invalid_argument("Array parameter must be valid.");
  }

  // NOTE(review): presumably these throw on an invalid (e.g. null) buffer
  // handle — confirm against api::ensure_buffer_is_valid.
  api::ensure_buffer_is_valid(request);
  api::ensure_buffer_is_valid(response);

  // Error if array is not open
  if (!array->array_->is_open()) {
    throw std::logic_error(
        "Cannot get consolidation plan. Input array is not open");
  }

  const auto stype =
      static_cast<tiledb::sm::SerializationType>(serialization_type);

  // The request carries the desired fragment size for the plan.
  auto fragment_size =
      tiledb::sm::serialization::deserialize_consolidation_plan_request(
          stype, request->buffer());
  sm::ConsolidationPlan plan(array->array_, fragment_size);

  // Write the computed plan back in the same format the request used.
  tiledb::sm::serialization::serialize_consolidation_plan_response(
      plan, stype, response->buffer());

  return TILEDB_OK;
}

/* ****************************** */
/* C++ API */
/* ****************************** */
Expand Down Expand Up @@ -5083,10 +5117,7 @@ int32_t tiledb_consolidation_plan_dump_json_str(
return TILEDB_ERR;
}

consolidation_plan->consolidation_plan_->dump();

std::string str = consolidation_plan->consolidation_plan_->dump();
;

*out = static_cast<char*>(std::malloc(str.size() + 1));
if (*out == nullptr) {
Expand Down Expand Up @@ -7245,6 +7276,17 @@ CAPI_INTERFACE(
ctx, array, serialization_type, request, response);
}

// Exported C API wrapper for the consolidation plan request handler: forwards
// all arguments unchanged to
// tiledb::api::tiledb_handle_consolidation_plan_request through api_entry.
// NOTE(review): presumably api_entry converts exceptions thrown by the
// implementation into an error return code — confirm against its definition.
CAPI_INTERFACE(
handle_consolidation_plan_request,
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* request,
tiledb_buffer_t* response) {
return api_entry<tiledb::api::tiledb_handle_consolidation_plan_request>(
ctx, array, serialization_type, request, response);
}

/* ****************************** */
/* C++ API */
/* ****************************** */
Expand Down
19 changes: 19 additions & 0 deletions tiledb/sm/c_api/tiledb_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,25 @@ TILEDB_EXPORT capi_return_t tiledb_handle_query_plan_request(
const tiledb_buffer_t* request,
tiledb_buffer_t* response) TILEDB_NOEXCEPT;

/**
 * Process a consolidation plan request.
 *
 * The request is deserialized to obtain the desired fragment size, a
 * consolidation plan is computed for the array, and the plan is serialized
 * into the response buffer.
 *
 * @param ctx The TileDB context.
 * @param array The TileDB Array. It must be open, otherwise the call fails.
 * @param serialization_type The serialization format used for both the
 *     request and the response.
 * @param request A buffer containing the serialized ConsolidationPlanRequest
 *     message.
 * @param response An allocated buffer that will contain the serialized
 *     ConsolidationPlanResponse message.
 * @return capi_return_t TILEDB_OK on success, TILEDB_ERR on error.
 */
TILEDB_EXPORT capi_return_t tiledb_handle_consolidation_plan_request(
tiledb_ctx_t* ctx,
tiledb_array_t* array,
tiledb_serialization_type_t serialization_type,
const tiledb_buffer_t* request,
tiledb_buffer_t* response) TILEDB_NOEXCEPT;

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 8 additions & 0 deletions tiledb/sm/consolidation_plan/consolidation_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ ConsolidationPlan::ConsolidationPlan(
generate(array);
}

/**
 * Constructs a plan directly from already-known values instead of generating
 * it from an array.
 *
 * @param fragment_size Stored as the desired fragment size.
 * @param fragment_uris_per_node Fragment URIs grouped per node; the number of
 *     nodes is taken from the size of the outer vector.
 *
 * NOTE(review): the by-value parameter is copied into the member. Moving it
 * would avoid the copy, but `num_nodes_` also reads the parameter's size, so
 * that is only safe if `num_nodes_` is declared before
 * `fragment_uris_per_node_` — confirm the declaration order in the header.
 */
ConsolidationPlan::ConsolidationPlan(
uint64_t fragment_size,
std::vector<std::vector<std::string>> fragment_uris_per_node)
: num_nodes_{fragment_uris_per_node.size()}
, fragment_uris_per_node_{fragment_uris_per_node}
, desired_fragment_size_{fragment_size} {
}

ConsolidationPlan::~ConsolidationPlan() = default;

/* ********************************* */
Expand Down
Loading

0 comments on commit e1929ed

Please sign in to comment.