Skip to content

Commit

Permalink
Load enumerations for all array schemas in a single request. (#5349)
Browse files Browse the repository at this point in the history
This fixes a performance regression added in
#5291 while loading
enumerations for all array schemas. The regression impacts arrays with
multiple evolved schemas. Previously #5291 introduced a loop over REST
requests for-each schema, this PR loads all enumerations for all schemas
in a single request.

---
TYPE: C_API
DESC: Introduce tiledb_array_load_enumerations_all_schemas

TYPE: CPP_API
DESC: Introduce ArrayExperimental::load_enumerations_all_schemas

---------

Co-authored-by: Ypatia Tsavliri <[email protected]>
  • Loading branch information
shaunrd0 and ypatia authored Oct 17, 2024
1 parent 8dbab75 commit b1dd2b4
Show file tree
Hide file tree
Showing 18 changed files with 1,029 additions and 98 deletions.
27 changes: 20 additions & 7 deletions test/src/unit-cppapi-enumerations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ TEST_CASE_METHOD(
"CPP API: Load All Enumerations - All Schemas",
"[enumeration][array][load-all-enumerations][all-schemas][rest]") {
create_array();

// Loading the array with array open v1 will only initialize the latest schema
// For the first test this is fine, we only need to load enumerations for the
// latest schema. In subsequent tests we will need to call
// ArrayExperimental::load_enumerations_all_schemas.
auto array = tiledb::Array(ctx_, uri_, TILEDB_READ);
auto schema = array.load_schema(ctx_, uri_);
REQUIRE(
Expand All @@ -344,6 +349,16 @@ TEST_CASE_METHOD(
false);
std::string schema_name_1 = schema.ptr()->array_schema()->name();

// If not using array open v3 just test that the correct exception is thrown
if (!array.ptr()->array()->use_refactored_array_open()) {
CHECK_THROWS_WITH(
ArrayExperimental::load_enumerations_all_schemas(ctx_, array),
Catch::Matchers::ContainsSubstring(
"The array must be opened using "
"`rest.use_refactored_array_open=true`"));
return;
}

// Evolve once to add an enumeration.
ArraySchemaEvolution ase(ctx_);
std::vector<std::string> var_values{"one", "two", "three"};
Expand All @@ -354,7 +369,7 @@ TEST_CASE_METHOD(
ase.add_attribute(attr4);
ase.array_evolve(uri_);
array.reopen();
ArrayExperimental::load_all_enumerations(ctx_, array);
CHECK_NOTHROW(ArrayExperimental::load_enumerations_all_schemas(ctx_, array));
auto all_schemas = array.ptr()->array()->array_schemas_all();
schema = array.load_schema(ctx_, uri_);
std::string schema_name_2 = schema.ptr()->array_schema()->name();
Expand All @@ -379,9 +394,8 @@ TEST_CASE_METHOD(
ase2.drop_attribute("attr1");
CHECK_NOTHROW(ase2.array_evolve(uri_));
// Apply evolution to the array and reopen.
CHECK_NOTHROW(array.close());
CHECK_NOTHROW(array.open(TILEDB_READ));
ArrayExperimental::load_all_enumerations(ctx_, array);
CHECK_NOTHROW(array.reopen());
CHECK_NOTHROW(ArrayExperimental::load_enumerations_all_schemas(ctx_, array));
all_schemas = array.ptr()->array()->array_schemas_all();
schema = array.load_schema(ctx_, uri_);
std::string schema_name_3 = schema.ptr()->array_schema()->name();
Expand Down Expand Up @@ -416,9 +430,8 @@ TEST_CASE_METHOD(
CHECK_NOTHROW(ase3.array_evolve(uri_));

// Apply evolution to the array and reopen.
CHECK_NOTHROW(array.close());
CHECK_NOTHROW(array.open(TILEDB_READ));
ArrayExperimental::load_all_enumerations(ctx_, array);
CHECK_NOTHROW(array.reopen());
CHECK_NOTHROW(ArrayExperimental::load_enumerations_all_schemas(ctx_, array));
all_schemas = array.ptr()->array()->array_schemas_all();
schema = array.load_schema(ctx_, uri_);
std::string schema_name_4 = schema.ptr()->array_schema()->name();
Expand Down
14 changes: 12 additions & 2 deletions test/src/unit-enumerations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,16 @@ TEST_CASE_METHOD(
REQUIRE(schema->is_enumeration_loaded("test_enmr") == false);
std::string schema_name_1 = schema->name();

// If not using array open v3 just test that the correct exception is thrown
if (!array->use_refactored_array_open()) {
CHECK_THROWS_WITH(
array->load_all_enumerations(true),
Catch::Matchers::ContainsSubstring(
"The array must be opened using "
"`rest.use_refactored_array_open=true`"));
return;
}

// Evolve once to add an enumeration.
auto ase = make_shared<ArraySchemaEvolution>(HERE(), memory_tracker_);
std::vector<std::string> var_values{"one", "two", "three"};
Expand All @@ -1141,7 +1151,7 @@ TEST_CASE_METHOD(
CHECK_NOTHROW(Array::evolve_array_schema(
ctx_.resources(), uri_, ase.get(), array->get_encryption_key()));
CHECK(array->reopen().ok());
CHECK_NOTHROW(array->load_all_enumerations());
CHECK_NOTHROW(array->load_all_enumerations(true));
auto all_schemas = array->array_schemas_all();
schema = array->array_schema_latest_ptr();
std::string schema_name_2 = schema->name();
Expand All @@ -1162,7 +1172,7 @@ TEST_CASE_METHOD(
CHECK_NOTHROW(Array::evolve_array_schema(
ctx_.resources(), uri_, ase.get(), array->get_encryption_key()));
CHECK(array->reopen().ok());
CHECK_NOTHROW(array->load_all_enumerations());
CHECK_NOTHROW(array->load_all_enumerations(true));
all_schemas = array->array_schemas_all();
schema = array->array_schema_latest_ptr();
std::string schema_name_3 = schema->name();
Expand Down
23 changes: 23 additions & 0 deletions tiledb/api/c_api/array/array_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,21 @@ capi_return_t tiledb_array_load_all_enumerations(const tiledb_array_t* array) {
return TILEDB_OK;
}

capi_return_t tiledb_array_load_enumerations_all_schemas(
const tiledb_array_t* array) {
ensure_array_is_valid(array);
// Array::array_schemas_all_ is only initialized using array open V3, so we
// won't have schemas to store the loaded enumerations unless array open V3 is
// used.
if (!array->array()->use_refactored_array_open()) {
throw CAPIException(
"Unable to load enumerations for all array schemas; The array must be "
"opened using `rest.use_refactored_array_open=true`");
}
array->load_all_enumerations(true);
return TILEDB_OK;
}

} // namespace tiledb::api

using tiledb::api::api_entry_context;
Expand Down Expand Up @@ -1054,3 +1069,11 @@ CAPI_INTERFACE(
return api_entry_context<tiledb::api::tiledb_array_load_all_enumerations>(
ctx, array);
}

CAPI_INTERFACE(
array_load_enumerations_all_schemas,
tiledb_ctx_t* ctx,
const tiledb_array_t* array) {
return api_entry_context<
tiledb::api::tiledb_array_load_enumerations_all_schemas>(ctx, array);
}
23 changes: 20 additions & 3 deletions tiledb/api/c_api/array/array_api_experimental.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TILEDB_EXPORT capi_return_t tiledb_array_get_enumeration(
tiledb_enumeration_t** enumeration) TILEDB_NOEXCEPT;

/**
* Load all enumerations for the array.
* Load all enumerations for the array's latest array schema.
*
* **Example:**
*
Expand All @@ -100,13 +100,30 @@ TILEDB_EXPORT capi_return_t tiledb_array_get_enumeration(
*
* @param[in] ctx The TileDB context.
* @param[in] array The TileDB array.
* @param[in] latest_only If non-zero, only load enumerations for the latest
* schema.
* @return `TILEDB_OK` for success and `TILEDB_ERR` for error.
*/
TILEDB_EXPORT capi_return_t tiledb_array_load_all_enumerations(
tiledb_ctx_t* ctx, const tiledb_array_t* array) TILEDB_NOEXCEPT;

/**
* Load all enumerations for all schemas in the array.
*
* This method requires the array to be opened with the config option
* `rest.use_refactored_array_open=true` (default).
*
* **Example:**
*
* @code{.c}
* tiledb_array_load_enumerations_all_schemas(ctx, array);
* @endcode
*
* @param[in] ctx The TileDB context.
* @param[in] array The TileDB array.
* @return `TILEDB_OK` for success and `TILEDB_ERR` for error.
*/
TILEDB_EXPORT capi_return_t tiledb_array_load_enumerations_all_schemas(
tiledb_ctx_t* ctx, const tiledb_array_t* array) TILEDB_NOEXCEPT;

#ifdef __cplusplus
}
#endif
Expand Down
16 changes: 11 additions & 5 deletions tiledb/api/c_api/array/array_api_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,16 @@ struct tiledb_array_handle_t
return array_->get_enumeration(enumeration_name);
}

std::unordered_map<
std::string,
std::vector<shared_ptr<const tiledb::sm::Enumeration>>>
get_enumerations_all_schemas() {
return array_->get_enumerations_all_schemas();
}

std::vector<shared_ptr<const tiledb::sm::Enumeration>> get_enumerations(
const std::vector<std::string>& enumeration_names,
shared_ptr<tiledb::sm::ArraySchema> schema) {
return array_->get_enumerations(enumeration_names, schema);
const std::vector<std::string>& enumeration_names) {
return array_->get_enumerations(enumeration_names);
}

void get_metadata(
Expand Down Expand Up @@ -179,8 +185,8 @@ struct tiledb_array_handle_t
return array_->is_open();
}

void load_all_enumerations() const {
array_->load_all_enumerations();
void load_all_enumerations(bool all_schemas = false) const {
array_->load_all_enumerations(all_schemas);
}

tiledb::sm::NDRange& loaded_non_empty_domain() {
Expand Down
103 changes: 83 additions & 20 deletions tiledb/sm/array/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -805,33 +805,85 @@ void Array::encryption_type(

shared_ptr<const Enumeration> Array::get_enumeration(
const std::string& enumeration_name) {
return get_enumerations({enumeration_name})[0];
}

std::unordered_map<std::string, std::vector<shared_ptr<const Enumeration>>>
Array::get_enumerations_all_schemas() {
if (!is_open_) {
throw ArrayException("Unable to load enumerations; Array is not open.");
}

auto schema = opened_array_->array_schema_latest_ptr();
if (!schema->has_enumeration(enumeration_name)) {
throw ArrayException(
"Unable to get enumeration; Enumeration '" + enumeration_name +
"' does not exist.");
} else if (schema->is_enumeration_loaded(enumeration_name)) {
return schema->get_enumeration(enumeration_name);
std::unordered_map<std::string, std::vector<shared_ptr<const Enumeration>>>
ret;
if (remote_) {
auto rest_client = resources_.rest_client();
if (rest_client == nullptr) {
throw ArrayException(
"Error loading enumerations; Remote array with no REST client.");
}

// Pass an empty list of enumeration names. REST will use timestamps to
// load all enumerations on all schemas for the array within that range.
ret = rest_client->post_enumerations_from_rest(
array_uri_,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this,
{},
memory_tracker_);

// Store the enumerations from the REST response.
for (const auto& schema_enmrs : ret) {
auto schema = array_schemas_all().at(schema_enmrs.first);
for (const auto& enmr : schema_enmrs.second) {
schema->store_enumeration(enmr);
}
}
} else {
for (const auto& schema : array_schemas_all()) {
std::unordered_set<std::string> enmrs_to_load;
auto enumeration_names = schema.second->get_enumeration_names();
// Dedupe requested names and filter out anything already loaded.
for (auto& enmr_name : enumeration_names) {
if (schema.second->is_enumeration_loaded(enmr_name)) {
continue;
}
enmrs_to_load.insert(enmr_name);
}

// Create a vector of paths to be loaded.
std::vector<std::string> paths_to_load;
for (auto& enmr_name : enmrs_to_load) {
auto path = schema.second->get_enumeration_path_name(enmr_name);
paths_to_load.push_back(path);
}

// Load the enumerations from storage
auto loaded = array_directory().load_enumerations_from_paths(
paths_to_load, *encryption_key(), memory_tracker_);

// Store the loaded enumerations in the schema.
for (auto& enmr : loaded) {
schema.second->store_enumeration(enmr);
}
ret[schema.first] = loaded;
}
}

return get_enumerations({enumeration_name}, schema)[0];
return ret;
}

std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(
const std::vector<std::string>& enumeration_names,
shared_ptr<ArraySchema> schema) {
const std::vector<std::string>& enumeration_names) {
if (!is_open_) {
throw ArrayException("Unable to load enumerations; Array is not open.");
}

// Dedupe requested names and filter out anything already loaded.
std::unordered_set<std::string> enmrs_to_load;
for (auto& enmr_name : enumeration_names) {
if (schema->is_enumeration_loaded(enmr_name)) {
if (array_schema_latest().is_enumeration_loaded(enmr_name)) {
continue;
}
enmrs_to_load.insert(enmr_name);
Expand All @@ -856,16 +908,16 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(

loaded = rest_client->post_enumerations_from_rest(
array_uri_,
schema->timestamp_range().first,
schema->timestamp_range().second,
array_dir_timestamp_start_,
array_dir_timestamp_end_,
this,
names_to_load,
memory_tracker_);
memory_tracker_)[array_schema_latest().name()];
} else {
// Create a vector of paths to be loaded.
std::vector<std::string> paths_to_load;
for (auto& enmr_name : enmrs_to_load) {
auto path = schema->get_enumeration_path_name(enmr_name);
auto path = array_schema_latest().get_enumeration_path_name(enmr_name);
paths_to_load.push_back(path);
}

Expand All @@ -876,25 +928,36 @@ std::vector<shared_ptr<const Enumeration>> Array::get_enumerations(

// Store the loaded enumerations in the schema
for (auto& enmr : loaded) {
schema->store_enumeration(enmr);
opened_array_->array_schema_latest_ptr()->store_enumeration(enmr);
}
}

// Return the requested list of enumerations
std::vector<shared_ptr<const Enumeration>> ret(enumeration_names.size());
for (size_t i = 0; i < enumeration_names.size(); i++) {
ret[i] = schema->get_enumeration(enumeration_names[i]);
ret[i] = array_schema_latest().get_enumeration(enumeration_names[i]);
}
return ret;
}

void Array::load_all_enumerations() {
void Array::load_all_enumerations(bool all_schemas) {
if (!is_open_) {
throw ArrayException("Unable to load all enumerations; Array is not open.");
}
// Load all enumerations, discarding the returned list of loaded enumerations.
for (const auto& schema : array_schemas_all()) {
get_enumerations(schema.second->get_enumeration_names(), schema.second);
if (all_schemas) {
// Unless we are using array open V3, Array::array_schemas_all_ will not be
// initialized. We throw an exception since this is required to store the
// loaded enumerations.
if (!use_refactored_array_open()) {
throw ArrayException(
"Unable to load enumerations for all array schemas; The array must "
"be opened using `rest.use_refactored_array_open=true`");
}

get_enumerations_all_schemas();
} else {
get_enumerations(array_schema_latest().get_enumeration_names());
}
}

Expand Down
Loading

0 comments on commit b1dd2b4

Please sign in to comment.