Skip to content

Commit

Permalink
Consolidation Plan REST support - part 2: rest client (#4537)
Browse files Browse the repository at this point in the history
This is part2 of #4534 , adding
the `rest_client` part.

---
TYPE: FEATURE
DESC: Consolidation Plan REST support - part 2: rest client
  • Loading branch information
ypatia authored Mar 12, 2024
1 parent 2e25ad2 commit a0e82bd
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 45 deletions.
85 changes: 41 additions & 44 deletions test/src/test-capi-consolidation-plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* Tests the ConsolidationPlan API
*/

#include <test/support/src/vfs_helpers.h>
#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/context/context_api_internal.h"
#include "tiledb/sm/c_api/tiledb.h"
Expand All @@ -42,17 +43,13 @@
using namespace tiledb;
using namespace tiledb::test;

struct ConsolidationPlanFx {
// Constants.
const char* SPARSE_ARRAY_NAME = "test_deletes_array";

// TileDB context.
Context ctx_;
VFS vfs_;

std::string key_ = "0123456789abcdeF0123456789abcdeF";
const tiledb_encryption_type_t enc_type_ = TILEDB_AES_256_GCM;
#ifndef TILEDB_TESTS_ENABLE_REST
constexpr bool rest_tests = false;
#else
constexpr bool rest_tests = true;
#endif

struct ConsolidationPlanFx {
// Constructors/destructors.
ConsolidationPlanFx();
~ConsolidationPlanFx();
Expand All @@ -69,20 +66,39 @@ struct ConsolidationPlanFx {
void remove_array(const std::string& array_name);
bool is_array(const std::string& array_name);
void check_last_error(std::string expected);

// TileDB context.
Context ctx_;
// Full URI initialized using fs_vec_ random temp directory.
std::string array_name_;

// Vector of supported filsystems
tiledb_vfs_handle_t* vfs_c_{nullptr};
tiledb_ctx_handle_t* ctx_c_{nullptr};
const std::vector<std::unique_ptr<test::SupportedFs>> fs_vec_;

std::string key_ = "0123456789abcdeF0123456789abcdeF";
const tiledb_encryption_type_t enc_type_ = TILEDB_AES_256_GCM;
};

ConsolidationPlanFx::ConsolidationPlanFx()
: vfs_(ctx_) {
: fs_vec_(test::vfs_test_get_fs_vec()) {
Config config;
config.set("sm.consolidation.buffer_size", "1000");
ctx_ = Context(config);
vfs_ = VFS(ctx_);

remove_sparse_array();
REQUIRE(
test::vfs_test_init(fs_vec_, &ctx_c_, &vfs_c_, config.ptr().get()).ok());
ctx_ = Context(ctx_c_);
std::string temp_dir = fs_vec_[0]->temp_dir();
if constexpr (rest_tests) {
array_name_ = "tiledb://unit/";
}
array_name_ += temp_dir + "test_consolidation_plan_array";
test::vfs_test_create_temp_dir(ctx_c_, vfs_c_, temp_dir);
}

ConsolidationPlanFx::~ConsolidationPlanFx() {
remove_sparse_array();
Array::delete_array(ctx_, array_name_);
REQUIRE(test::vfs_test_close(fs_vec_, ctx_c_, vfs_c_).ok());
}

void ConsolidationPlanFx::create_sparse_array(bool allows_dups, bool encrypt) {
Expand Down Expand Up @@ -115,9 +131,9 @@ void ConsolidationPlanFx::create_sparse_array(bool allows_dups, bool encrypt) {
schema.set_coords_filter_list(filter_list);

if (encrypt) {
Array::create(SPARSE_ARRAY_NAME, schema, enc_type_, key_);
Array::create(array_name_, schema, enc_type_, key_);
} else {
Array::create(SPARSE_ARRAY_NAME, schema);
Array::create(array_name_, schema);
}
}

Expand All @@ -132,16 +148,13 @@ void ConsolidationPlanFx::write_sparse(
if (encrypt) {
array = std::make_unique<Array>(
ctx_,
SPARSE_ARRAY_NAME,
array_name_,
TILEDB_WRITE,
TemporalPolicy(TimeTravel, timestamp),
EncryptionAlgorithm(AESGCM, key_.c_str()));
} else {
array = std::make_unique<Array>(
ctx_,
SPARSE_ARRAY_NAME,
TILEDB_WRITE,
TemporalPolicy(TimeTravel, timestamp));
ctx_, array_name_, TILEDB_WRITE, TemporalPolicy(TimeTravel, timestamp));
}

// Create query.
Expand All @@ -152,28 +165,12 @@ void ConsolidationPlanFx::write_sparse(
query.set_data_buffer("d2", dim2);

// Submit/finalize the query.
query.submit();
query.finalize();
query.submit_and_finalize();

// Close array.
array->close();
}

void ConsolidationPlanFx::remove_array(const std::string& array_name) {
if (!is_array(array_name))
return;

vfs_.remove_dir(array_name);
}

void ConsolidationPlanFx::remove_sparse_array() {
remove_array(SPARSE_ARRAY_NAME);
}

bool ConsolidationPlanFx::is_array(const std::string& array_name) {
return vfs_.is_dir(array_name);
}

void ConsolidationPlanFx::check_last_error(std::string expected) {
const char* msg = "unset";
tiledb_error_t* err{nullptr};
Expand All @@ -188,11 +185,11 @@ void ConsolidationPlanFx::check_last_error(std::string expected) {
TEST_CASE_METHOD(
ConsolidationPlanFx,
"CAPI: Consolidation plan",
"[capi][consolidation-plan]") {
"[capi][consolidation-plan][rest]") {
create_sparse_array();
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 1);

Array array{ctx_, SPARSE_ARRAY_NAME, TILEDB_READ};
Array array{ctx_, array_name_, TILEDB_READ};

tiledb_consolidation_plan_t* consolidation_plan{};
CHECK(
Expand Down Expand Up @@ -231,11 +228,11 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationPlanFx,
"CAPI: Consolidation plan dump",
"[capi][consolidation-plan][dump]") {
"[capi][consolidation-plan][dump][rest]") {
create_sparse_array();
write_sparse({0, 1, 2, 3}, {1, 1, 1, 2}, {1, 2, 4, 3}, 1);

Array array{ctx_, SPARSE_ARRAY_NAME, TILEDB_READ};
Array array{ctx_, array_name_, TILEDB_READ};

tiledb_consolidation_plan_t* consolidation_plan{};
CHECK(
Expand Down
5 changes: 5 additions & 0 deletions tiledb/sm/array/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ class Array {
/** Load array directory for non-remote arrays */
const ArrayDirectory& load_array_directory();

/* Get the REST client */
[[nodiscard]] inline shared_ptr<RestClient> rest_client() const {
return resources_.rest_client();
}

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand Down
16 changes: 15 additions & 1 deletion tiledb/sm/consolidation_plan/consolidation_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "tiledb/sm/consolidation_plan/consolidation_plan.h"
#include "tiledb/common/common.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/rest/rest_client.h"

using namespace tiledb::sm;
using namespace tiledb::common;
Expand All @@ -44,7 +45,20 @@ using namespace tiledb::common;
ConsolidationPlan::ConsolidationPlan(
shared_ptr<Array> array, uint64_t fragment_size)
: desired_fragment_size_(fragment_size) {
generate(array);
if (array->is_remote()) {
auto rest_client = array->rest_client();
if (!rest_client) {
throw std::runtime_error(
"Failed to create a consolidation plan; Remote array"
"with no REST client.");
}
// reach out to the REST client to populate class members
fragment_uris_per_node_ = rest_client->post_consolidation_plan_from_rest(
array->array_uri(), array->config(), fragment_size);
num_nodes_ = fragment_uris_per_node_.size();
} else {
generate(array);
}
}

ConsolidationPlan::ConsolidationPlan(
Expand Down
49 changes: 49 additions & 0 deletions tiledb/sm/rest/rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,48 @@ Status RestClient::post_vacuum_to_rest(const URI& uri, const Config& config) {
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

std::vector<std::vector<std::string>>
RestClient::post_consolidation_plan_from_rest(
const URI& uri, const Config& config, uint64_t fragment_size) {
Buffer buff;
serialization::serialize_consolidation_plan_request(
fragment_size, config, serialization_type_, buff);

// Wrap in a list
BufferList serialized;
throw_if_not_ok(serialized.add_buffer(std::move(buff)));

// Init curl and form the URL
Curl curlc(logger_);
std::string array_ns, array_uri;
throw_if_not_ok(uri.get_rest_components(&array_ns, &array_uri));
const std::string cache_key = array_ns + ":" + array_uri;
throw_if_not_ok(
curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
"/" + curlc.url_escape(array_uri) +
"/consolidate/plan";

// Get the data
Buffer returned_data;
throw_if_not_ok(curlc.post_data(
stats_,
url,
serialization_type_,
&serialized,
&returned_data,
cache_key));
if (returned_data.data() == nullptr || returned_data.size() == 0) {
throw Status_RestError(
"Error getting query plan from REST; server returned no data.");
}

// Ensure data has a null delimiter for cap'n proto if using JSON
throw_if_not_ok(ensure_json_null_delimited_string(&returned_data));
return serialization::deserialize_consolidation_plan_response(
serialization_type_, returned_data);
}

#else

RestClient::RestClient() {
Expand Down Expand Up @@ -1789,6 +1831,13 @@ Status RestClient::post_vacuum_to_rest(const URI&, const Config&) {
Status_RestError("Cannot use rest client; serialization not enabled."));
}

std::vector<std::vector<std::string>>
RestClient::post_consolidation_plan_from_rest(
const URI&, const Config&, uint64_t) {
throw StatusException(
Status_RestError("Cannot use rest client; serialization not enabled."));
}

#endif // TILEDB_SERIALIZATION

} // namespace sm
Expand Down
11 changes: 11 additions & 0 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,17 @@ class RestClient {
return rest_server_;
}

/**
* Get consolidation plan from the REST server via POST request.
*
* @param uri Array URI.
* @param config Config of the array.
* @param fragment_size Maximum fragment size for constructing the plan.
* @return The requested consolidation plan
*/
std::vector<std::vector<std::string>> post_consolidation_plan_from_rest(
const URI& uri, const Config& config, uint64_t fragment_size);

private:
/* ********************************* */
/* PRIVATE ATTRIBUTES */
Expand Down

0 comments on commit a0e82bd

Please sign in to comment.