Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-8964 Add protobuf support to existing ducktape suite for schema evolution. #24998

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
39 changes: 27 additions & 12 deletions src/v/datalake/catalog_schema_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ fill_field_ids(iceberg::struct_type& dest, const iceberg::struct_type& source) {
// the schemas are identical. An error indicates that the schemas differ, in
// which case the errc indicates whether they are compatible. source can be
// correctly evolved to dest.
// NOTE: pass the source struct by value to create a clear barrier between the
// traversal and whatever (probably cached) metadata the source struct was
// pulled from.
// NOTE: Post processing is required to assign IDs to any destination fields not
// found in source (i.e. new fields).
checked<std::nullopt_t, fill_errc> check_schema_compat(
iceberg::struct_type& dest, const iceberg::struct_type& source) {
checked<std::nullopt_t, fill_errc>
check_schema_compat(iceberg::struct_type& dest, iceberg::struct_type source) {
using namespace iceberg;
if (auto evo_res = evolve_schema(source, dest); evo_res.has_error()) {
vlog(
Expand Down Expand Up @@ -148,7 +151,8 @@ simple_schema_manager::ensure_table_schema(

ss::future<checked<schema_manager::table_info, schema_manager::errc>>
simple_schema_manager::get_table_info(
const iceberg::table_identifier& table_id) {
const iceberg::table_identifier& table_id,
std::optional<std::reference_wrapper<iceberg::struct_type>>) {
auto it = table_info_by_id.find(table_id);
if (it == table_info_by_id.end()) {
co_return errc::failed;
Expand Down Expand Up @@ -181,7 +185,7 @@ catalog_schema_manager::ensure_table_schema(
co_return get_res.error();
}
if (get_res.value()) {
// Success! Schema already matches what we need.
// Success! The desired schema matches one already in the table.
co_return std::nullopt;
}

Expand Down Expand Up @@ -220,7 +224,8 @@ catalog_schema_manager::ensure_table_schema(

ss::future<checked<schema_manager::table_info, schema_manager::errc>>
catalog_schema_manager::get_table_info(
const iceberg::table_identifier& table_id) {
const iceberg::table_identifier& table_id,
std::optional<std::reference_wrapper<iceberg::struct_type>> desired_type) {
auto load_res = co_await catalog_.load_table(table_id);
if (load_res.has_error()) {
co_return log_and_convert_catalog_err(
Expand All @@ -230,7 +235,10 @@ catalog_schema_manager::get_table_info(
}
const auto& table = load_res.value();

auto cur_schema = table.get_schema(table.current_schema_id);
const auto* cur_schema = desired_type.has_value()
? table.get_equivalent_schema(
desired_type->get())
: table.get_schema(table.current_schema_id);
if (!cur_schema) {
vlog(
datalake_log.error,
Expand Down Expand Up @@ -263,20 +271,27 @@ catalog_schema_manager::get_ids_from_table_meta(
const iceberg::table_identifier& table_id,
const iceberg::table_metadata& table_meta,
iceberg::struct_type& dest_type) {
auto schema_iter = std::ranges::find(
table_meta.schemas,
table_meta.current_schema_id,
&iceberg::schema::schema_id);
if (schema_iter == table_meta.schemas.end()) {
const auto* schema = table_meta.get_equivalent_schema(dest_type);
if (schema != nullptr) {
vlog(
datalake_log.debug,
"Found exact match schema: ID {}",
schema->schema_id);
return true;
}

schema = table_meta.get_schema(table_meta.current_schema_id);
if (schema == nullptr) {
vlog(
datalake_log.error,
"Cannot find current schema {} in table {}",
table_meta.current_schema_id,
table_id);
return errc::failed;
}

auto compat_res = check_schema_compat(
dest_type, schema_iter->schema_struct);
dest_type, schema->schema_struct.copy());
if (compat_res.has_error()) {
switch (compat_res.error()) {
case fill_errc::invalid_schema:
Expand Down
19 changes: 13 additions & 6 deletions src/v/datalake/catalog_schema_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ class schema_manager {
bool fill_registered_ids(iceberg::struct_type&);
};

virtual ss::future<checked<table_info, errc>>
get_table_info(const iceberg::table_identifier&) = 0;
virtual ss::future<checked<table_info, errc>> get_table_info(
const iceberg::table_identifier&,
std::optional<std::reference_wrapper<iceberg::struct_type>> desired_type
= std::nullopt)
= 0;

virtual ~schema_manager() = default;
};
Expand All @@ -65,8 +68,10 @@ class simple_schema_manager : public schema_manager {
const iceberg::struct_type& desired_type,
const iceberg::unresolved_partition_spec&) override;

ss::future<checked<table_info, schema_manager::errc>>
get_table_info(const iceberg::table_identifier&) override;
ss::future<checked<table_info, schema_manager::errc>> get_table_info(
const iceberg::table_identifier&,
std::optional<std::reference_wrapper<iceberg::struct_type>> desired_type
= std::nullopt) override;

private:
iceberg::uri table_location_prefix_;
Expand All @@ -92,8 +97,10 @@ class catalog_schema_manager : public schema_manager {
const iceberg::unresolved_partition_spec&) override;

// Loads the table metadata for the given topic.
ss::future<checked<table_info, schema_manager::errc>>
get_table_info(const iceberg::table_identifier&) override;
ss::future<checked<table_info, schema_manager::errc>> get_table_info(
const iceberg::table_identifier&,
std::optional<std::reference_wrapper<iceberg::struct_type>> desired_type
= std::nullopt) override;

private:
// Attempts to fill the field ids in the given type with those from the
Expand Down
3 changes: 2 additions & 1 deletion src/v/datalake/record_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ record_multiplexer::operator()(model::record_batch batch) {
}

auto table_id = table_id_provider::table_id(_ntp.tp.topic);
auto load_res = co_await _schema_mgr.get_table_info(table_id);
auto load_res = co_await _schema_mgr.get_table_info(
table_id, record_type.type);
if (load_res.has_error()) {
auto e = load_res.error();
switch (e) {
Expand Down
45 changes: 45 additions & 0 deletions src/v/datalake/tests/catalog_schema_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,48 @@ TEST_F(CatalogSchemaManagerTest, CustomPartitionSpec) {
};
ASSERT_EQ(*pspec, expected);
}

TEST_F(CatalogSchemaManagerTest, GetTableInfo) {
struct_type first{};
first.fields.emplace_back(
nested_field::create(1, "foo", field_required::no, int_type{}));

auto second = first.copy();
second.fields.emplace_back(
nested_field::create(2, "bar", field_required::no, string_type{}));

// set schema to 'first'
auto ensure_res
= schema_mgr.ensure_table_schema(table_ident, first, empty_pspec).get();
ASSERT_FALSE(ensure_res.has_error());

// get_table_info returns current schema by default
auto info_res = schema_mgr.get_table_info(table_ident).get();
ASSERT_FALSE(info_res.has_error());
ASSERT_EQ(info_res.value().schema.schema_struct, first);

// we can also retrieve 'first' by equivalence match
info_res = schema_mgr.get_table_info(table_ident, first).get();
ASSERT_FALSE(info_res.has_error());
ASSERT_EQ(info_res.value().schema.schema_struct, first);

// set schema to 'second'
ensure_res
= schema_mgr.ensure_table_schema(table_ident, second, empty_pspec).get();
ASSERT_FALSE(ensure_res.has_error());

// 'second' is current, so get_table_info returns this by default
info_res = schema_mgr.get_table_info(table_ident).get();
ASSERT_FALSE(info_res.has_error());
ASSERT_EQ(info_res.value().schema.schema_struct, second);

// or by equivalence match, as before
info_res = schema_mgr.get_table_info(table_ident, second).get();
ASSERT_FALSE(info_res.has_error());
ASSERT_EQ(info_res.value().schema.schema_struct, second);

// but now we can retrieve 'first' as well
info_res = schema_mgr.get_table_info(table_ident, first).get();
ASSERT_FALSE(info_res.has_error());
ASSERT_EQ(info_res.value().schema.schema_struct, first);
}
10 changes: 9 additions & 1 deletion src/v/iceberg/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ redpanda_cc_library(

redpanda_cc_library(
name = "compatibility_utils",
srcs = [],
srcs = [
"compatibility_utils.cc",
],
hdrs = [
"compatibility_utils.h",
],
Expand Down Expand Up @@ -791,17 +793,23 @@ redpanda_cc_library(

redpanda_cc_library(
name = "table_metadata",
srcs = [
"table_metadata.cc",
],
hdrs = [
"table_metadata.h",
],
include_prefix = "iceberg",
visibility = ["//visibility:public"],
deps = [
":compatibility_utils",
":datatypes",
":manifest",
":manifest_entry",
":partition",
":schema",
":snapshot",
":transform",
":uri",
"//src/v/container:fragmented_vector",
"//src/v/model",
Expand Down
2 changes: 2 additions & 0 deletions src/v/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ v_cc_library(
catalog.cc
compatibility.cc
compatibility_types.cc
compatibility_utils.cc
datatypes.cc
datatypes_json.cc
filesystem_catalog.cc
Expand Down Expand Up @@ -70,6 +71,7 @@ v_cc_library(
snapshot_json.cc
struct_accessor.cc
table_io.cc
table_metadata.cc
table_metadata_json.cc
table_requests_json.cc
table_update_applier.cc
Expand Down
62 changes: 62 additions & 0 deletions src/v/iceberg/compatibility_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "iceberg/compatibility_utils.h"

namespace iceberg {
bool schemas_equivalent(const struct_type& source, const struct_type& dest) {
chunked_vector<const nested_field*> source_stk;
source_stk.reserve(source.fields.size());
for (const auto& f : std::ranges::reverse_view(source.fields)) {
source_stk.push_back(f.get());
}
chunked_vector<const nested_field*> dest_stk;
dest_stk.reserve(dest.fields.size());
for (const auto& f : std::ranges::reverse_view(dest.fields)) {
dest_stk.push_back(f.get());
}

while (!source_stk.empty() && !dest_stk.empty()) {
const auto* sf = source_stk.back();
source_stk.pop_back();
const auto* df = dest_stk.back();
dest_stk.pop_back();

if (sf == nullptr || df == nullptr) {
if (sf != df) {
return false;
}
continue;
}

static constexpr auto is_primitive = [](const field_type& ft) -> bool {
return std::holds_alternative<primitive_type>(ft);
};

static constexpr auto get_index = [](const field_type& ft) -> size_t {
if (is_primitive(ft)) {
return std::get<primitive_type>(ft).index();
}
return ft.index();
};

if (
sf->name != df->name || sf->required != df->required
|| is_primitive(sf->type) != is_primitive(df->type)
|| get_index(sf->type) != get_index(df->type)) {
return false;
}
std::visit(
reverse_const_field_collecting_visitor{source_stk}, sf->type);
std::visit(reverse_const_field_collecting_visitor{dest_stk}, df->type);
}
return source_stk.empty() && dest_stk.empty();
}
} // namespace iceberg
7 changes: 7 additions & 0 deletions src/v/iceberg/compatibility_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,11 @@ schema_errc_result for_each_field(
s, fn, [](const nested_field*) { return true; });
}

/**
* schemas_equivalent - returns whether the two input structs are identical
* in structure and type *with the exception of field IDs*. The purpose is
* to check equivalence before assigning IDs to the fields in dest.
*/
bool schemas_equivalent(const struct_type& source, const struct_type& dest);

} // namespace iceberg
26 changes: 26 additions & 0 deletions src/v/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "iceberg/table_metadata.h"

#include "iceberg/compatibility_utils.h"

namespace iceberg {
const schema*
table_metadata::get_equivalent_schema(const struct_type& type) const {
auto schemas_reversed = std::ranges::reverse_view(schemas);
auto it = std::ranges::find_if(
schemas_reversed,
[&type](const iceberg::struct_type& source) {
return iceberg::schemas_equivalent(source, type);
},
&iceberg::schema::schema_struct);
return it != schemas_reversed.end() ? &*it : nullptr;
}
} // namespace iceberg
5 changes: 5 additions & 0 deletions src/v/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ struct table_metadata {
return it != schemas.end() ? &*it : nullptr;
}

// Search for a schema that matches the provided type. Start from the end of
// the list to short circuit on the common case (desired type is current,
// latest schema)
const schema* get_equivalent_schema(const struct_type& type) const;

// TODO: consider making this a lazy data member if it gets used by many
// callers for the same metadata.
chunked_hash_map<snapshot_id, snapshot> get_snapshots_by_id() const {
Expand Down
1 change: 1 addition & 0 deletions src/v/iceberg/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ redpanda_cc_gtest(
],
deps = [
":test_table_metadata",
"//src/v/iceberg:compatibility_utils",
"//src/v/iceberg:json_writer",
"//src/v/iceberg:table_metadata",
"//src/v/iceberg:table_metadata_json",
Expand Down
11 changes: 11 additions & 0 deletions src/v/iceberg/tests/compatibility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1123,3 +1123,14 @@ TEST_P(StructEvoCompatibilityTest, CanEvolveStructsAndDetectErrors) {
ASSERT_TRUE(validator(original_schema_struct, type)) << fmt::format(
"Original: {}\nEvolved: {}", original_schema_struct, evolve_res.value());
}

TEST_P(StructEvoCompatibilityTest, CanCheckEquivalence) {
auto original = generator();

EXPECT_TRUE(schemas_equivalent(original, original));

auto next = update(original);

EXPECT_FALSE(schemas_equivalent(original, next));
EXPECT_FALSE(schemas_equivalent(next, original));
}
Loading