[CORE-8530] Integrate struct evolution machinery into catalog_schema_manager #24862
Conversation
force-push c1bc5a8 to 0ef4ff0
force-push 0ef4ff0 to 55adc4d
/dt
Retry command for Build#61083: please wait until all jobs are finished before running the slash command.
force-push 55adc4d to a2272eb
/dt
CI test results:
- test results on build#61091
- test results on build#61182
- test results on build#61281
- test results on build#61320
- test results on build#61405
/cdt
force-push 3c99abb to 60ba13a
force-push d4bb0d8 to 20ddb11
/cdt
force-push 20ddb11 to 5983410: moved "historical writes" commits into a separate PR #24955
Seems fairly straightforward; I had a comment on the commit order that was throwing me off. I haven't gone over the last two commits yet, but I promise I will do so tomorrow.
Ya. Feels a little "too straightforward", but maybe it just is.
Cool thanks. The last two commits in this PR are the meat of the change & new tests, but do note that I spun off part of the stack into a separate PR for handling incoming records from an "old" schema here: #24955 - that bit might be worthy of discussion w.r.t. product def'n.
force-push 5983410 to 7d81009: reorder commits
src/v/iceberg/schema.cc (Outdated)
vassert(
  field->id != 0,
  "Nested field '{}' wasn't assigned an ID",
  field->name);
This feels like it should always be true, but is there a guarantee in code that this never fires? Could a buggy catalog cause a crash, for example?
yeah, i've gone back and forth on this. I think the assertion holds for structs processed through record translation, but...
could a buggy catalog cause a crash
good point, hadn't considered that. I think it's fair to say that a field w/ a zero ID here is strictly ill-formed, but since that value could have come from disk, a non-fatal error would probably be more appropriate.
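For illustration, a minimal sketch of what a non-fatal check could look like, using a hypothetical validate_field_ids helper, a stub field type, and a plain string error; the actual types and error handling in the codebase will differ:

// Sketch only: reject zero field IDs with a recoverable error instead of a
// vassert. Helper name, stub type, and error type are hypothetical.
#include <optional>
#include <string>
#include <vector>

#include <fmt/format.h>

struct nested_field_stub {
    int id;
    std::string name;
};

std::optional<std::string>
validate_field_ids(const std::vector<nested_field_stub>& fields) {
    for (const auto& field : fields) {
        if (field.id == 0) {
            // A zero ID is strictly ill-formed, but it may have come from disk
            // or a buggy catalog, so surface an error rather than crashing.
            return fmt::format(
              "Nested field '{}' wasn't assigned an ID", field.name);
        }
    }
    return std::nullopt; // every field carries a valid, non-zero ID
}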
std::ignore = iceberg::for_each_field(
  struct_type, [](iceberg::nested_field* f) {
      f->required = iceberg::field_required::no;
  });
without loss of generality, we can internally treat all fields as nullable to avoid complexity associated with maintaining requiredness invariants across schema versions.
I'm not sure I follow this -- is this really just an internal detail? Doesn't it get propagated to users in their tables?
If the intention here is really to work around how strict Avro is, I wonder if it's less surprising to either:
- not allow evolving Avro schemas
- require evolution of Avro schemas to be in the form of adding null-unions
I personally don't feel strongly that either is correct, but I just found this part of the PR surprising (sorry if we decided this a while ago and I'm just forgetting) and want to make sure this is the least surprising option for users
is this really just an internal detail
No, not really, just poor phrasing. What I mean is "strictly within iceberg land". Trying to emphasize that the kafka record serde language imposes its own compatibility invariants, so on the Iceberg side we can, in principle, stay in the business of aggressively maintaining table compatibility, with the explicit assumption that record compat semantics are handled elsewhere.
if the intention here is really to work around how strict Avro is
Sort of. I think we're still working around an impedance mismatch between Avro or protobuf (or JSON as the case may be) compat vs Iceberg compat.
require evolution of Avro schemas to be in the form of adding null-unions
is a good example. For Avro serde purposes {"name": "foo", "type": ["null", "int"]} w/ a null default is effectively an optional field called "foo". BUT in Iceberg schema we translate that into a required field "foo" of type struct<union_opt_1:int>, where union_opt_1 is non-required.
We could do something like make the null-union structs optional during type resolution, but it still feels like we're smashing abstractions together in an uncomfortable way.
not allow evolving Avro schemas
Right. My read is that we should either be maximally restrictive or maximally permissive to avoid reasoning about (and documenting) these special cases.
My intuitive preference is to start with the permissive version because it feels more "capable" in a product sense. My argument for "generality" is that we're translating records that are already subject to an SR schema (w/ associated compat rules), so the schema language's requiredness invariants have already been applied prior to translation. So even if the columns are "nullable" for compat purposes, the values stored there are still subject to the semantics of the corresponding record stream.
sorry if we decided this a while ago and I'm just forgetting
not at all. this came up during implementation and I think now is the time to settle it. might be easier to chat through face to face (and maybe pull in @mattschumpert and anyone else interested).
Ah, the other thing I wanted to mention is that the databricks connector does this (configurable) for the same(ish) reason, which is where I got the idea. So not without precedent 🤷♂️
The config is iceberg.tables.schema-force-optional, per https://github.com/databricks/iceberg-kafka-connect#configuration
Thanks for the details here!
but it still feels like we're smashing abstractions together in an uncomfortable way.
Yea generally I agree that mixing Iceberg and Avro/Kafka semantics feels pretty ick. At this point in the data path, it feels cleaner to think of the question, "what should my Iceberg table do?" without thinking about Kafka schema semantics at all.
My intuitive preference is to start with the permissive version because it feels more "capable" in a product sense.
Ah yea that's a fair point, though there's a case to be made that it may be less problematic to do the less surprising, more limited thing first, and then extend our capabilities later with some additional config.
so the schema language's requiredness invariants have already been applied prior to translation
Yea this is also fair. The remaining question though is whether requiredness of Kafka schema holds meaning to query engine users.
I guess another option to consider: what about only allowing adding required fields that have defaults? I seem to recall some work around serializing values moving in that direction anyway. Did it turn out to be not viable?
Let's set up some time to chat about this -- it'd be good to get product's thoughts around users' table expectations and default behaviors.
Ya, I'll try to set something up for tomorrow.
whether requiredness of Kafka schema holds meaning to query engine users
Yeah, it's an open question from my POV. I imagine there are distinct personas here - e.g.
- "analyst Iceberg end user for whom table schema is the source of truth"
- "infra engineer who may directly control both the source (topic) and sink (table)".
do the less surprising thing
100% agree that should be the goal, and I think we get to draw the map here (within reason) since the ecosystem is evolving so quickly. But yeah, mostly a product question at this point.
what about only allowing adding required fields that have defaults?
That should be the north star I think [1]. The tricky bit is how to specify defaults - do they come from the source schema? Or maybe we do what protobuf does (i.e. a string's default is always the empty string, default bool is false, etc)? I think we talked about this last week and decided to punt for GA, but the value serialization code should be fit for purpose.
[1] Also note that the spec requires default values for any non-optional field added to an existing schema.
Or maybe we do what protobuf does (i.e. a string's default is always the empty string, default bool is false, etc)?
Just FYI it's possible to override the default value in protobuf. We should be respecting that.
TBH, I'm a fan of making everything nullable (not required). It looks like the kafka connector has an option for this: iceberg.tables.schema-force-optional. FWIW, Snowflake's connector seems to always add new columns as null and not mess with defaults.
Don't think we reached as clear a consensus as I would have hoped, but that's ok. My read is that the permissive approach should be sufficient (in a product sense) for initial GA, modulo any lingering concerns from this thread.
I expect we'll be asked to support a more strict adherence to record schema before long, with requiredness and defaults and whatnot, but I don't think we're painting ourselves into a corner w/ the proposed implementation. Happy to hear differently from anyone though.
Agreed. I think @mattschumpert's thought was that this is a good default, which is the more important thing. We can also add options to tune this behavior for customers later on.
// - the table's last_column_id increases monotonically
// - carry-over fields in a compat-checked schema already have
//   IDs assigned
// - those carry-over IDs are all strictly <= table.last
Not suggesting we change even if it isn't the case, but just curious if these are the same invariants Iceberg's Java impl has?
Which part? (2) and (3) are implementation details, viz. (2) we update incoming fields w/ existing IDs and (3) the last column ID of the table is correct with respect to the columns therein.
(1) is perhaps expressed here:
Which matches the new logic in table_update_applier.
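For concreteness, a small sketch of how the ID-assignment invariants above fit together; the names here are hypothetical and this is not the actual table_update_applier logic:

// Illustrative only: new fields get fresh IDs starting one past the table's
// last_column_id; carry-over IDs are left untouched and never exceed it; the
// returned last_column_id never decreases, so a dropped-and-re-added column
// can't reuse an old ID.
#include <algorithm>
#include <cassert>
#include <vector>

struct column_stub {
    int id; // 0 means "not yet assigned" in this sketch
};

int assign_new_ids(std::vector<column_stub>& cols, int last_column_id) {
    int next = last_column_id + 1;
    for (auto& c : cols) {
        if (c.id == 0) {
            c.id = next++; // new field: fresh, never-reused ID
        } else {
            assert(c.id <= last_column_id); // carry-over from compat check
        }
    }
    return std::max(last_column_id, next - 1);
}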
force-push 7d81009 to 17eee07: CR comments
Looks great! Thanks, I have a few comments and questions, mostly around tests, but that's not necessarily blocking...
total: int = 0


class SchemaEvolutionE2ETests(RedpandaTest):
There is a lot here for these tests. The tests themselves are great, but I'd love to be able to describe them more concisely.
I would love to have tests that have a higher level description and are translated into the specifics. As an example if we had something like this:
COMPATIBLE_CHANGES = {
    "add_column": EvolutionTest(
        type="avro",
        initial_schema={
            "type": "record",
            "name": "VerifierRecord",
            "fields": [
                {"name": "foo", "type": "string"},
                {
                    "name": "bar",
                    "type": {
                        "type": "record",
                        "name": "bar",
                        "fields": [
                            {"name": "qux", "type": "int"},
                            {"name": "thud", "type": "boolean"}
                        ]
                    }
                }
            ]},
        next_schema={
            "type": "record",
            "name": "VerifierRecord",
            "fields": [
                {"name": "foo", "type": "string"},
                {
                    "name": "bar",
                    "type": {
                        "type": "record",
                        "name": "bar",
                        "fields": [
                            {"name": "qux", "type": "int"},
                            {"name": "thud", "type": "boolean"}
                        ]
                    }
                },
                {"name": "baz", "type": "string"}
            ]},
        initial_table=[
            ("foo", "string"),
            ("bar", "struct", (
                "qux", "int",
                "thud", "bool",
            )),
        ],
        next_table=[
            ("foo", "string"),
            ("bar", "struct", (
                "qux", "int",
                "thud", "bool",
            )),
            ("baz", "string"),
        ],
    ),
    "add_nested": EvolutionTest(...),
    ...
}

INCOMPATIBLE_CHANGES = {
    "illegal_promotion": EvolutionTest(...),
    ...,
}
Although even that is probably a lot... I wonder if we can do something like:
"add_column": EvolutionTest(
type = "avro",
initial_schema = {
"type": "record",
"name": "VerifierRecord",
"fields": [
{"name":"foo", "type": "string"},
{
"name": "bar",
"type": {
"type": "record",
"name": "bar",
"fields": [
{"name": "qux", "type": "int"},
{"name": "thud", "type": "bool"}
]
}
}
]},
next_schema = lambda og_schema: og_schema.fields.append({"name": "baz", "type": "string"}),
initial_table = [
("foo", "string"),
("bar", "struct", (
"qux", "int",
"thud", bool",
)),
],
next_table = lambda og_schema: og_schmea.append(("baz", "string")),
),
Anyways, not blocking, but I think that would help keep this file from sprawling with all the cases here.
Yeah, I rather like the idea of representing the schema as a Python object and evolution as diffs. Will be less text, surely.
as far as the test functions themselves, I didn't have much luck distilling the inputs to something more general than "some schemas and some fields", since tests have pretty distinct semantics.
there's a lot of repeated code though. could pull out another context manager for the shared stuff maybe. I'll take another run at it tonight.
🐍 🕳️
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
Are we planning on having protobuf support here too?
yeah should do. the code we're testing is downstream of schema translation, but there's no harm in more verification. these tests aren't very expensive to run.
may kick it to a follow-up to unblock #24955 though
Totally 👍
force-push 4eb456a to 97b3e10: reworked ducktape tests, removed an old one, couple minor CR comments
/dt
force-push 97b3e10 to b9ca4b4
For structs, prime the stack with all top-level fields instead of visiting one by one. This results in fewer allocations. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Previously we would assign an ID to every field in the input schema starting from the minimum valid ID (1). Instead, we assign IDs _only_ to those fields which did not have one assigned during compat checks, and (instead of starting from 'id_t{1}') starting from some ID specified at the interface. In the case of applying a schema update, this should be one past the last_column_id recorded in table metadata. Signed-off-by: Oren Leiman <[email protected]>
Header only modules are not subject to dependency checks by bazel, so these were causing linker errors in a dependent test.
In effect, this allows us to support full field removal. We still enforce that meta.last increases monotonically. This invariant is important to avoid reusing column IDs, which would in turn violate specified schema evolution correctness guarantees. Signed-off-by: Oren Leiman <[email protected]>
nullability semantics differ significantly between schema definition languages. without loss of generality, we can internally treat all fields as nullable to avoid complexity associated with maintaining requiredness invariants across schema versions. Signed-off-by: Oren Leiman <[email protected]>
Introduce check_schema_compat, called on the ensure_schema path, to call into the compatibility layer. Returns an error, or an indication that the schema changed, or nothing (no schema change). Also update fill_field_ids to use the compat machinery to support filling field IDs in the presence of type changes, changes in field order, or records with missing fields. Errors iff not all destination fields could be filled. Signed-off-by: Oren Leiman <[email protected]>
test that:
- table schema updates work
- valid evolved schemas are backwards compatible w/ historical rows
- adding columns never results in reading pre-existing values from another column
- removing a column and adding it back in a subsequent update creates a NEW column that does not collide with the old 'version'
- attempting to read from a dropped column fails gracefully
- changing the order of columns doesn't change the values associated with a column or field name
Signed-off-by: Oren Leiman <[email protected]>
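For illustration, a minimal sketch of the tri-state result described in the check_schema_compat commit message above (an error, an indication that the schema changed, or nothing to do); the type and enum names are hypothetical, not the codebase's actual types:

// Sketch only: one way to model "error / schema changed / no change".
#include <iostream>
#include <variant>

enum class schema_errc { violates_compat, invalid_schema };
enum class schema_changed { yes, no };

using check_result = std::variant<schema_errc, schema_changed>;

void handle(const check_result& r) {
    if (std::holds_alternative<schema_errc>(r)) {
        std::cout << "compat error: reject the update\n";
    } else if (std::get<schema_changed>(r) == schema_changed::yes) {
        std::cout << "schema changed: update the table schema\n";
    } else {
        std::cout << "no change: nothing to do\n";
    }
}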
force-push b9ca4b4 to 605246c: rebase dev to fix conflict
],
),
),
"drop_column":
I still think this is too aggressive a behavior; I rarely see people dropping columns and old data in analytical datasets, but that's not really a discussion for this PR.
Out of curiosity, do you mean
- People won't use this - they will never (or rarely) drop fields from their record schemas
- People will use this - they will drop fields from record schemas, but they will expect (or want) different behavior with respect to table schema
I think #2, and ideally long term we gracefully handle it by having the user manually drop the column.
Interesting, yeah. Variations on the theme of strict vs flexible interpretation of record schema.
I don't have strong intuition one way or the other. Are we collecting open questions like this somewhere? If not I can start a fresh doc
next_schema: AvroSchema


LEGAL_TEST_CASES = {
This is so great, thank you. I think it's way easier to grok when everything is colocated together ❤️
table = [(t[0], t[1]) for t in table[1:]]

assert table == expected.table(query_engine), \
    str(table)
Can we add a little more color here about what was expected?
Protobuf diff is close behind; I'll be sure to improve these assert messages in there.
assert all(r[1] is None for r in select_out[:count * 2])
assert all(r[1] is not None for r in select_out[count * 2:])
Please add assertion messages
This PR integrates full-featured struct evolution into the Iceberg record translation path.
Backports Required
Release Notes
Improvements