Skip to content

Commit

Permalink
ducktape progress
Browse files Browse the repository at this point in the history
use confluent kafka directly
change up schemas
describe table to see whether a certain change took

TODO: check the actual rows. want to read the old records with the new schema
  • Loading branch information
oleiman committed Jan 19, 2025
1 parent 5c61d98 commit 21d3a90
Showing 1 changed file with 229 additions and 4 deletions.
233 changes: 229 additions & 4 deletions tests/rptest/tests/datalake/schema_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import tempfile
from time import time
from typing import NamedTuple
from collections.abc import Callable

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings, SchemaRegistryConfig
from rptest.services.redpanda import PandaproxyConfig, SISettings, SchemaRegistryConfig
from rptest.services.redpanda_connect import RedpandaConnectService
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
Expand All @@ -19,6 +25,7 @@
from rptest.tests.datalake.utils import supported_storage_types
from rptest.util import expect_exception
from ducktape.mark import matrix
from ducktape.errors import TimeoutError

schema_avro = """
{
Expand All @@ -32,6 +39,19 @@
{
"name": "ordinal",
"type": "int"
},
{
"name": "some_struct",
"type": {
"type": "record",
"name": "nested",
"fields": [
{
"name": "nested_1",
"type": "int"
}
]
}
}
]
}
Expand All @@ -47,11 +67,29 @@
"type": [
"null",
"float"
]
],
"default": null
},
{
"name": "ordinal",
"type": "long"
},
{
"name": "some_struct",
"type": {
"type": "record",
"name": "nested",
"fields": [
{
"name": "nested_1",
"type": "long"
},
{
"name": "nested_2",
"type": "int"
}
]
}
}
]
}
Expand Down Expand Up @@ -92,9 +130,9 @@
"""


class SchemaEvolutionTest(RedpandaTest):
class SchemaEvolutionSmokeTest(RedpandaTest):
def __init__(self, test_context):
super(SchemaEvolutionTest, self).__init__(
super(SchemaEvolutionSmokeTest, self).__init__(
test_context=test_context,
num_brokers=1,
si_settings=SISettings(test_context,
Expand Down Expand Up @@ -228,3 +266,190 @@ def run_verifier(schema: str, mapping: str):
root.verifier_string = 23.0
root.ordinal = counter()
""")


class Schema(NamedTuple):
schema: str
example: Callable[[float], dict]
spark_rep: list[tuple]
trino_rep: list[tuple]


class SchemaEvoTestCase(NamedTuple):
source: Schema
dest: Schema
valid: bool


init_schema = Schema(
schema_avro,
lambda x: {
"verifier_string": str(f"verify-{x}"),
"ordinal": int(x),
"some_struct": {
"nested_1": int(x),
},
},
[
('verifier_string', 'string', None),
('ordinal', 'int', None),
('some_struct', 'struct<nested_1:int>', None),
],
[
('verifier_string', 'varchar', '', ''),
('ordinal', 'integer', '', ''),
('some_struct', 'row(nested_1 int, nested_2 int)', '', ''),
],
)

first_evo = Schema(
legal_evo_schema_avro,
lambda x: {
"another": float(x),
"ordinal": int(x),
"some_struct": {
"nested_1": int(x),
"nested_2": int(x),
},
},
[
('another', 'struct<union_opt_1:float>', None),
('ordinal', 'bigint', None),
('some_struct', 'struct<nested_1:bigint,nested_2:int>', None),
],
[
('another', 'row(union_opt_1 real)', '', ''),
('ordinal', 'bigint', '', ''),
('some_struct', 'row(nested_1 bigint, nested_2 integer)', '', ''),
],
)

illegal_evo = Schema(
illegal_evo_schema_avro,
lambda x: {
"verifier_string": float(x),
"ordinal": int(x),
},
[
('verifier_string', 'double', None),
('ordinal', 'bigint', None),
],
[
('verifier_string', 'real', '', ''),
('ordinal', 'bigint', '', ''),
],
)

test_cases = [
SchemaEvoTestCase(init_schema, first_evo, True),
# SchemaEvoTestCase(init_schema, illegal_evo, False),
]


class SchemaEvolutionE2ETests(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super(SchemaEvolutionE2ETests,
self).__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_context=test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
schema_registry_config=SchemaRegistryConfig(),
pandaproxy_config=PandaproxyConfig(),
*args,
**kwargs)
self.test_ctx = test_ctx
self.topic_name = "test"

def setUp(self):
# redpanda will be started by DatalakeServices
pass

def _produce_and_evolve(
self,
dl: DatalakeServices,
test_case: SchemaEvoTestCase,
count_each: int,
):
total = count_each * 2
schema = avro.loads(test_case.source.schema)
producer = AvroProducer(
{
'bootstrap.servers': self.redpanda.brokers(),
'schema.registry.url': self.redpanda.schema_reg().split(",")[0]
},
default_value_schema=schema)

for _ in range(count_each):
record = test_case.source.example(time())
producer.produce(topic=self.topic_name, value=record)

producer.flush()

schema = avro.loads(test_case.dest.schema)
producer = AvroProducer(
{
'bootstrap.servers': self.redpanda.brokers(),
'schema.registry.url': self.redpanda.schema_reg().split(",")[0]
},
default_value_schema=schema)

for _ in range(count_each):
record = test_case.dest.example(time())
producer.produce(topic=self.topic_name, value=record)

producer.flush()

if test_case.valid:
dl.wait_for_translation(self.topic_name, msg_count=total)
return

with expect_exception(TimeoutError, lambda _: True):
dl.wait_for_translation(self.topic_name,
msg_count=count * 2,
timeout=10)

@cluster(num_nodes=3)
@matrix(
cloud_storage_type=supported_storage_types(),
query_engine=[
QueryEngineType.SPARK,
QueryEngineType.TRINO,
],
test_case=list(range(len(test_cases))),
)
def test_describe_schema_change(self, cloud_storage_type, query_engine,
test_case):
table_name = f"redpanda.{self.topic_name}"
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=True,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(
self.topic_name, iceberg_mode="value_schema_id_prefix")

SchemaRegistryClient({
'url':
self.redpanda.schema_reg().split(",")[0]
}).set_compatibility(subject_name=f"{self.topic_name}-value",
level="NONE")

tc = test_cases[test_case]
self._produce_and_evolve(dl, tc, 100)

if query_engine == QueryEngineType.SPARK:
spark = dl.spark()
spark_expected_out = tc.dest.spark_rep if tc.valid else tc.source.spark_rep
spark_describe_out = spark.run_query_fetch_all(
f"describe {table_name}")
assert spark_describe_out[1:-3] == spark_expected_out, str(
spark_describe_out)
else:
trino = dl.trino()
trino_expected_out = tc.dest.trino_rep if tc.valid else tc.source.trino_rep
trino_describe_out = trino.run_query_fetch_all(
f"describe {table_name}")
assert trino_describe_out[1:] == trino_expected_out, str(
trino_describe_out)

0 comments on commit 21d3a90

Please sign in to comment.