Skip to content

Commit

Permalink
Fix array schema when time traveling
Browse files Browse the repository at this point in the history
We were using the latest array schema even while traveling. This changes
the latest array schema selection when in ArrayDirectoryMode::READ to
use the newest array schema that is less than or equal to the current
end timestamp. If no array schemas are newer than the timestamp end
value, the earliest schema is used.
  • Loading branch information
davisp committed Nov 30, 2023
1 parent df88a4b commit 127493f
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 2 deletions.
1 change: 1 addition & 0 deletions test/regression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ if (TILEDB_CPP_API)
list(APPEND SOURCES targets/sc-25116.cc)
list(APPEND SOURCES targets/sc-29682.cc)
list(APPEND SOURCES targets/sc-33480.cc)
list(APPEND SOURCES targets/sc-35424.cc)
endif()

add_executable(tiledb_regression
Expand Down
165 changes: 165 additions & 0 deletions test/regression/targets/sc-35424.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#include <chrono>
#include <climits>
#include <thread>

#include <tiledb/tiledb>
#include <tiledb/tiledb_experimental>

#include <test/support/tdb_catch.h>

using namespace tiledb;

static void create_array(const std::string& array_uri);
static void write_first_fragment(const std::string& array_uri);
static uint64_t time_travel_destination();
static void add_attr_b(const std::string& array_uri);
static void write_second_fragment(const std::string& array_uri);

static void read_without_time_travel(const std::string& array_uri);
static void read_with_time_travel(const std::string& array_uri, uint64_t when);

TEST_CASE(
"Use the correct schema when time traveling",
"[time-traveling][array-schema][bug][sc35424]") {
std::string array_uri = "test_time_traveling_schema";

// Test setup
create_array(array_uri);
write_first_fragment(array_uri);
auto timepoint = time_travel_destination();
add_attr_b(array_uri);
write_second_fragment(array_uri);

// Check reads with and without time travel.
read_without_time_travel(array_uri);
read_with_time_travel(array_uri, timepoint);
}

void create_array(const std::string& array_uri) {
Context ctx;

auto obj = Object::object(ctx, array_uri);
if (obj.type() != Object::Type::Invalid) {
Object::remove(ctx, array_uri);
}

auto dim = Dimension::create<int32_t>(ctx, "d", {{0, 1024}});

Domain dom(ctx);
dom.add_dimension(dim);

auto attr = Attribute::create<int32_t>(ctx, "a");

ArraySchema schema(ctx, TILEDB_SPARSE);
schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}})
.set_domain(dom)
.add_attribute(attr);

Array::create(array_uri, schema);
}

void write_first_fragment(const std::string& array_uri) {
std::vector<int32_t> d_data = {0, 1, 2, 3, 4};
std::vector<int32_t> a_data = {5, 6, 7, 8, 9};

Context ctx;
Array array(ctx, array_uri, TILEDB_WRITE);
Query query(ctx, array, TILEDB_WRITE);
query.set_layout(TILEDB_UNORDERED)
.set_data_buffer("d", d_data)
.set_data_buffer("a", a_data);
REQUIRE(query.submit() == Query::Status::COMPLETE);
array.close();
}

uint64_t time_travel_destination() {
// We sleep for 5ms to ensure that our fragments are separated in time
// and allowing us to grab a time guaranteed to be between them.
auto delay = std::chrono::milliseconds(5);
std::this_thread::sleep_for(delay);

auto timepoint = tiledb_timestamp_now_ms();

std::this_thread::sleep_for(delay);

return timepoint;
}

void add_attr_b(const std::string& array_uri) {
Context ctx;
auto attr = Attribute::create<int32_t>(ctx, "b");

ArraySchemaEvolution ase(ctx);
ase.add_attribute(attr);
ase.array_evolve(array_uri);
}

void write_second_fragment(const std::string& array_uri) {
std::vector<int32_t> d_data = {5, 6, 7, 8, 9};
std::vector<int32_t> a_data = {10, 11, 12, 13, 14};
std::vector<int32_t> b_data = {15, 16, 17, 18, 19};

Context ctx;
Array array(ctx, array_uri, TILEDB_WRITE);
Query query(ctx, array, TILEDB_WRITE);
query.set_layout(TILEDB_UNORDERED)
.set_data_buffer("d", d_data)
.set_data_buffer("a", a_data)
.set_data_buffer("b", b_data);
REQUIRE(query.submit() == Query::Status::COMPLETE);
array.close();
}

void read_without_time_travel(const std::string& array_uri) {
std::vector<int32_t> d_data(10);
std::vector<int32_t> a_data(10);
std::vector<int32_t> b_data(10);

Context ctx;
Array array(ctx, array_uri, TILEDB_READ);
Query query(ctx, array, TILEDB_READ);
query.set_data_buffer("d", d_data)
.set_data_buffer("a", a_data)
.set_data_buffer("b", b_data);

REQUIRE(query.submit() == Query::Status::COMPLETE);

for (int32_t i = 0; i < 10; i++) {
REQUIRE(d_data[i] == i);
REQUIRE(a_data[i] == i + 5);

if (i < 5) {
REQUIRE(b_data[i] == INT32_MIN);
} else {
REQUIRE(b_data[i] == i + 10);
}
}
}

void read_with_time_travel(const std::string& array_uri, uint64_t when) {
std::vector<int32_t> d_data(10, INT_MAX);
std::vector<int32_t> a_data(10, INT_MAX);
std::vector<int32_t> b_data(10, INT_MAX);

Context ctx;
Array array(ctx, array_uri, TILEDB_READ, TemporalPolicy(TimeTravel, when));
Query query(ctx, array, TILEDB_READ);
query.set_data_buffer("d", d_data).set_data_buffer("a", a_data);

auto matcher = Catch::Matchers::ContainsSubstring("There is no field b");
REQUIRE_THROWS_WITH(query.set_data_buffer("b", b_data), matcher);

REQUIRE(query.submit() == Query::Status::COMPLETE);

for (int32_t i = 0; i < 10; i++) {
if (i < 5) {
REQUIRE(d_data[i] == i);
REQUIRE(a_data[i] == i + 5);
REQUIRE(b_data[i] == INT_MAX);
} else {
REQUIRE(d_data[i] == INT_MAX);
REQUIRE(a_data[i] == INT_MAX);
REQUIRE(b_data[i] == INT_MAX);
}
}
}
4 changes: 4 additions & 0 deletions test/src/unit-capi-array_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2149,6 +2149,8 @@ TEST_CASE_METHOD(
tiledb_array_t* array;
rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_set_open_timestamp_end(ctx_, array, now + 1);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_open(ctx_, array, TILEDB_READ);
REQUIRE(rc == TILEDB_OK);
tiledb_array_schema_t* read_schema;
Expand Down Expand Up @@ -2292,6 +2294,8 @@ TEST_CASE_METHOD(
tiledb_array_t* array;
rc = tiledb_array_alloc(ctx_, array_name.c_str(), &array);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_set_open_timestamp_end(ctx_, array, now + 1);
REQUIRE(rc == TILEDB_OK);
rc = tiledb_array_open(ctx_, array, TILEDB_READ);
REQUIRE(rc == TILEDB_OK);
tiledb_array_schema_t* read_schema;
Expand Down
41 changes: 39 additions & 2 deletions tiledb/sm/array/array_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,7 @@ Status ArrayDirectory::load() {
"Cannot open array; Array does not exist."));
}

// Set the latest array schema URI
latest_array_schema_uri_ = array_schema_uris_.back();
latest_array_schema_uri_ = select_latest_array_schema_uri();
assert(!latest_array_schema_uri_.is_invalid());
}

Expand Down Expand Up @@ -1214,6 +1213,44 @@ Status ArrayDirectory::compute_array_schema_uris(
return Status::Ok();
}

URI ArrayDirectory::select_latest_array_schema_uri() {
// Set the latest array schema URI. When in READ mode, the latest array
// schema URI is the schema with the largest timestamp less than or equal
// to the current timestamp_end_. If no schema meets this definition, we
// use the first schema available.
//
// The reason for choosing the oldest array schema URI even when time
// traveling before it existed is to first, not break any arrays that have
// fragments written before the first schema existed. The second reason is
// to not break old arrays that only have the old `__array_schema.tdb`
// URI which does not have timestamps.
if (mode_ != ArrayDirectoryMode::READ) {
return array_schema_uris_.back();
}

optional<URI> latest_uri = nullopt;
uint64_t latest_ts = 0;

for (auto& uri : array_schema_uris_) {
auto name = uri.remove_trailing_slash().last_path_part();

// Skip the old schema URI name since it doesn't have timestamps
if (name == constants::array_schema_filename) {
continue;
}

std::pair<uint64_t, uint64_t> ts_range;
throw_if_not_ok(utils::parse::get_timestamp_range(uri, &ts_range));

if (ts_range.second > latest_ts && ts_range.second <= timestamp_end_) {
latest_uri = uri;
latest_ts = ts_range.second;
}
}

return latest_uri.value_or(array_schema_uris_.front());
}

bool ArrayDirectory::is_vacuum_file(const URI& uri) const {
if (utils::parse::ends_with(uri.to_string(), constants::vacuum_file_suffix))
return true;
Expand Down
7 changes: 7 additions & 0 deletions tiledb/sm/array/array_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,13 @@ class ArrayDirectory {
Status compute_array_schema_uris(
const std::vector<URI>& array_schema_dir_uris);

/**
* Select the URI to use for the latest array schema.
*
* @return URI The latest array schema URI to use.
*/
URI select_latest_array_schema_uri();

/**
* Checks if a fragment overlaps with the array directory timestamp
* range. Overlap is partial or full depending on the consolidation
Expand Down

0 comments on commit 127493f

Please sign in to comment.