From 598341080a307fb3c3c6cd9d33779ab256a59530 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Fri, 17 Jan 2025 13:26:31 -0800
Subject: [PATCH] dt/dl: Integration testing for schema compat

Integration tests that check:
- table schema updates work
- valid evolved schemas are backwards compatible w/ historical rows
- adding columns never results in reading pre-existing values from
  another column
- removing a column and adding it back in a subsequent update creates
  a NEW column that does not collide with the old 'version'
- attempting to read from a dropped column fails gracefully
- changing the order of columns doesn't change the values associated
  with a column or field name

Signed-off-by: Oren Leiman
---
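Note for reviewers: the produce path that every test below drives reduces
to roughly the sketch here -- relax subject compatibility so the registry
accepts the update, then produce through the schema-registry-aware Avro
producer and leave the compatibility decision to Iceberg translation. The
broker/registry addresses and topic name are assumptions for a local
single-node setup, not values the tests hardcode:

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    from confluent_kafka.schema_registry import SchemaRegistryClient

    BROKERS = "localhost:9092"        # assumed local redpanda broker
    SR_URL = "http://localhost:8081"  # assumed local schema registry

    # allow arbitrary (even incompatible) updates on the subject, so that
    # compatibility is enforced by the Iceberg translator, not the registry
    SchemaRegistryClient({"url": SR_URL}).set_compatibility(
        subject_name="test-value", level="NONE")

    v1 = avro.loads('{"type": "record", "name": "r",'
                    ' "fields": [{"name": "a", "type": "int"}]}')
    # int -> long: a legal promotion
    v2 = avro.loads('{"type": "record", "name": "r",'
                    ' "fields": [{"name": "a", "type": "long"}]}')

    for schema, value in [(v1, {"a": 1}), (v2, {"a": 2})]:
        producer = AvroProducer(
            {"bootstrap.servers": BROKERS, "schema.registry.url": SR_URL},
            default_value_schema=schema)
        # registers the schema with the subject on first use
        producer.produce(topic="test", value=value)
        producer.flush()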
"fields": [ + { + "name": "some_struct", + "type": { + "type": "record", + "name": "nested", + "fields": [ + { + "name": "nested_1", + "type": "int" + }, + { + "name": "nested_2", + "type": "int" + } + ] + } + } + ] +} +""" + +two_field_avro = """ +{ + "type": "record", + "name": "VerifierRecord", + "fields": [ + { + "name": "positive", + "type": "int" + }, + { + "name": "negative", + "type": "int" + } + ] +} +""" + +two_field_reorder_avro = """ +{ + "type": "record", + "name": "VerifierRecord", + "fields": [ + { + "name": "negative", + "type": "int" + }, + { + "name": "positive", + "type": "int" + }, + { + "name": "another", + "type": "string" + } + ] +} +""" + +QUERY_ENGINES = [ + QueryEngineType.SPARK, + QueryEngineType.TRINO, +] + + +class SchemaEvolutionSmokeTest(RedpandaTest): def __init__(self, test_context): - super(SchemaEvolutionTest, self).__init__( + super(SchemaEvolutionSmokeTest, self).__init__( test_context=test_context, num_brokers=1, si_settings=SISettings(test_context, @@ -147,10 +301,7 @@ def _create_schema(self, subject: str, schema: str, schema_type="avro"): @cluster(num_nodes=4, log_allow_list=["Schema \d+ already exists"]) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=[ - QueryEngineType.SPARK, - QueryEngineType.TRINO, - ], + query_engine=QUERY_ENGINES, ) def test_evolving_avro_schemas(self, cloud_storage_type, query_engine): topic_name = "ducky-topic" @@ -174,9 +325,12 @@ def get_qe(): return dl.trino() self._create_schema("schema_avro", schema_avro) + # drop a field, add an optional field, change order self._create_schema("legal_evo_schema_avro", legal_evo_schema_avro) + # drop the optional field, reintroduce the previously dropped one (it should get a fresh ID) self._create_schema("legal_evo_schema_avro_2", legal_evo_schema_avro_2) + # illegal type promotion string -> double self._create_schema("illegal_evo_schema_avro", illegal_evo_schema_avro) @@ -198,22 +352,26 @@ def run_verifier(schema: str, mapping: str): verifier.wait() run_verifier("schema_avro", - mapping=f""" + mapping=""" root.verifier_string = uuid_v4() root.ordinal = counter() + root.some_struct = {} + root.some_struct.nested_1 = counter() """) # again w/ a new schema (extra field and promoted ordinal) run_verifier("legal_evo_schema_avro", - mapping=f""" - root.verifier_string = uuid_v4() + mapping=""" + root.another = {"float": 12.0} root.ordinal = counter() - root.another = 12.0 + root.some_struct = {} + root.some_struct.nested_1 = counter() + root.some_struct.nested_2 = [counter()] """) # remove the extra field and do it one more time run_verifier("legal_evo_schema_avro_2", - mapping=f""" + mapping=""" root.verifier_string = uuid_v4() root.ordinal = counter() """) @@ -227,3 +385,472 @@ def run_verifier(schema: str, mapping: str): root.verifier_string = 23.0 root.ordinal = counter() """) + + +class Schema(NamedTuple): + schema: str + example: Callable[[float], dict] + spark_rep: list[tuple] + trino_rep: list[tuple] + select: list[str] + + +init_schema = Schema( + schema_avro, + lambda x: { + "verifier_string": str(f"verify-{x}"), + "ordinal": int(x), + "some_struct": { + "nested_1": int(x), + }, + }, + [ + ('verifier_string', 'string', None), + ('ordinal', 'int', None), + ('some_struct', 'struct', None), + ], + [ + ('verifier_string', 'varchar', '', ''), + ('ordinal', 'integer', '', ''), + ('some_struct', 'row(nested_1 integer)', '', ''), + ], + ['verifier_string'], +) + +legal_evo = Schema( + legal_evo_schema_avro, + lambda x: { + "another": float(x), + "ordinal": int(x), + 
"some_struct": { + "nested_1": int(x), + "nested_2": [ + int(x), + int(x) + 1, + int(x) + 2, + ], + }, + }, + [ + ('another', 'struct', None), + ('ordinal', 'bigint', None), + ('some_struct', 'struct>', None), + ], + [ + ('another', 'row(union_opt_1 real)', '', ''), + ('ordinal', 'bigint', '', ''), + ( + 'some_struct', + 'row(nested_1 bigint, nested_2 array(integer))', + '', + '', + ), + ], + ['some_struct.nested_2'], +) + +illegal_evo = Schema( + illegal_evo_schema_avro, + lambda x: { + "verifier_string": float(x), + "ordinal": int(x), + }, + [ + ('verifier_string', 'double', None), + ('ordinal', 'bigint', None), + ], + [ + ('verifier_string', 'real', '', ''), + ('ordinal', 'bigint', '', ''), + ], + ['verifier_string'], +) + +single_nested = Schema( + single_nested_avro, + lambda x: { + "some_struct": { + "nested_1": int(x), + }, + }, + [('some_struct', 'struct', None)], + [('some_struct', 'row(nested_1 integer)', '', '')], + ['some_struct.nested_1'], +) + +drop_nested = Schema( + drop_nested_avro, + lambda x: { + "some_struct": { + "nested_2": int(x), + }, + }, + [('some_struct', 'struct', None)], + [('some_struct', 'row(nested_2 integer)', '', '')], + ['some_struct.nested_2'], +) + +reintroduce_nested = Schema( + reintroduce_nested_avro, + lambda x: { + "some_struct": { + "nested_1": int(x), + "nested_2": int(x), + }, + }, + [('some_struct', 'struct', None)], + [('some_struct', 'row(nested_1 integer, nested_2 integer)', '', '')], + ['some_struct.nested_1'], +) + +two_field = Schema( + two_field_avro, + lambda x: { + "positive": int(x), + "negative": -int(x), + }, + [('positive', 'int', None), ('negative', 'int', None)], + [('positive', 'integer', '', ''), ('negative', 'integer', '', '')], + ['positive', 'negative'], +) + +two_field_reorder = Schema( + two_field_reorder_avro, + lambda x: { + "negative": -int(x), + "positive": int(x), + "another": f"foo-{int(x)}", + }, + [ + ('negative', 'int', None), + ('positive', 'int', None), + ('another', 'string', None), + ], + [ + ('negative', 'integer', '', ''), + ('positive', 'integer', '', ''), + ('another', 'varchar', '', ''), + ], + ['positive', 'negative'], +) + + +class SchemaEvoTestCase(NamedTuple): + source: Schema + dest: Schema + valid: bool + + +# TODO: could do with some more specific cases viz map keys +# and something more deeply nested +compat_test_cases = { + "add_remove_promote": + SchemaEvoTestCase(init_schema, legal_evo, True), + "illegal promotion string->double": + SchemaEvoTestCase(init_schema, illegal_evo, False), +} + + +# for keeping track of the expected total number of rows across rounds +# of translation (i.e. 
+
+
+# for keeping track of the expected total number of rows across rounds
+# of translation (i.e. calls to _produce)
+class TranslationContext:
+    total: int = 0
+
+
+class SchemaEvolutionE2ETests(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        super(SchemaEvolutionE2ETests,
+              self).__init__(test_ctx,
+                             num_brokers=1,
+                             si_settings=SISettings(test_context=test_ctx),
+                             extra_rp_conf={
+                                 "iceberg_enabled": "true",
+                                 "iceberg_catalog_commit_interval_ms": 5000
+                             },
+                             schema_registry_config=SchemaRegistryConfig(),
+                             pandaproxy_config=PandaproxyConfig(),
+                             *args,
+                             **kwargs)
+        self.test_ctx = test_ctx
+        self.topic_name = "test"
+        self.table_name = f"redpanda.{self.topic_name}"
+
+    def setUp(self):
+        # redpanda will be started by DatalakeServices
+        pass
+
+    def _produce(
+        self,
+        dl: DatalakeServices,
+        schema: Schema,
+        count: int,
+        context: TranslationContext,
+        should_translate: bool = True,
+    ):
+        avro_schema = avro.loads(schema.schema)
+        producer = AvroProducer(
+            {
+                'bootstrap.servers': self.redpanda.brokers(),
+                'schema.registry.url': self.redpanda.schema_reg().split(",")[0]
+            },
+            default_value_schema=avro_schema)
+
+        for i in range(count):
+            #TODO(oren): use something other than the index here?
+            record = schema.example(context.total + i)
+            producer.produce(topic=self.topic_name, value=record)
+
+        producer.flush()
+        if should_translate:
+            dl.wait_for_translation(self.topic_name,
+                                    msg_count=context.total + count)
+            context.total = context.total + count
+            return
+
+        # records that should fail translation must never show up in the
+        # table, so expect the wait to time out
+        with expect_exception(TimeoutError, lambda _: True):
+            dl.wait_for_translation(self.topic_name,
+                                    msg_count=context.total + count,
+                                    timeout=10)
+
+    def _select(self,
+                dl: DatalakeServices,
+                query_engine: QueryEngineType,
+                cols: list[str],
+                sort_by_offset: bool = True):
+        qe = (dl.spark()
+              if query_engine == QueryEngineType.SPARK else dl.trino())
+        query = f"select redpanda.offset, {', '.join(cols)} from {self.table_name}"
+        self.redpanda.logger.debug(f"QUERY: '{query}'")
+        out = qe.run_query_fetch_all(query)
+        if sort_by_offset:
+            out.sort(key=lambda r: r[0])
+        return out
+
+    def _check_schema(
+        self,
+        dl: DatalakeServices,
+        query_engine: QueryEngineType,
+        expected: Schema,
+    ):
+        if query_engine == QueryEngineType.SPARK:
+            spark = dl.spark()
+            spark_expected_out = expected.spark_rep
+            spark_describe_out = spark.run_query_fetch_all(
+                f"describe {self.table_name}")
+            # skip the leading 'redpanda' system column and the trailing
+            # partition-info rows that spark appends to describe output
+            assert spark_describe_out[1:-3] == spark_expected_out, str(
+                spark_describe_out)
+        elif query_engine == QueryEngineType.TRINO:
+            trino = dl.trino()
+            trino_expected_out = expected.trino_rep
+            trino_describe_out = trino.run_query_fetch_all(
+                f"describe {self.table_name}")
+            # skip the leading 'redpanda' system column
+            assert trino_describe_out[1:] == trino_expected_out, str(
+                trino_describe_out)
+        else:
+            assert False, f"Unrecognized query engine: {query_engine}"
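+
+    # subjects default to BACKWARD compatibility; the tests relax this to
+    # NONE so that deliberately incompatible updates register cleanly and
+    # the compatibility decision is left to datalake translation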
+    def _set_sr_compat(self, level: str):
+        SchemaRegistryClient({
+            'url': self.redpanda.schema_reg().split(",")[0]
+        }).set_compatibility(subject_name=f"{self.topic_name}-value",
+                             level=level)
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+        # NOTE: using keys here because we can't pickle the lambda
+        test_case=list(compat_test_cases.keys()),
+    )
+    def test_describe_schema_change(self, cloud_storage_type, query_engine,
+                                    test_case):
+        """
+        Test that schema changes are reflected in 'describe' queries with
+        various combos of cloud storage and query engine.
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(
+                self.topic_name, iceberg_mode="value_schema_id_prefix")
+            self._set_sr_compat("NONE")
+
+            count = 100
+            ctx = TranslationContext()
+            tc = compat_test_cases[test_case]
+            self._produce(dl, tc.source, count, ctx)
+            self._check_schema(dl, query_engine, tc.source)
+            self._produce(dl, tc.dest, count, ctx, tc.valid)
+            self._check_schema(dl, query_engine,
+                               tc.dest if tc.valid else tc.source)
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+        # NOTE: using keys here because we can't pickle the lambda
+        test_case=list(compat_test_cases.keys()),
+    )
+    def test_backward_compatible_read(self, cloud_storage_type, query_engine,
+                                      test_case):
+        """
+        Test that rows written with schema A are still readable after evolving
+        the table to schema B. Also check that records produced with an
+        incompatible schema don't wind up in the main table.
+        TODO: Integration with DLQ?
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(
+                self.topic_name, iceberg_mode="value_schema_id_prefix")
+            self._set_sr_compat("NONE")
+
+            count = 10
+
+            ctx = TranslationContext()
+            tc = compat_test_cases[test_case]
+            self._produce(dl, tc.source, count, ctx)
+            self._check_schema(dl, query_engine, tc.source)
+            self._produce(dl, tc.dest, count, ctx, tc.valid)
+            self._check_schema(dl, query_engine,
+                               tc.dest if tc.valid else tc.source)
+
+            if tc.valid:
+                select_out = self._select(dl, query_engine, tc.dest.select)
+                assert len(select_out) == count * 2, \
+                    f"Expected {count*2} rows, got {select_out}"
+                assert all(r[1] is None for r in select_out[:count]), \
+                    f"Expected null in col '{tc.dest.select[0]}' for first {count} rows: {select_out}"
+                assert all(r[1] is not None for r in select_out[count:]), \
+                    f"Expected non-null in col '{tc.dest.select[0]}' for last {count} rows: {select_out}"
+            else:
+                select_out = self._select(dl, query_engine, tc.source.select)
+                assert len(select_out) == count, \
+                    f"Unexpected extra rows {select_out}"
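+
+    # iceberg tracks columns by field id rather than by name: dropping a
+    # column retires its id, and re-adding the same name allocates a fresh
+    # id, so rows written before the re-add read back as NULL for the new
+    # column instead of resurrecting old values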
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+    )
+    def test_dropped_column_no_collision(self, cloud_storage_type,
+                                         query_engine):
+        """
+        Translate some records, drop field A, translate some more,
+        reintroduce field A *by name* (this should create a *new* column).
+        Confirm that 'select A' reads only the new column, producing nulls
+        for all rows written prior to the final update.
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(
+                self.topic_name, iceberg_mode="value_schema_id_prefix")
+            self._set_sr_compat("NONE")
+
+            count = 10
+            ctx = TranslationContext()
+            for schema in [single_nested, drop_nested, reintroduce_nested]:
+                self._produce(dl, schema, count, ctx)
+                self._check_schema(dl, query_engine, schema)
+                column_name = schema.select
+                select_out = self._select(dl, query_engine, column_name)
+                assert len(select_out) == ctx.total, \
+                    f"Expected {ctx.total} rows: {select_out}"
+                first = ctx.total - count
+                assert all(r[1] is None for r in select_out[:first]), \
+                    f"Expected null in col '{column_name}' for first {first} rows: {select_out}"
+                assert all(r[1] is not None for r in select_out[first:]), \
+                    f"Expected non-null in col '{column_name}' for last {count} rows: {select_out}"
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+    )
+    def test_dropped_column_select_fails(self, cloud_storage_type,
+                                         query_engine):
+        """
+        Test that selecting a dropped column fails "gracefully" - or at least
+        predictably and consistently.
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(
+                self.topic_name, iceberg_mode="value_schema_id_prefix")
+            self._set_sr_compat("NONE")
+
+            count = 10
+            ctx = TranslationContext()
+            for schema in [single_nested, drop_nested]:
+                self._produce(dl, schema, count, ctx)
+                self._check_schema(dl, query_engine, schema)
+
+            if query_engine == QueryEngineType.SPARK:
+                with expect_exception(
+                        pyhive.exc.OperationalError, lambda e:
+                        'FIELD_NOT_FOUND' in e.args[0].status.errorMessage):
+                    self._select(dl, query_engine, single_nested.select)
+            else:
+                with expect_exception(
+                        pyhive.exc.DatabaseError, lambda e: e.args[0].get(
+                            'errorName') == 'INVALID_COLUMN_REFERENCE'):
+                    self._select(dl, query_engine, single_nested.select)
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+    )
+    def test_reorder_columns(self, cloud_storage_type, query_engine):
+        """
+        Test that changing the order of columns doesn't change the values
+        associated with a column or field name.
+        """
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=True,
+                              include_query_engines=[query_engine]) as dl:
+            dl.create_iceberg_enabled_topic(
+                self.topic_name, iceberg_mode="value_schema_id_prefix")
+            self._set_sr_compat("NONE")
+
+            count = 10
+            ctx = TranslationContext()
+            for schema in [two_field, two_field_reorder]:
+                self._produce(dl, schema, count, ctx)
+                self._check_schema(dl, query_engine, schema)
+
+            select_out = self._select(dl, query_engine,
+                                      two_field_reorder.select)
+
+            assert len(select_out) == count * 2, \
+                f"Expected {count*2} rows, got {len(select_out)}"
+
+            assert all(r[1] == r[0] for r in select_out), \
+                f"Expected all '{two_field_reorder.select[0]}' columns to match the offset: {select_out}"
+
+            assert all(r[2] == -r[0] for r in select_out), \
+                f"Expected all '{two_field_reorder.select[1]}' columns to match the -offset: {select_out}"