
[CORE-8530] Integrate struct evolution machinery into catalog_schema_manager #24862

Merged

Conversation

oleiman
Member

@oleiman oleiman commented Jan 17, 2025

This PR integrates full-featured struct evolution into the Iceberg record translation path.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

Improvements

  • Enables full struct schema evolution for Iceberg topics

@oleiman oleiman self-assigned this Jan 17, 2025
@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch 4 times, most recently from c1bc5a8 to 0ef4ff0 Compare January 22, 2025 23:07
@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 0ef4ff0 to 55adc4d Compare January 23, 2025 03:18
@oleiman
Member Author

oleiman commented Jan 23, 2025

/dt

@vbotbuildovich
Collaborator

vbotbuildovich commented Jan 23, 2025

Retry command for Build#61083

please wait until all jobs are finished before running the slash command



/ci-repeat 1
tests/rptest/tests/datalake/schema_evolution_test.py::SchemaEvolutionSmokeTest.test_evolving_avro_schemas@{"cloud_storage_type":1,"query_engine":"trino"}
tests/rptest/tests/datalake/schema_evolution_test.py::SchemaEvolutionSmokeTest.test_evolving_avro_schemas@{"cloud_storage_type":1,"query_engine":"spark"}

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 55adc4d to a2272eb Compare January 23, 2025 06:40
@oleiman
Member Author

oleiman commented Jan 23, 2025

/dt

@vbotbuildovich
Collaborator

vbotbuildovich commented Jan 23, 2025

CI test results

test results on build#61091
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/61091#0194922f-073a-4947-b34b-3353530985e5 FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/61091#01949241-b80a-4f89-a11f-3a319048aef9 FLAKY 1/2
test results on build#61182
test_id test_kind job_url test_status passed
rptest.tests.availability_test.AvailabilityTests.test_availability_when_one_node_failed ducktape https://buildkite.com/redpanda/redpanda/builds/61182#01949a86-78ad-47df-9823-f07155ce8822 FLAKY 1/2
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/61182#01949a86-78ae-4205-b067-94d41afa8aec FLAKY 1/2
test results on build#61281
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery ducktape https://buildkite.com/redpanda/redpanda/builds/61281#0194abcc-3f7d-473e-9a6a-29388045798e FLAKY 1/3
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_creating_and_listing_migrations ducktape https://buildkite.com/redpanda/redpanda/builds/61281#0194abcc-3f7c-46b5-a249-3bdbe10ff19a FLAKY 1/2
rptest.tests.partition_force_reconfiguration_test.PartitionForceReconfigurationTest.test_basic_reconfiguration.acks=-1.restart=False.controller_snapshots=False ducktape https://buildkite.com/redpanda/redpanda/builds/61281#0194abc6-d457-4756-bb37-b6e4f440901b FLAKY 1/2
rptest.tests.partition_movement_test.SIPartitionMovementTest.test_cross_shard.num_to_upgrade=0.cloud_storage_type=CloudStorageType.ABS ducktape https://buildkite.com/redpanda/redpanda/builds/61281#0194abcc-3f7e-46b4-80c7-0367aaa74002 FLAKY 1/2
test results on build#61320
test_id test_kind job_url test_status passed
rptest.tests.compaction_recovery_test.CompactionRecoveryTest.test_index_recovery ducktape https://buildkite.com/redpanda/redpanda/builds/61320#0194afea-9668-4506-838b-04afc600755d FLAKY 1/2
rptest.tests.compaction_recovery_test.CompactionRecoveryUpgradeTest.test_index_recovery_after_upgrade ducktape https://buildkite.com/redpanda/redpanda/builds/61320#0194b005-9b89-4463-ad7e-3f6f78c4604c FLAKY 1/2
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_higher_level_migration_api ducktape https://buildkite.com/redpanda/redpanda/builds/61320#0194afea-9665-4f04-bfb0-1905a9a5f6b2 FLAKY 1/2
rptest.tests.e2e_shadow_indexing_test.ShadowIndexingWhileBusyTest.test_create_or_delete_topics_while_busy.short_retention=True.cloud_storage_type=CloudStorageType.ABS ducktape https://buildkite.com/redpanda/redpanda/builds/61320#0194b005-9b89-4463-ad7e-3f6f78c4604c FLAKY 1/2
test results on build#61405
test_id test_kind job_url test_status passed
rptest.tests.datalake.compaction_test.CompactionGapsTest.test_translation_no_gaps.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/61405#0194b8c9-f043-4cdf-b85b-df86ce66afde FLAKY 1/2
rptest.tests.datalake.custom_partitioning_test.DatalakeCustomPartitioningTest.test_basic.cloud_storage_type=CloudStorageType.S3.filesystem_catalog_mode=False ducktape https://buildkite.com/redpanda/redpanda/builds/61405#0194b8c5-0994-4915-a786-bb9b1643288a FLAKY 1/2
rptest.tests.datalake.datalake_dlq_test.DatalakeDLQTest.test_invalid_record_action_runtime_change.cloud_storage_type=CloudStorageType.S3.query_engine=QueryEngineType.SPARK.use_topic_property=True ducktape https://buildkite.com/redpanda/redpanda/builds/61405#0194b8c9-f044-4800-a9ac-ec7da0834308 FLAKY 1/2
rptest.tests.scaling_up_test.ScalingUpTest.test_scaling_up_with_recovered_topic ducktape https://buildkite.com/redpanda/redpanda/builds/61405#0194b8c9-f044-4315-903f-869b22e9384f FLAKY 1/3

@oleiman
Member Author

oleiman commented Jan 23, 2025

/cdt
tests/rptest/tests/datalake/schema_evolution_test.py

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch 2 times, most recently from 3c99abb to 60ba13a Compare January 24, 2025 05:41
@oleiman oleiman marked this pull request as ready for review January 24, 2025 05:52
@oleiman oleiman changed the title from "DNM: Dlib/core 8530/schema manager integration" to "[CORE-8530] Integrate struct evolution machinery into catalog_schema_manager" Jan 24, 2025
@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch 2 times, most recently from d4bb0d8 to 20ddb11 Compare January 24, 2025 21:14
@oleiman
Member Author

oleiman commented Jan 25, 2025

/cdt
tests/rptest/tests/datalake/schema_evolution_test.py
dt-repeat=5

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 20ddb11 to 5983410 Compare January 28, 2025 03:22
@oleiman
Member Author

oleiman commented Jan 28, 2025

force push moved "historical writes" commits into a separate PR #24955

Contributor

@rockwotj rockwotj left a comment

Seems fairly straightforward, I had a comment on the commit order throwing me off. I haven't gone over the last two commits yet, I promise I will do so tomorrow.

src/v/iceberg/datatypes_json.cc (outdated; resolved)
src/v/datalake/record_translator.cc (outdated; resolved)
@oleiman
Member Author

oleiman commented Jan 28, 2025

Seems fairly straightforward,

Ya. Feels a little "too straightforward", but maybe it just is.

I haven't gone over the last two commits yet, I promise I will do so tomorrow.

Cool thanks. The last two commits in this PR are the meat of the change & new tests, but do note that I spun off part of the stack into a separate PR for handling incoming records from an "old" schema here: #24955 - that bit might be worthy of discussion w.r.t. product def'n.

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 5983410 to 7d81009 Compare January 28, 2025 06:03
@oleiman
Member Author

oleiman commented Jan 28, 2025

force push reorder commits

Comment on lines 112 to 115
vassert(
field->id != 0,
"Nested field '{}' wasn't assigned an ID",
field->name);
Contributor

This feels like it should always be true, but is there a guarantee in code that this never fires? Could a buggy catalog cause a crash, for example?

Member Author

yeah, i've gone back and forth on this. I think the assertion holds for structs processed through record translation, but...

could a buggy catalog cause a crash

good point, hadn't considered that. I think it's fair to say that a field w/ a zero ID here is strictly ill-formed, but since that value could have come from disk, a non-fatal error would probably be more appropriate.

src/v/iceberg/table_update_applier.cc (outdated; resolved)
Comment on lines +193 to +201
std::ignore = iceberg::for_each_field(
struct_type, [](iceberg::nested_field* f) {
f->required = iceberg::field_required::no;
});
Contributor

without loss of generality, we can internally treat all fields as nullable to
avoid complexity associated with maintaining requiredness invariants across
schema versions.

I'm not sure I follow this -- is this really just an internal detail? Doesn't it get propagated to users in their tables?

Contributor

If the intention here is really to work around how strict Avro is, I wonder if it's less surprising to either:

  • not allow evolving Avro schemas
  • require evolution of Avro schemas to be in the form of adding null-unions

I personally don't feel strongly that either is correct, but I just found this part of the PR surprising (sorry if we decided this a while ago and I'm just forgetting) and want to make sure this is the least surprising option for users

Member Author

is this really just an internal detail

No not really, just poor phrasing. What I mean is "strictly within iceberg land". Trying to emphasize that the kafka record serde language imposes its own compatibility invariants, so on the Iceberg side we can, in principle, stay in the business of aggressively maintaining table compatibility, with the explicit assumption that record compat semantics are handled elsewhere.

if the intention here is really to work around how strict Avro is

Sort of. I think we're still working around an impedance mismatch between Avro or protobuf (or JSON as the case may be) compat vs Iceberg compat.

require evolution of Avro schemas to be in the form of adding null-unions

is a good example. For Avro serde purposes {"name": "foo", "type": ["null", "int"]} w/ a null default is effectively an optional field called "foo". BUT in Iceberg schema we translate that into a required field "foo" of type struct<union_opt_1:int> where union_opt_1 is non-required.
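
A minimal sketch of that mismatch (illustrative only; the dict shapes below are hypothetical stand-ins, not the actual Avro or Iceberg internals, and union_opt_1 just follows the naming convention described above):

# Illustrative sketch, assuming dict-shaped stand-ins for the real types.
avro_field = {"name": "foo", "type": ["null", "int"], "default": None}

# Avro serde view: "foo" is effectively an optional int.
optional_in_avro = "null" in avro_field["type"]

# Iceberg-side view as described above: a *required* field "foo" whose struct
# type has a single non-required branch, union_opt_1.
iceberg_field = {
    "name": "foo",
    "required": True,
    "type": {"struct": [{"name": "union_opt_1", "required": False, "type": "int"}]},
}

print(optional_in_avro, iceberg_field["required"])  # True True -> the mismatch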

We could do something like make the null-union structs optional during type resolution, but it still feels like we're smashing abstractions together in an uncomfortable way.

not allow evolving Avro schemas

Right. My read is that we should either be maximally restrictive or maximally permissive to avoid reasoning about (and documenting) these special cases.

My intuitive preference is to start with the permissive version because it feels more "capable" in a product sense. My argument to "generality" is that we're translating records that are already subject to an SR schema (w/ associated compat rules), so the schema language's requiredness invariants have already been applied prior to translation. So even if the columns are "nullable" for compat purposes, the values stored there are still subject to the semantics of the corresponding record stream.

sorry if we decided this a while ago and I'm just forgetting

not at all. this came up during implementation and I think now is the time to settle it. might be easier to chat through face to face (and maybe pull in @mattschumpert and anyone else interested).

Member Author

@oleiman oleiman Jan 28, 2025

Ah, other thing i wanted to mention is that the databricks connector does this (configurable) for the same(ish) reason, which is where i got the idea. So not without precedent 🤷‍♂️

Config is iceberg.tables.schema-force-optional per https://github.com/databricks/iceberg-kafka-connect#configuration

Contributor

Thanks for the details here!

but it still feels like we're smashing abstractions together in an uncomfortable way.

Yea generally I agree that mixing Iceberg and Avro/Kafka semantics feels pretty ick. At this point in the data path, it feels cleaner to think of the question, "what should my Iceberg table do?" without thinking about Kafka schema semantics at all.

My intuitive preference is to start with the permissive version because it feels more "capable" in a product sense.

Ah yea that's a fair point, though there's a case to be made that it may be less problematic to do the less surprising, more limited thing first, and then extend our capabilities later with some additional config.

so the schema language's requiredness invariants have already been applied prior to translation

Yea this is also fair. The remaining question though is whether requiredness of Kafka schema holds meaning to query engine users.

I guess another option to consider: what about only allowing adding required fields that have defaults? I seem to recall some work around serializing values moving in that direction anyway. Did it turn out to be not viable?

Let's set up some time to chat about this -- it'd be good to get product's thoughts around users' table expectations and default behaviors.

Member Author

Ya, I'll try to set something up for tomorrow.

whether requiredness of Kafka schema holds meaning to query engine users

Yeah, it's an open question from my POV. I imagine there are distinct personas here - e.g.

  • "analyst Iceberg end user for whom table schema is the source of truth"
  • "infra engineer who may directly control both the source (topic) and sink (table)".

do the less surprising thing

100% agree that should be the goal, and I think we get to draw the map here (within reason) since the ecosystem is evolving so quickly. But yeah, mostly a product question at this point.

what about only allowing adding required fields that have defaults?

That should be the north star I think [1]. The tricky bit is how to specify defaults - do they come from the source schema? Or maybe we do what protobuf does (i.e. a string's default is always the empty string, default bool is false, etc)? I think we talked about this last week and decided to punt for GA, but the value serialization code should be fit for purpose.

[1] Also note that the spec requires default values for any non-optional field added to an existing schema.

Contributor

Or maybe we do what protobuf does (i.e. a string's default is always the empty string, default bool is false, etc)?

Just FYI it's possible to override the default value in protobuf. We should be respecting that.

TBH, I'm a fan of making everything nullable (not required). It looks like the Kafka connector has an option for this: iceberg.tables.schema-force-optional. FWIW, Snowflake's connector seems to always add new columns as nullable and not mess with defaults.

Member Author

Don't think we reached as clear a consensus as I would have hoped, but that's ok. My read is that the permissive approach should be sufficient (in a product sense) for initial GA, modulo any lingering concerns from this thread.

I expect we'll be asked to support a more strict adherence to record schema before long, with requiredness and defaults and whatnot, but I don't think we're painting ourselves into a corner w/ the proposed implementation. Happy to hear differently from anyone though.

Contributor

Agreed. I think the thought from @mattschumpert was that this is a good default, which is the more important thing. We can also add options to tune this behavior for customers later on.

src/v/iceberg/datatypes_json.cc (outdated; resolved)
Comment on lines +45 to +49
// - the table's last_column_id increases monotonically
// - carry-over fields in a compat-checked schema already have
// IDs assigned
// - those carry-over IDs are all strictly <= table.last
Contributor

Not suggesting we change even if it isn't the case, but just curious if these are the same invariants Iceberg's Java impl has?

Member Author

@oleiman oleiman Jan 28, 2025

which part? (2) and (3) are implementation details, viz. (2) we update incoming fields w/ existing IDs, and (3) the last column ID of the table is correct with respect to the columns therein.

(1) is perhaps expressed here:

https://github.com/apache/iceberg/blob/8e456aeeabd0a40b23864edadd622b45cb44572c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L577-L581

Which matches the new logic in table_update_applier.

tests/rptest/tests/datalake/schema_evolution_test.py (outdated; resolved)
@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 7d81009 to 17eee07 Compare January 29, 2025 01:17
@oleiman
Member Author

oleiman commented Jan 29, 2025

force push CR comments

@oleiman oleiman requested review from andrwng and rockwotj January 29, 2025 01:18
Contributor

@rockwotj rockwotj left a comment

Looks great! Thanks, I have a few comments and questions, mostly around tests, but that's not necessarily blocking...

src/v/datalake/record_translator.cc (outdated; resolved)
src/v/iceberg/datatypes_json.cc (outdated; resolved)
total: int = 0


class SchemaEvolutionE2ETests(RedpandaTest):
Contributor

There is a lot here for these tests. The tests themselves are great, but I'd love to be able to describe them more concisely.

Contributor

I would love to have tests that have a higher level description and are translated into the specifics. As an example if we had something like this:

COMPATIBLE_CHANGES = {
  "add_column": EvolutionTest(
    type = "avro",
    initial_schema = {
    "type": "record",
    "name": "VerifierRecord",
    "fields": [
        {"name":"foo", "type": "string"},
        {
            "name": "bar",
            "type": {
                "type": "record",
                "name": "bar",
                "fields": [
                    {"name": "qux", "type": "int"},
                    {"name": "thud", "type": "bool"}
                ]
            }
        }
    ]},
    next_schema = {
    "type": "record",
    "name": "VerifierRecord",
    "fields": [
        {"name":"foo", "type": "string"},
        {
            "name": "bar",
            "type": {
                "type": "record",
                "name": "bar",
                "fields": [
                    {"name": "qux", "type": "int"},
                    {"name": "thud", "type": "bool"}
                ]
            }
        },
        {"name": "baz", "type": "string}
    ]},
    initial_table = [
     ("foo", "string"),
     ("bar", "struct", (
       "qux", "int",
       "thud", bool",
     )),
    ],
    next_table = [
     ("foo", "string"),
     ("bar", "struct", (
       "qux", "int",
       "thud", bool",
     )),
     ("baz", "string")
    ],
  ),
  "add_nested": EvolutionTest(...),
  ...
}

INCOMPATIBLE_CHANGES = {
  "illegal_promotion": EvolutionTest(...),
  ...,
}

Although even that is probably a lot... I wonder if we can do something like:

  "add_column": EvolutionTest(
    type = "avro",
    initial_schema = {
    "type": "record",
    "name": "VerifierRecord",
    "fields": [
        {"name":"foo", "type": "string"},
        {
            "name": "bar",
            "type": {
                "type": "record",
                "name": "bar",
                "fields": [
                    {"name": "qux", "type": "int"},
                    {"name": "thud", "type": "bool"}
                ]
            }
        }
    ]},
    next_schema = lambda og_schema: og_schema.fields.append({"name": "baz", "type": "string"}),
    initial_table = [
     ("foo", "string"),
     ("bar", "struct", (
       "qux", "int",
       "thud", bool",
     )),
    ],
     next_table = lambda og_schema: og_schema.append(("baz", "string")),
  ),

Anyways, not blocking, but I think that would help keep this file from sprawling with all the cases here.

Member Author

yeah I rather like the idea of representing the schema as a python object, then evolution as diffs. will be less text, surely.

as far as the test functions themselves, I didn't have much luck distilling the inputs to something more general than "some schemas and some fields", since tests have pretty distinct semantics.

there's a lot of repeated code though. could pull out another context manager for the shared stuff maybe. I'll take another run at it tonight.

Member Author

🐍 🕳️


from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
Contributor

Are we planning on having protobuf support here too?

Member Author

yeah should do. the code we're testing is downstream of schema translation, but there's no harm in more verification. these tests aren't very expensive to run.

may kick it to a follow-up to unblock #24955 though

Contributor

Totally 👍

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch 2 times, most recently from 4eb456a to 97b3e10 Compare January 30, 2025 08:05
@oleiman
Member Author

oleiman commented Jan 30, 2025

force push reworked ducktape tests, removed an old one, couple minor CR comments

@oleiman
Member Author

oleiman commented Jan 30, 2025

/dt

@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from 97b3e10 to b9ca4b4 Compare January 30, 2025 18:15
For structs, prime the stack with all top-level fields instead of visiting
one by one. This results in fewer allocations.

Signed-off-by: Oren Leiman <[email protected]>
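
A rough sketch of the traversal change in Python (illustrative only; the real for_each_field is C++ and its shape may differ):

# Illustrative sketch; dicts stand in for nested fields.
def for_each_field(struct_fields, fn):
    stack = list(struct_fields)   # prime the stack with all top-level fields
    while stack:
        field = stack.pop()
        fn(field)
        # push any nested fields, if present, instead of visiting one by one
        stack.extend(field.get("fields", []))
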
Previously we would assign an ID to every field in the input schema starting
from the minimum valid ID (1).

Instead, we assign IDs _only_ to those fields which did not have one assigned
during compat checks, and (instead of starting from 'id_t{1}') starting from some
ID specified at the interface. In the case of applying a schema update, this
should be one past the last_column_id recorded in table metadata.

Signed-off-by: Oren Leiman <[email protected]>
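
A minimal sketch of that assignment rule (illustrative only; the names below are hypothetical, not the actual catalog_schema_manager interface):

# Illustrative sketch; fields are dicts in schema order, and carry-over fields
# already carry the IDs assigned during compat checks.
def fill_missing_field_ids(fields, last_column_id):
    next_id = last_column_id + 1        # one past the table's last_column_id
    for field in fields:
        if field.get("id") is None:     # only fields without an assigned ID
            field["id"] = next_id
            next_id += 1
    return next_id - 1                  # the table's new last_column_id
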
Header only modules are not subject to dependency checks by bazel, so these
were causing linker errors in a dependent test.

In effect, this allows us to support full field removal.

We still enforce that meta.last increases monotonically. This invariant
is important to avoid reusing column IDs, which would in turn violate
specified schema evolution correctness guarantees.

Signed-off-by: Oren Leiman <[email protected]>
nullability semantics differ significantly between schema definition languages.
without loss of generality, we can internally treat all fields as nullable to
avoid complexity associated with maintaining requiredness invariants across
schema versions.

Signed-off-by: Oren Leiman <[email protected]>
Introduce check_schema_compat, called on the ensure_schema path, to call into
the compatibility layer. Returns an error, or an indication that the schema
changed, or nothing (no schema change).

Also update fill_field_ids to use the compat machinery to support filling field
IDs in the presence of type changes, changes in field order, or records with
missing fields. Errors iff not all destination fields could be filled.

Signed-off-by: Oren Leiman <[email protected]>
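
A sketch of that three-way result (illustrative only; the real interface is C++, and the names and compat check below are hypothetical stand-ins for the compatibility layer):

# Illustrative sketch; schemas are modeled as lists of field dicts.
from enum import Enum, auto

class CompatOutcome(Enum):
    NO_CHANGE = auto()  # source schema matches the table schema
    CHANGED = auto()    # compatible change detected; table update required
    ERROR = auto()      # incompatible or malformed schema

def check_schema_compat(table_fields, source_fields):
    if source_fields == table_fields:
        return CompatOutcome.NO_CHANGE
    src_types = {f["name"]: f["type"] for f in source_fields}
    # Stand-in for the real check: carried-over fields must keep their types;
    # added or removed fields are tolerated.
    compatible = all(
        src_types.get(f["name"], f["type"]) == f["type"] for f in table_fields
    )
    return CompatOutcome.CHANGED if compatible else CompatOutcome.ERROR
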
test that:
- table schema updates work
- valid evolved schemas are backwards compatible w/ historical rows
- adding columns never results in reading pre-existing values from another column
- removing a column and adding it back in a subsequent update creates a NEW
  column that does not collide with the old 'version'
- attempting to read from a dropped column fails gracefully
- changing the order of columns doesn't change the values associated with a
  column or field name

Signed-off-by: Oren Leiman <[email protected]>
@oleiman oleiman force-pushed the dlib/core-8530/schema-manager-integration branch from b9ca4b4 to 605246c Compare January 30, 2025 18:35
@oleiman
Member Author

oleiman commented Jan 30, 2025

force push rebase dev to fix conflict

@oleiman oleiman requested a review from rockwotj January 30, 2025 19:10
Contributor

@rockwotj rockwotj left a comment

giphy


from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
Contributor

Totally 👍

],
),
),
"drop_column":
Contributor

I still think this is too aggressive a behavior; I rarely see people dropping columns and old data in analytical datasets, but that's not really a discussion for this PR.

Member Author

Out of curiosity, do you mean

  1. People won't use this - they will never (or rarely) drop fields from their record schemas
  2. People will use this - they will drop fields from record schemas, but they will expect (or want) different behavior with respect to table schema

Contributor

I think #2, and ideally long term we handle it gracefully by having the user manually drop the column

Member Author

Interesting, yeah. Variations on the theme of strict vs flexible interpretation of record schema.

I don't have strong intuition one way or the other. Are we collecting open questions like this somewhere? If not I can start a fresh doc

next_schema: AvroSchema


LEGAL_TEST_CASES = {
Contributor

This is so great, thank you! I think it's way easier to grok when everything is colocated together ❤️

table = [(t[0], t[1]) for t in table[1:]]

assert table == expected.table(query_engine), \
str(table)
Contributor

Can we add a little more color here about what was expected?

Member Author

Protobuf diff is close behind, i'll be sure to improve these assert messages in there.

Comment on lines +528 to +529
assert all(r[1] is None for r in select_out[:count * 2])
assert all(r[1] is not None for r in select_out[count * 2:])
Contributor

Please add assertion messages

@oleiman oleiman merged commit d082177 into redpanda-data:dev Jan 31, 2025
18 checks passed